zhangzengfei
2024-05-16 790c60c55054b3e75043eaed11eaef8584d2001d
添加级联,订阅功能
13个文件已添加
16个文件已修改
996 ■■■■■ 已修改文件
client/faces.go 3 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/notify.go 31 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/system.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/config.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config/gat1400.yaml 21 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/captureCtl.go 5 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/subscribeCtl.go 80 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/systemCtl.go 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/ape.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/cascade.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/db.go 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/subplatform.go 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
models/subscribe.go 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
pkg/snowflake/snowflake.go 118 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/apeRepo.go 82 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/captureRepo.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/subscribeRepo.go 44 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
routes/routes.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
routes/subscribe.go 16 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
routes/system.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/device.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/resend.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/subscribe.go 213 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/ape.go 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/constant.go 25 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/face.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/message.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
vo/subscribe.go 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/faces.go
@@ -1,8 +1,9 @@
package client
import (
    "encoding/json"
    "fmt"
    "encoding/json"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/util"
client/notify.go
New file
@@ -0,0 +1,31 @@
package client
import (
    "encoding/json"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/util"
    "gat1400Exchange/vo"
)
func Notify(url string, msg []byte) int {
    if clientStatus != vo.StatusSuccess {
        return clientStatus
    }
    rsp, err := util.HttpPost(url, headers, msg)
    if err != nil {
        logger.Warn("Post notification failed, %s", err.Error())
        return vo.StatusOtherError
    }
    var stat vo.ResponseStatusList
    err = json.Unmarshal(rsp, &stat)
    if err != nil {
        logger.Warn("Post notification response unmarshal failed, %s", err.Error())
        return vo.StatusOtherError
    }
    logger.Debug("Post notification success.")
    return vo.StatusSuccess
}
client/system.go
@@ -3,11 +3,11 @@
import (
    "encoding/json"
    "fmt"
    "gat1400Exchange/util"
    "io/ioutil"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/util"
    "gat1400Exchange/vo"
    dac "github.com/xinsnake/go-http-digest-auth-client"
config/config.go
@@ -13,6 +13,8 @@
    Mode     string `mapstructure:"mode"`
    Host     string `mapstructure:"host"`
    Port     string `mapstructure:"port"`
    Realm    string `mapstructure:"realm"`
    Username string `mapstructure:"username"`
    Password string `mapstructure:"password"`
    Role     string `mapstructure:"role"` // agent 设备端, proxy 1400中转 不处理deivce, server 全功能
}
config/gat1400.yaml
@@ -1,11 +1,26 @@
# web 服务配置
serve:
  # 设定模式(debug/release/test,正式版改为release)
  id: "11010500011121000001"
  id: "12312312315031231233"
  mode: "debug"
  port: "1400"
  host: "0.0.0.0"
  password: "basic1400server"
  host: "192.168.20.119"
  username: "admin"
  password: "Aa123456"
  realm: "Basic Realm"
  role: "cascade"
client:
  enable: true
  device-id: "12312312315031200003"
  username: "12312312315031200003"
  password: "123456"
  server-addr: "192.168.20.189"
  server-port: 1400
  proto: http
  upload-type: binary
  channel-number: "12312312315031200003"
  heartbeat-interval: 30
  heartbeat-count: 3
# 日志配置
log:
controller/captureCtl.go
@@ -41,7 +41,10 @@
    // 如果开启了下级, 身份应该是消息代理, 不再转发到服务器
    face := req.FaceListObject.FaceObject[0]
    if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
        a.Repository.MsgForward(&req)
        a.Repository.VIIDMsgForward(&req)
    } else if config.ServeConf.Role == "cascade" {
        logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime)
        service.AddFaceCapture(&face)
    } else {
        logger.Debug("Receive new message, Id:%s Ip:%s faceId:%s, LeftTopX:%d, appearTime:%s", c.RemoteIP(), face.DeviceID, face.FaceID, face.LeftTopX, face.FaceAppearTime)
        a.Repository.FaceForward(req.FaceListObject.FaceObject)
controller/subscribeCtl.go
New file
@@ -0,0 +1,80 @@
package controller
import (
    "fmt"
    "gat1400Exchange/repository"
    "gat1400Exchange/vo"
    "net/http"
    "strings"
    "time"
    "github.com/gin-gonic/gin"
)
type SubscribeController struct {
    Repository repository.SubscribeRepository
}
// 构造函数
func NewSubscribeController() SubscribeController {
    svr := repository.NewSubscribeRepository()
    controller := SubscribeController{svr}
    return controller
}
func (s SubscribeController) Subscribes(c *gin.Context) {
    var req vo.RequestSubscribe
    if err := c.BindJSON(&req); err != nil {
        c.AbortWithStatus(http.StatusBadRequest)
        return
    }
    fromId := c.GetHeader("User-Identify")
    var rsp vo.ResponseStatusList
    for idx, sub := range req.SubscribeListObject.SubscribeObject {
        if err := s.Repository.CreateSubscribe(fromId, &req.SubscribeListObject.SubscribeObject[idx]); err == nil {
            rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{
                RequestURL:   c.FullPath(),
                StatusCode:   vo.StatusSuccess,
                StatusString: vo.StatusString[vo.StatusSuccess],
                Id:           sub.SubscribeID,
                LocalTime:    time.Now().Format("20060102150405"),
            })
        }
    }
    c.Header("Content-Type", "application/VIID+json;charset=UTF-8")
    c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp})
}
func (s SubscribeController) DeleteSubscribe(c *gin.Context) {
    idList := c.Query("IDList")
    var rsp vo.ResponseStatusList
    for _, id := range strings.Split(idList, ",") {
        if err := s.Repository.DeleteSubscribe(id); err == nil {
            rsp.ResponseStatusObject = append(rsp.ResponseStatusObject, vo.ResponseStatus{
                RequestURL:   c.FullPath(),
                StatusCode:   vo.StatusSuccess,
                StatusString: vo.StatusString[vo.StatusSuccess],
                Id:           id,
                LocalTime:    time.Now().Format("20060102150405"),
            })
        }
    }
    c.Header("Content-Type", "application/VIID+json;charset=UTF-8")
    c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": rsp})
}
func (s SubscribeController) Notifications(c *gin.Context) {
    data, err := c.GetRawData()
    if err != nil {
    }
    fmt.Println("Notifications:", string(data))
    c.JSON(http.StatusOK, gin.H{"ResponseStatusListObject": ""})
}
controller/systemCtl.go
@@ -1,6 +1,7 @@
package controller
import (
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/service"
    "net/http"
    "time"
@@ -14,17 +15,20 @@
)
type SystemController struct {
    Auth       *auth.DigestAuth
    Repository repository.SystemRepository
    Auth    *auth.DigestAuth
    ApeRepo repository.ApeRepository
}
// 构造函数
func NewSystemController() SystemController {
    svr := repository.NewSystemRepository()
    controller := SystemController{Repository: svr}
    svr := repository.NewApeRepository()
    controller := SystemController{ApeRepo: svr}
    controller.Auth = auth.NewDigestAuthenticator("Basic Realm", func(user, realm string) string {
        // 需要在这里实现检查用户名和密码是否有效的逻辑, 目前使用固定密码
        if user == config.ServeConf.Username {
            return config.ServeConf.Password
        }
        return config.ServeConf.Password
    })
@@ -42,12 +46,26 @@
        return
    }
    var req vo.RequestRegister
    if err := c.BindJSON(&req); err != nil {
        c.AbortWithStatus(http.StatusBadRequest)
        return
    }
    rspMsg := vo.ResponseStatus{
        RequestURL:   c.FullPath(),
        StatusCode:   vo.StatusSuccess,
        StatusString: vo.StatusString[vo.StatusSuccess],
        Id:           user,
        Id:           req.RegisterObject.DeviceID,
        LocalTime:    time.Now().Format("20060102150405"),
    }
    if user == config.ServeConf.Username {
        if err := s.ApeRepo.Create(req.RegisterObject.DeviceID); err != nil {
            logger.Warn("Create ape failure,%s", err.Error())
            c.AbortWithStatus(http.StatusInternalServerError)
            return
        }
    }
    c.JSON(http.StatusOK, gin.H{"ResponseStatusObject": rspMsg})
@@ -62,6 +80,7 @@
    }
    service.KeepDeviceAlive(req.KeepaliveObject.DeviceID)
    s.ApeRepo.Keepalive(req.KeepaliveObject.DeviceID)
    rspMsg := vo.ResponseStatus{
        RequestURL:   c.FullPath(),
@@ -104,3 +123,30 @@
    c.JSON(http.StatusOK, gin.H{"SystemTimeObject": rspMsg})
}
// 设备列表
func (s SystemController) ApeList(c *gin.Context) {
    apeList, err := s.ApeRepo.List()
    if err != nil {
        logger.Error(err.Error())
    }
    c.JSON(http.StatusOK, gin.H{"ApeList": apeList})
}
// 修改设备
func (s SystemController) ApeUpdate(c *gin.Context) {
    var req vo.Ape
    if err := c.BindJSON(&req); err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
        return
    }
    err := s.ApeRepo.Update(&req)
    if err != nil {
        c.JSON(http.StatusBadRequest, gin.H{"msg": err.Error()})
        return
    }
    c.JSON(http.StatusOK, gin.H{"msg": "ok"})
}
main.go
@@ -44,6 +44,8 @@
    // 启动网络视频字符叠加器服务
    go service.NVCSServer()
    go service.InitSubscribeTask()
    // 启动定时任务
    cron.Init()
@@ -75,5 +77,7 @@
        logger.Error("Server forced to shutdown:", err)
    }
    service.StopSubscribeTask()
    logger.Info("Server exiting!")
}
models/ape.go
New file
@@ -0,0 +1,41 @@
package models
import (
    "gat1400Exchange/vo"
    "time"
)
type Ape struct {
    Id            string `gorm:"column:id;primary_key;" json:"id"`
    Name          string `gorm:"column:name" json:"name"`
    HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
    Ext           vo.Ape `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"`
    CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime    int64  `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime    int64  `gorm:"column:delete_time" json:"-"`
}
func (a *Ape) TableName() string {
    return "apes"
}
func (a *Ape) FindById(id string) error {
    return db.Table(a.TableName()).First(&a, "id = ?", id).Error
}
func (a *Ape) Save() error {
    return db.Table(a.TableName()).Save(a).Error
}
func (a *Ape) Keepalive() error {
    return db.Table(a.TableName()).Where("id = ?", a.Id).Update("heartbeat_time", time.Now().Format("2006-01-02 15:04:05")).Error
}
func (a *Ape) FindAll() ([]Ape, error) {
    var devices []Ape
    if err := db.Table(a.TableName()).Find(&devices).Error; err != nil {
        return nil, err
    }
    return devices, nil
}
models/cascade.go
New file
@@ -0,0 +1,19 @@
package models
type Cascade struct {
    Id         string   `gorm:"column:id;primary_key;" json:"id"`
    Name       string   `gorm:"column:name" json:"name"`
    Username   string   `gorm:"column:username" json:"username"`
    Password   string   `gorm:"column:password" json:"password"`
    IP         string   `gorm:"column:ip" json:"ip"`
    Port       int      `gorm:"column:port" json:"port"`
    Enabled    bool     `gorm:"column:enabled" json:"enabled"`
    DeviceIDs  []string `gorm:"column:device_ids;type:text[];default '{}'" json:"device_ids"`
    CreateTime int64    `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime int64    `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime int64    `gorm:"column:delete_time" json:"-"`
}
func (c *Cascade) TableName() string {
    return "cascades"
}
models/db.go
@@ -20,7 +20,15 @@
        logger.Debug("db open error ", err)
        return err
    }
    _ = db.AutoMigrate(&Device{}, &Positions{}, &Cache{})
    _ = db.AutoMigrate(
        &Ape{},
        &Cache{},
        &Cascade{},
        &Device{},
        &Positions{},
        &SubPlatform{},
        &Subscribe{},
    )
    // 添加默认数据
    InitData()
models/subplatform.go
New file
@@ -0,0 +1,20 @@
package models
type SubPlatform struct {
    Id            string `gorm:"primary_key;" json:"id"`
    HeartbeatTime string `gorm:"column:heartbeat_time" json:"heartbeat_time"`
    Name          string `gorm:"not null;default:''" json:"name"`
    UserName      string `gorm:"not null;default:''" json:"user_name"`
    Realm         string `gorm:"not null;default:''" json:"realm"`
    Password      string `gorm:"not null;default:''" json:"password"`
    Description   string `gorm:"not null;default:''" json:"description"`
    RemoteIP      string `gorm:"not null;default:''" json:"remote_ip"`
    RemotePort    int    `gorm:"not null;default:0" json:"remote_port"`
    CreateTime    int64  `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime    int64  `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime    int64  `gorm:"column:delete_time" json:"-"`
}
func (s *SubPlatform) TableName() string {
    return "sub_platforms"
}
models/subscribe.go
New file
@@ -0,0 +1,40 @@
package models
import (
    "gat1400Exchange/vo"
)
type Subscribe struct {
    Id         string       `gorm:"column:id;primary_key;" json:"id"`
    Status     int          `gorm:"column:status" json:"status"` // 0:订阅中 1:已取消订阅 2:订阅到期 9:未订阅
    FromId     string       `gorm:"column:from_id" json:"from_id"`
    Ext        vo.Subscribe `gorm:"column:ext;type:json;not null;default '{}'" json:"ext"`
    CreateTime int64        `gorm:"column:create_time;autoCreateTime;" json:"create_time"`
    UpdateTime int64        `gorm:"column:update_time;autoUpdateTime" json:"-"`
    DeleteTime int64        `gorm:"column:delete_time" json:"-"`
}
func (s *Subscribe) TableName() string {
    return "subscribes"
}
func (s *Subscribe) FindById(id string) error {
    return db.Table(s.TableName()).First(&s, "id = ?", id).Error
}
func (s *Subscribe) Save() error {
    return db.Table(s.TableName()).Save(s).Error
}
func (s *Subscribe) DeleteById(id string) error {
    return db.Table(s.TableName()).Where("id = ?", id).Delete(s).Error
}
func (s *Subscribe) FindAll() ([]Subscribe, error) {
    var subs []Subscribe
    if err := db.Table(s.TableName()).Find(&subs).Error; err != nil {
        return nil, err
    }
    return subs, nil
}
pkg/snowflake/snowflake.go
New file
@@ -0,0 +1,118 @@
package snowflake
import (
    "errors"
    "strconv"
    "sync"
    "time"
)
const (
    CEpoch         = 1474802888000
    CWorkerIdBits  = 10 // Num of WorkerId Bits
    CSenquenceBits = 12 // Num of Sequence Bits
    CWorkerIdShift  = 12
    CTimeStampShift = 22
    CSequenceMask = 0xfff // equal as getSequenceMask()
    CMaxWorker    = 0x3ff // equal as getMaxWorkerId()
)
type IdWorker struct {
    workerId      int64
    lastTimeStamp int64
    sequence      int64
    maxWorkerId   int64
    lock          *sync.Mutex
}
func NewIdWorker(workerId int64) (iw *IdWorker, err error) {
    iw = new(IdWorker)
    iw.maxWorkerId = getMaxWorkerId()
    if workerId > iw.maxWorkerId || workerId < 0 {
        return nil, errors.New("worker not fit")
    }
    iw.workerId = workerId
    iw.lastTimeStamp = -1
    iw.sequence = 0
    iw.lock = new(sync.Mutex)
    return iw, nil
}
func getMaxWorkerId() int64 {
    return -1 ^ -1<<CWorkerIdBits
}
func getSequenceMask() int64 {
    return -1 ^ -1<<CSenquenceBits
}
// return in ms
func (iw *IdWorker) timeGen() int64 {
    return time.Now().UnixNano() / 1000 / 1000
}
func (iw *IdWorker) timeReGen(last int64) int64 {
    ts := time.Now().UnixNano() / 1000 / 1000
    for {
        if ts <= last {
            ts = iw.timeGen()
        } else {
            break
        }
    }
    return ts
}
func (iw *IdWorker) NextId() (ts int64, err error) {
    iw.lock.Lock()
    defer iw.lock.Unlock()
    ts = iw.timeGen()
    if ts == iw.lastTimeStamp {
        iw.sequence = (iw.sequence + 1) & CSequenceMask
        if iw.sequence == 0 {
            ts = iw.timeReGen(ts)
        }
    } else {
        iw.sequence = 0
    }
    if ts < iw.lastTimeStamp {
        err = errors.New("Clock moved backwards, Refuse gen id")
        return 0, err
    }
    iw.lastTimeStamp = ts
    ts = (ts-CEpoch)<<CTimeStampShift | iw.workerId<<CWorkerIdShift | iw.sequence
    return ts, nil
}
func ParseId(id int64) (t time.Time, ts int64, workerId int64, seq int64) {
    seq = id & CSequenceMask
    workerId = (id >> CWorkerIdShift) & CMaxWorker
    ts = (id >> CTimeStampShift) + CEpoch
    t = time.Unix(ts/1000, (ts%1000)*1000000)
    return
}
var idGenerater, _ = NewIdWorker(0)
func GenerateId() int64 {
start:
    id, err := idGenerater.NextId()
    if err != nil {
        goto start
    }
    return id
}
func GenerateIdStr() string {
start:
    id, err := idGenerater.NextId()
    if err != nil {
        goto start
    }
    return strconv.FormatInt(id, 10)
}
repository/apeRepo.go
New file
@@ -0,0 +1,82 @@
package repository
import (
    "time"
    "gat1400Exchange/models"
    "gat1400Exchange/vo"
)
type ApeRepository struct {
}
func NewApeRepository() ApeRepository {
    return ApeRepository{}
}
func (a *ApeRepository) Keepalive(id string) error {
    var ape models.Ape
    // 设备存在
    if err := ape.FindById(id); err != nil {
        return nil
    }
    return ape.Keepalive()
}
func (a *ApeRepository) Create(id string) error {
    var ape models.Ape
    // 设备存在
    if err := ape.FindById(id); err == nil {
        return nil
    }
    ape.Id = id
    ape.Name = id
    ape.HeartbeatTime = time.Now().Format("2006-01-02 15:04:05")
    ape.Ext = vo.Ape{
        ApeID:            id,
        Name:             "",
        Model:            "",
        IPAddr:           "",
        IPV6Addr:         "",
        Port:             0,
        Longitude:        0,
        Latitude:         0,
        PlaceCode:        "",
        Place:            "",
        OrgCode:          "",
        CapDirection:     0,
        MonitorDirection: "",
        MonitorAreaDesc:  "",
        IsOnline:         "2",
        OwnerApsID:       "",
        UserID:           "",
        Password:         "",
        FunctionType:     "2",
    }
    return ape.Save()
}
func (a *ApeRepository) List() ([]models.Ape, error) {
    var ape models.Ape
    return ape.FindAll()
}
func (a *ApeRepository) Update(req *vo.Ape) error {
    var ape models.Ape
    err := ape.FindById(req.ApeID)
    if err != nil {
        return err
    }
    ape.Name = req.Name
    ape.Ext = *req
    return ape.Save()
}
repository/captureRepo.go
@@ -212,7 +212,7 @@
    cacheItem.Save()
}
func (c CaptureRepository) MsgForward(msg *vo.RequestFaceList) {
func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) {
    faceInfo := msg.FaceListObject.FaceObject[0]
    // 匹配楼层
    faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local)
repository/subscribeRepo.go
New file
@@ -0,0 +1,44 @@
package repository
import (
    "gat1400Exchange/models"
    "gat1400Exchange/service"
    "gat1400Exchange/vo"
)
type SubscribeRepository struct {
}
func NewSubscribeRepository() SubscribeRepository {
    return SubscribeRepository{}
}
func (a *SubscribeRepository) CreateSubscribe(fromId string, subscribe *vo.Subscribe) error {
    var sub = models.Subscribe{
        Id:     subscribe.SubscribeID,
        Status: subscribe.SubscribeStatus,
        FromId: fromId,
        Ext:    *subscribe,
    }
    err := sub.Save()
    if err != nil {
        return err
    }
    service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
    return err
}
func (a *SubscribeRepository) DeleteSubscribe(id string) error {
    var sub = models.Subscribe{}
    err := sub.DeleteById(id)
    if err != nil {
        return err
    }
    service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil)
    return err
}
routes/routes.go
@@ -35,6 +35,8 @@
    // 注册采集接口路由
    InitCaptureRouters(apiGroup)
    InitSubscribesRouters(apiGroup)
    logger.Info("初始化路由完成!")
    return r
routes/subscribe.go
New file
@@ -0,0 +1,16 @@
package routes
import (
    "gat1400Exchange/controller"
    "github.com/gin-gonic/gin"
)
func InitSubscribesRouters(r *gin.RouterGroup) gin.IRoutes {
    subCtl := controller.NewSubscribeController()
    r.POST("/Subscribes", subCtl.Subscribes)
    r.DELETE("/Subscribes", subCtl.DeleteSubscribe)
    r.POST("/SubscribeNotifications", subCtl.Notifications)
    return r
}
routes/system.go
@@ -17,5 +17,9 @@
        router.POST("/Keepalive", sysCtl.Keepalive)
        router.GET("/Time", sysCtl.Time)
    }
    r.GET("/APEs", sysCtl.ApeList)
    r.POST("/APEs", sysCtl.ApeUpdate)
    return r
}
service/device.go
@@ -38,12 +38,12 @@
}
func DeviceInfoReportTask() error {
    logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer)
    if config.ServeConf.Role == "agent" {
        return nil
    }
    logger.Info("Start device info report task, server:%s.", config.ForwardConf.ReportServer)
    if config.ForwardConf.ReportServer == "" {
        return errors.New("Server addr is empty!")
    }
service/resend.go
@@ -10,15 +10,15 @@
)
func ResendImageData() {
    var cacheMod models.Cache
    cacheItems, _ := cacheMod.FindAll()
    logger.Debug("Start resend task. cache len:%d", len(cacheItems))
    if err := util.HttpGet(config.ForwardConf.SyncServer); err != nil {
        logger.Debug("The server cannot be reached. %s", err.Error())
        return
    }
    var cacheMod models.Cache
    cacheItems, _ := cacheMod.FindAll()
    logger.Debug("Start resend task. cache len:%d", len(cacheItems))
    for _, c := range cacheItems {
        if c.Type == "1400" {
            if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess {
service/subscribe.go
New file
@@ -0,0 +1,213 @@
package service
import (
    "context"
    "encoding/json"
    "strings"
    "sync"
    "time"
    "gat1400Exchange/client"
    "gat1400Exchange/models"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/pkg/snowflake"
    "gat1400Exchange/vo"
)
var TaskProcMap sync.Map
var TaskWaitGroup = &sync.WaitGroup{}
type TaskProcInfo struct {
    cancel context.CancelFunc
    task   *SubscribeTask
}
func (t *TaskProcInfo) stop() {
    t.cancel()
    t.task = nil
}
func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
    logger.Debug("Receive sub msg: %s ", subId)
    proc, isExist := TaskProcMap.Load(subId)
    switch msgType {
    case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
        if isExist {
            logger.Debug("更新任务, 退出重新开始")
            proc.(TaskProcInfo).cancel()
        }
        CreateTask(conf)
    case vo.Msg_Type_Delete_Subscribe:
        if !isExist {
            return
        }
        // 关闭任务, 并删除
        proc.(TaskProcInfo).cancel()
    default:
        logger.Warn("未知的消息类型")
    }
}
func InitSubscribeTask() error {
    var s models.Subscribe
    subList, err := s.FindAll()
    if err != nil {
        logger.Error("Find account by channel error:%v", err)
        return err
    }
    for idx, _ := range subList {
        if subList[idx].Status != 0 {
            continue
        }
        CreateTask(&subList[idx])
    }
    return nil
}
func AddFaceCapture(face *vo.FaceObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.AddFace(face)
        return true
    })
    logger.Debug("添加人脸.")
    //TaskWaitGroup.Wait()
}
func StopSubscribeTask() {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).cancel()
        return true
    })
    logger.Debug("等待所有任务退出.")
    TaskWaitGroup.Wait()
}
func CreateTask(conf *models.Subscribe) {
    logger.Debug("添加订阅任务,%s", conf.Id)
    ctx, cancel := context.WithCancel(context.Background())
    newTask := &SubscribeTask{
        ctx:  ctx,
        conf: conf,
    }
    TaskProcMap.Store(conf.Id, TaskProcInfo{
        cancel: cancel,
        task:   newTask,
    })
    go newTask.Start()
}
type SubscribeTask struct {
    ctx      context.Context
    conf     *models.Subscribe
    faceList []*vo.FaceObject
    mutex    sync.Mutex
}
func (task *SubscribeTask) Start() {
    TaskWaitGroup.Add(1)
    Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
    for {
        select {
        case <-task.ctx.Done():
            TaskProcMap.Delete(task.conf.Id)
            TaskWaitGroup.Done()
            logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
            return
        case <-Timer.C:
            task.Notify()
        }
    }
}
func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
    if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
        return
    }
    task.mutex.Lock()
    defer task.mutex.Unlock()
    task.faceList = append(task.faceList, face)
}
func (task *SubscribeTask) Notify() {
    subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
    for _, subType := range subDetails {
        triggerTime := time.Now().Format("20060102150405")
        // 上报设备
        if subType == vo.SubscribeApe {
            var notification = vo.DeviceNotification{
                NotificationID:   triggerTime + snowflake.GenerateIdStr(),
                SubscribeID:      task.conf.Id,
                Title:            task.conf.Ext.Title,
                TriggerTime:      triggerTime,
                ExecuteOperation: 1,
            }
            var ids []string
            var apeMod models.Ape
            apeList, err := apeMod.FindAll()
            if err != nil {
                logger.Warn(err.Error())
                continue
            } else {
                for idx, _ := range apeList {
                    ids = append(ids, apeList[idx].Id)
                    notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
                }
            }
            notification.InfoIDs = strings.Join(ids, ";")
            var req vo.RequestSubscribeNotification
            req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
            b, _ := json.Marshal(req)
            client.Notify(task.conf.Ext.ReceiveAddr, b)
            continue
        }
        // 上报人脸
        if subType == vo.SubscribeFace {
            if len(task.faceList) == 0 {
                return
            }
            var notification = vo.FaceNotification{
                NotificationID: triggerTime + snowflake.GenerateIdStr(),
                SubscribeID:    task.conf.Id,
                Title:          task.conf.Ext.Title,
                TriggerTime:    triggerTime,
            }
            var ids []string
            for idx, _ := range task.faceList {
                ids = append(ids, task.faceList[idx].FaceID)
                notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
            }
            notification.InfoIDs = strings.Join(ids, ";")
            task.mutex.Lock()
            task.faceList = []*vo.FaceObject{}
            task.mutex.Unlock()
            var req vo.RequestSubscribeNotification
            req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
            b, _ := json.Marshal(req)
            client.Notify(task.conf.Ext.ReceiveAddr, b)
        }
    }
}
vo/ape.go
New file
@@ -0,0 +1,58 @@
package vo
import (
    "database/sql/driver"
    "encoding/json"
    "errors"
)
type Ape struct {
    ApeID            string  `json:"ApeID"`
    Name             string  `json:"Name"`
    Model            string  `json:"Model"`
    IPAddr           string  `json:"IPAddr"`
    IPV6Addr         string  `json:"IPV6Addr"`
    Port             int     `json:"Port"`
    Longitude        float64 `json:"Longitude"`
    Latitude         float64 `json:"Latitude"`
    PlaceCode        string  `json:"PlaceCode"`
    Place            string  `json:"Place"`
    OrgCode          string  `json:"OrgCode"`
    CapDirection     int     `json:"CapDirection"`
    MonitorDirection string  `json:"MonitorDirection"`
    MonitorAreaDesc  string  `json:"MonitorAreaDesc"`
    IsOnline         string  `json:"IsOnline"`
    OwnerApsID       string  `json:"OwnerApsID"`
    UserID           string  `json:"UserId"`
    Password         string  `json:"Password"`
    FunctionType     string  `json:"FunctionType"`
    PositionType     string  `json:"PositionType"`
}
func (a *Ape) Scan(value interface{}) error {
    b, ok := value.([]byte)
    if !ok {
        return errors.New("failed to unmarshal Author value")
    }
    var config Ape
    err := json.Unmarshal(b, &config)
    if err != nil {
        return err
    }
    *a = config
    return nil
}
func (a Ape) Value() (driver.Value, error) {
    return json.Marshal(a)
}
type RequestApeList struct {
    APEListObject struct {
        APEObject []Ape `json:"APEObject"`
    } `json:"APEListObject"`
}
type NotificationApeList struct {
    APEObject []Ape `json:"APEObject"`
}
vo/constant.go
@@ -26,3 +26,28 @@
    StatusInvalidJsonContent: "JSON内容无效",
    StatusReboot:             "系统重启中",
}
// 1 案(事)件目录
// 2 单个案(事)件内容
// 4 采集设备状态
// 5 采集系统目录
// 6 采集系统状态
// 7 视频卡口目录
// 8 单个卡口记录
// 9 车道目录
// 10 单个车道记录
// 13 自动采集的车辆信息
// 14 自动采集的非机动车辆信息
// 15 自动采集的物品信息
// 16 自动采集的文件信息
const (
    SubscribeApe    = "3"  //  采集设备目录
    SubscribePerson = "11" //  自动采集的人员信息
    SubscribeFace   = "12" //  自动采集的人脸信息
)
const (
    Msg_Type_Create_Subscribe int = iota
    Msg_Type_Update_Subscribe
    Msg_Type_Delete_Subscribe
)
vo/face.go
@@ -95,3 +95,7 @@
        FaceObject []FaceObject `json:"FaceObject"`
    } `json:"FaceListObject"`
}
type NotificationFaceList struct {
    FaceObject []FaceObject `json:"FaceObject"`
}
vo/message.go
@@ -30,3 +30,7 @@
    LocalTime    string
    TimeZone     interface{}
}
type ResponseStatusList struct {
    ResponseStatusObject []ResponseStatus
}
vo/subscribe.go
New file
@@ -0,0 +1,76 @@
package vo
import (
    "database/sql/driver"
    "encoding/json"
    "errors"
)
type Subscribe struct {
    SubscribeID           string `json:"SubscribeID"`
    Title                 string `json:"Title"`
    SubscribeDetail       string `json:"SubscribeDetail"`
    ResourceURI           string `json:"ResourceURI"`
    ApplicantName         string `json:"ApplicantName"`
    ApplicantOrg          string `json:"ApplicantOrg"`
    BeginTime             string `json:"BeginTime"` // Kept as string for direct compatibility
    EndTime               string `json:"EndTime"`   // Kept as string for direct compatibility
    ReceiveAddr           string `json:"ReceiveAddr"`
    ReportInterval        int    `json:"ReportInterval"`
    Reason                string `json:"Reason"`
    OperateType           int    `json:"OperateType"`
    SubscribeStatus       int    `json:"SubscribeStatus"`
    SubscribeCancelOrg    string `json:"SubscribeCancelOrg"`
    SubscribeCancelPerson string `json:"SubscribeCancelPerson"`
    CancelTime            string `json:"CancelTime"` // Kept as string for direct compatibility
    CancelReason          string `json:"CancelReason"`
}
func (s *Subscribe) Scan(value interface{}) error {
    b, ok := value.([]byte)
    if !ok {
        return errors.New("failed to unmarshal Author value")
    }
    var config Subscribe
    err := json.Unmarshal(b, &config)
    if err != nil {
        return err
    }
    *s = config
    return nil
}
func (s Subscribe) Value() (driver.Value, error) {
    return json.Marshal(s)
}
type RequestSubscribe struct {
    SubscribeListObject struct {
        SubscribeObject []Subscribe `json:"SubscribeObject"`
    } `json:"SubscribeListObject"`
}
type RequestSubscribeNotification struct {
    SubscribeNotificationListObject struct {
        SubscribeNotificationObject []interface{} `json:"SubscribeNotificationObject"`
    } `json:"SubscribeNotificationListObject"`
}
type DeviceNotification struct {
    NotificationID   string
    SubscribeID      string
    Title            string
    TriggerTime      string
    InfoIDs          string
    ExecuteOperation int
    DeviceList       NotificationApeList
}
type FaceNotification struct {
    NotificationID string
    SubscribeID    string
    Title          string
    TriggerTime    string
    InfoIDs        string
    FaceObjectList NotificationFaceList
}