zhangzengfei
2024-05-29 332fc6ad5edca596ecd23876aa9db7452b45f804
添加人员抓拍处理
1个文件已添加
8个文件已修改
606 ■■■■■ 已修改文件
client/faces.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/notify.go 7 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
controller/captureCtl.go 58 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/captureRepo.go 121 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
repository/subscribeRepo.go 6 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/notification.go 160 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/resend.go 8 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
service/subscribe.go 215 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
client/faces.go
@@ -11,7 +11,8 @@
)
const (
    FacesUrI = "/VIID/Faces"
    FacesUrI   = "/VIID/Faces"
    PersonsUrI = "/VIID/Persons"
)
func FaceCapture(msg []byte) int {
@@ -36,3 +37,27 @@
    logger.Debug("Post faces success.")
    return stat.StatusCode
}
func PersonCapture(msg []byte) int {
    if clientStatus != vo.StatusSuccess {
        return clientStatus
    }
    url := fmt.Sprintf("%s://%s:%s%s", config.ClientConf.Proto, config.ClientConf.ServerAddr, config.ClientConf.ServerPort, PersonsUrI)
    rsp, err := util.HttpPost(url, headers, msg)
    if err != nil {
        logger.Warn("Post person failed, %s", err.Error())
        return vo.StatusOtherError
    }
    var stat vo.ResponseStatus
    err = json.Unmarshal(rsp, &stat)
    if err != nil {
        logger.Warn("Post person response unmarshal failed, %s", err.Error())
        return vo.StatusOtherError
    }
    logger.Debug("Post person success.")
    return stat.StatusCode
}
client/notify.go
@@ -49,7 +49,7 @@
        return vo.StatusOtherError
    }
    logger.Debug("Post notification success.")
    logger.Debug("Post subscribe success.")
    return vo.StatusSuccess
}
@@ -69,7 +69,7 @@
        return vo.StatusOtherError
    }
    logger.Debug("Post notification success.")
    logger.Debug("put subscribe success.")
    return vo.StatusSuccess
}
@@ -77,8 +77,7 @@
func GetSubscribes(url string) ([]byte, error) {
    rsp, err := util.HttpGet(url, headers)
    if err != nil {
        logger.Warn("Put subscribe failed, %s", err.Error())
        logger.Warn("Get subscribe failed, %s", err.Error())
    }
    return rsp, err
controller/captureCtl.go
@@ -2,14 +2,17 @@
import (
    "encoding/base64"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg"
    "gat1400Exchange/service"
    "fmt"
    "math/rand"
    "net/http"
    "strconv"
    "time"
    "gat1400Exchange/config"
    "gat1400Exchange/pkg"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/repository"
    "gat1400Exchange/service"
    "gat1400Exchange/vo"
    "github.com/gin-gonic/gin"
@@ -45,7 +48,7 @@
    // 如果开启了下级, 身份应该是消息代理, 不再转发到服务器
    if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
        go a.Repository.VIIDMsgForward(&req)
        go a.Repository.VIIDFaceMsgForward(&req)
    } else if config.ServeConf.Role == "cascade" {
        go service.AddFaceNotification(&face)
    }
@@ -80,15 +83,17 @@
    videoLabel := req.VideoLabelListObject.VideoLabelObject[0]
    logger.Debug("Receive new message, Id:%s Ip:%s ", videoLabel.VideoLabelID, c.RemoteIP())
    // 转人脸消息
    var face vo.FaceObject
    face.FaceID = videoLabel.VideoLabelID
    face.InfoKind = 1
    face.SourceID = videoLabel.VideoImageID
    face.DeviceID = videoLabel.IVADeviceID
    face.LocationMarkTime = videoLabel.CreateTimeAbs
    face.FaceAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
    face.FaceDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
    // 转人员消息
    var person vo.PersonObject
    person.PersonID = videoLabel.VideoLabelID[0:41] + "01" + videoLabel.VideoLabelID[43:48]
    person.InfoKind = "1"
    person.DeviceID = videoLabel.IVADeviceID
    person.SourceID = videoLabel.VideoImageID
    person.LocationMarkTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
    person.PersonAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
    person.PersonDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
    person.IsDriver = 2
    person.IsCriminalInvolved = 2
    var hasTargetImage bool
    var bgImageWith, bgImageHeight int
@@ -104,8 +109,8 @@
            bgImage = &videoLabel.SubImageList.SubImageInfoObject[idx]
        }
        face.SubImageList.SubImageInfoObject = append(
            face.SubImageList.SubImageInfoObject,
        person.SubImageList.SubImageInfoObject = append(
            person.SubImageList.SubImageInfoObject,
            videoLabel.SubImageList.SubImageInfoObject[idx],
        )
    }
@@ -129,8 +134,15 @@
            return
        }
        imageId := bgImage.ImageID[37:41]
        imageNumber, err := strconv.Atoi(imageId)
        if err != nil {
            imageNumber = rand.Intn(90000) + 10000
        } else {
            imageNumber++
        }
        var subImageInfo = vo.SubImageInfoObject{
            ImageID:     bgImage.ImageID + "1",
            ImageID:     fmt.Sprintf("%s%d", bgImage.ImageID[0:37], imageNumber),
            EventSort:   10,
            DeviceID:    bgImage.DeviceID,
            StoragePath: "",
@@ -141,23 +153,23 @@
            Height:      subRect.Top - subRect.Bottom,
            Data:        base64.StdEncoding.EncodeToString(subImage),
        }
        face.SubImageList.SubImageInfoObject = append(
            face.SubImageList.SubImageInfoObject,
        person.SubImageList.SubImageInfoObject = append(
            person.SubImageList.SubImageInfoObject,
            subImageInfo,
        )
    }
    // 如果开启了下级, 身份应该是消息代理, 不再转发到服务器
    if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
        var faceObjList vo.RequestFaceList
        faceObjList.FaceListObject.FaceObject = append(faceObjList.FaceListObject.FaceObject, face)
        go a.Repository.VIIDMsgForward(&faceObjList)
        var personList vo.RequestPersonList
        personList.PersonListObject.PersonObject = append(personList.PersonListObject.PersonObject, person)
        go a.Repository.VIIDPersonMsgForward(&personList)
    } else if config.ServeConf.Role == "cascade" {
        go service.AddFaceNotification(&face)
        go service.AddPersonNotification(&person)
    }
    if config.ForwardConf.SyncServer != "" {
        go a.Repository.FaceForward([]vo.FaceObject{face})
        go a.Repository.PersonForward([]vo.PersonObject{person})
    }
    rspMsg := vo.ResponseStatus{
main.go
@@ -44,7 +44,7 @@
    // 启动网络视频字符叠加器服务
    go service.StartNVCSServer()
    go service.InitSubscribeTask()
    go service.InitSubscribeNotificationTasks()
    // 启动定时任务
    cron.Init()
@@ -77,7 +77,7 @@
        logger.Error("Server forced to shutdown:", err)
    }
    service.StopSubscribeTask()
    service.StopNotificationTasks()
    logger.Info("Server exiting!")
}
repository/captureRepo.go
@@ -121,6 +121,91 @@
    return
}
func (c CaptureRepository) PersonForward(personList []vo.PersonObject) {
    var err error
    if personList == nil || len(personList) == 0 {
        logger.Warn("FaceList is nil")
        return
    }
    for _, v := range personList {
        if v.SubImageList.SubImageInfoObject == nil {
            logger.Warn("SubImageInfoObject is nil")
            continue
        }
        var deviceId = v.DeviceID
        var targetId = v.PersonID
        var bgImageStr string
        var bgImageBytes, faceImageBytes []byte = nil, nil
        // 获取大图, 目前海康的小图分辨率太低
        for _, image := range v.SubImageList.SubImageInfoObject {
            if image.Type != "14" {
                continue
            }
            if len(image.Data) > 0 {
                if len(image.Data) > len(bgImageStr) {
                    bgImageStr = image.Data
                }
            } else if image.StoragePath != "" {
                imgData, err := util.ImageDownload(image.StoragePath, nil)
                if err != nil {
                    logger.Warn("Image download failure, %s", err.Error())
                } else {
                    bgImageBytes = imgData
                    bgImageStr = "///"
                }
            }
        }
        if bgImageBytes == nil {
            bgImageBytes, err = base64.StdEncoding.DecodeString(bgImageStr)
            if err != nil {
                logger.Warn("Decode Image Base64 String failure, %s", err.Error())
                continue
            }
        }
        // 转发图像
        logger.Debug("Prepare forward person image, deviceId:%s, image len:%d, server:%s", deviceId, len(bgImageBytes), config.ForwardConf.SyncServer)
        if deviceId != "" && bgImageStr != "" && config.ForwardConf.SyncServer != "" {
            pd := c.PackPushDataV2(deviceId, targetId, v.PersonAppearTime, bgImageBytes, faceImageBytes)
            if pd == nil {
                return
            }
            // 处理梯控填写的楼层信息 暂时使用oherFeature字段
            if v.BehaviorDescription != "" {
                pd.CameraFloor = v.BehaviorDescription
            }
            // 尝试从faceId提取楼层
            if pd.CameraFloor == "" && config.ClientConf.AddFloorToFaceId {
                pd.CameraFloor, _ = pkg.ParseFloorFromId(v.PersonID)
            }
            //logger.Debug("device %s, CameraFloor:%s", deviceId, pd.CameraFloor)
            payload, err := json.Marshal(pd)
            if err != nil {
                logger.Warn("Marshal error, %s", err.Error())
                return
            }
            if !util.SendData(payload, config.ForwardConf.SyncServer) {
                cacheItem, _ := json.Marshal(pd)
                c.CacheData(cacheItem, "basic")
                logger.Warn("The data forwarding failed, adding to local cache.")
            } else {
                logger.Debug("The data forwarding successful. deviceId:%s", deviceId)
            }
        }
    }
    return
}
func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 {
    var pd = new(vo.PushDataInfoV2)
    var floor string
@@ -165,7 +250,7 @@
    cacheItem.Save()
}
func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) {
func (c CaptureRepository) VIIDFaceMsgForward(msg *vo.RequestFaceList) {
    faceInfo := msg.FaceListObject.FaceObject[0]
    // 匹配楼层
    faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local)
@@ -190,7 +275,39 @@
    b, _ := json.Marshal(msg)
    if client.FaceCapture(b) != vo.StatusSuccess {
        cacheItem, _ := json.Marshal(msg)
        c.CacheData(cacheItem, "1400")
        c.CacheData(cacheItem, "1400-face")
        logger.Warn("The data forwarding failed, adding to local cache.")
    }
    return
}
func (c CaptureRepository) VIIDPersonMsgForward(msg *vo.RequestPersonList) {
    person := msg.PersonListObject.PersonObject[0]
    // 匹配楼层
    appearTime, err := time.ParseInLocation("20060102150405", person.PersonAppearTime, time.Local)
    if err != nil {
        logger.Warn("Parse face appear time error, %s", err.Error())
        appearTime = time.Now()
    }
    var devPos models.Positions
    _ = devPos.FindPositionByTime(appearTime.Unix() + 5) // 加5秒电梯关门的时间
    if devPos.Pos == "" {
        devPos.Pos = "1F"
    }
    for idx, v := range msg.PersonListObject.PersonObject {
        msg.PersonListObject.PersonObject[idx].BehaviorDescription = devPos.Pos
        if config.ClientConf.AddFloorToFaceId {
            msg.PersonListObject.PersonObject[idx].PersonID = pkg.GenerateFaceIdContainFloor(v.PersonID, devPos.Pos)
        }
    }
    b, _ := json.Marshal(msg)
    if client.PersonCapture(b) != vo.StatusSuccess {
        cacheItem, _ := json.Marshal(msg)
        c.CacheData(cacheItem, "1400-person")
        logger.Warn("The data forwarding failed, adding to local cache.")
    }
repository/subscribeRepo.go
@@ -148,7 +148,7 @@
        return err
    }
    service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
    service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
    return err
}
@@ -163,7 +163,7 @@
    sub.Status = subscribe.SubscribeStatus
    sub.Ext = *subscribe
    service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
    service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
    return sub.Save()
}
@@ -175,7 +175,7 @@
        return err
    }
    service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil)
    service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil)
    return err
}
service/notification.go
New file
@@ -0,0 +1,160 @@
package service
import (
    "context"
    "encoding/json"
    "gat1400Exchange/pkg/logger"
    "strings"
    "sync"
    "time"
    "gat1400Exchange/client"
    "gat1400Exchange/models"
    "gat1400Exchange/pkg/snowflake"
    "gat1400Exchange/vo"
)
type NotificationTask struct {
    ctx             context.Context
    conf            *models.Subscribe
    faceList        []*vo.FaceObject
    personList      []*vo.PersonObject
    mutex           sync.Mutex
    lastApeExecTime int64
}
func (task *NotificationTask) start() {
    TaskWaitGroup.Add(1)
    Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
    for {
        select {
        case <-task.ctx.Done():
            TaskProcMap.Delete(task.conf.Id)
            TaskWaitGroup.Done()
            logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
            return
        case <-Timer.C:
            task.notify()
        }
    }
}
func (task *NotificationTask) addFace(face *vo.FaceObject) {
    if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
        return
    }
    task.mutex.Lock()
    defer task.mutex.Unlock()
    task.faceList = append(task.faceList, face)
}
func (task *NotificationTask) addPerson(person *vo.PersonObject) {
    if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) {
        return
    }
    task.mutex.Lock()
    defer task.mutex.Unlock()
    task.personList = append(task.personList, person)
}
func (task *NotificationTask) notify() {
    subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
    for _, subType := range subDetails {
        var msg *vo.RequestSubscribeNotification
        switch subType {
        case vo.SubscribeApe:
            msg = task.packDeviceNotificationMsg()
        case vo.SubscribeFace:
            msg = task.packFaceNotificationMsg()
        case vo.SubscribePerson:
        }
        if msg != nil {
            b, _ := json.Marshal(msg)
            client.Notify(task.conf.Ext.ReceiveAddr, b)
        }
    }
}
func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification {
    triggerTime := time.Now().Format("20060102150405")
    if len(task.faceList) == 0 {
        return nil
    }
    var notification = vo.FaceNotification{
        NotificationID:   triggerTime + snowflake.GenerateIdStr(),
        SubscribeID:      task.conf.Id,
        Title:            task.conf.Ext.Title,
        TriggerTime:      triggerTime,
        ExecuteOperation: 1,
    }
    var ids []string
    for idx, _ := range task.faceList {
        // 按海康示例填充修改字段
        task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
        task.faceList[idx].EntryTime = triggerTime
        for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
            task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
        }
        ids = append(ids, task.faceList[idx].FaceID)
        notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
    }
    notification.InfoIDs = strings.Join(ids, ";")
    task.mutex.Lock()
    task.faceList = []*vo.FaceObject{}
    task.mutex.Unlock()
    var req vo.RequestSubscribeNotification
    req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
    return &req
}
func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification {
    // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新,
    if time.Now().Unix()-task.lastApeExecTime < 60*10 {
        return nil
    }
    triggerTime := time.Now().Format("20060102150405")
    task.lastApeExecTime = time.Now().Unix()
    var notification = vo.DeviceNotification{
        NotificationID:   triggerTime + snowflake.GenerateIdStr(),
        SubscribeID:      task.conf.Id,
        Title:            task.conf.Ext.Title,
        TriggerTime:      triggerTime,
        ExecuteOperation: 1,
    }
    var ids []string
    var apeMod models.Ape
    apeList, err := apeMod.FindAll()
    if err != nil {
        logger.Warn(err.Error())
        return nil
    } else {
        for idx, _ := range apeList {
            ids = append(ids, apeList[idx].Id)
            notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
        }
    }
    notification.InfoIDs = strings.Join(ids, ";")
    var req vo.RequestSubscribeNotification
    req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
    return &req
}
service/resend.go
@@ -29,12 +29,18 @@
            logger.Error(err.Error())
            return
        }
        if c.Type == "1400" {
        if c.Type == "1400-face" {
            if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess {
                c.UpdateRetryCount()
                logger.Warn("The data resend failed. retry count %d", c.Retry+1)
                return
            }
        } else if c.Type == "1400-person" {
            if client.PersonCapture([]byte(c.Data)) != vo.StatusSuccess {
                c.UpdateRetryCount()
                logger.Warn("The data resend failed. retry count %d", c.Retry+1)
                return
            }
        } else {
            if !util.SendData([]byte(c.Data), config.ForwardConf.SyncServer) {
                c.UpdateRetryCount()
service/subscribe.go
@@ -2,16 +2,11 @@
import (
    "context"
    "encoding/json"
    "gat1400Exchange/config"
    "strings"
    "sync"
    "time"
    "gat1400Exchange/client"
    "gat1400Exchange/config"
    "gat1400Exchange/models"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/pkg/snowflake"
    "gat1400Exchange/vo"
)
@@ -20,68 +15,76 @@
type TaskProcInfo struct {
    cancel context.CancelFunc
    task   *SubscribeTask
    task   *NotificationTask
}
func (t *TaskProcInfo) stop() {
func (t TaskProcInfo) stop() {
    t.cancel()
    t.task = nil
}
func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
    logger.Debug("Receive sub msg: %s ", subId)
func InitSubscribeNotificationTasks() {
    var s models.Subscribe
    subList, err := s.FindByFromId(config.ClientConf.ServerId)
    if err != nil {
        logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
        return
    }
    for idx := range subList {
        if subList[idx].Status != 0 {
            continue
        }
        CreateNotificationTask(&subList[idx])
    }
    return
}
func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
    logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
    proc, isExist := TaskProcMap.Load(subId)
    switch msgType {
    case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
        if isExist {
            logger.Debug("更新任务, 退出重新开始")
            proc.(TaskProcInfo).cancel()
            logger.Debug("Update notify task, restart after exit")
            proc.(TaskProcInfo).stop()
        }
        CreateTask(conf)
        CreateNotificationTask(conf)
    case vo.Msg_Type_Delete_Subscribe:
        if !isExist {
            return
        }
        // 关闭任务, 并删除
        proc.(TaskProcInfo).cancel()
        proc.(TaskProcInfo).stop()
    default:
        logger.Warn("未知的消息类型")
        logger.Warn("Invalid msg type %d", msgType)
    }
}
func InitSubscribeTask() {
    var s models.Subscribe
    subList, err := s.FindByFromId(config.ClientConf.ServerId)
    if err != nil {
        logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
        return
func CreateNotificationTask(conf *models.Subscribe) {
    logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
    ctx, cancel := context.WithCancel(context.Background())
    task := &NotificationTask{
        ctx:  ctx,
        conf: conf,
    }
    for idx, _ := range subList {
        if subList[idx].Status != 0 {
            continue
        }
        CreateTask(&subList[idx])
    }
    return
}
func AddFaceNotification(face *vo.FaceObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.AddFace(face)
        return true
    TaskProcMap.Store(conf.Id, TaskProcInfo{
        cancel: cancel,
        task:   task,
    })
    logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature)
    go task.start()
}
func StopSubscribeTask() {
func StopNotificationTasks() {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).cancel()
        return true
@@ -91,134 +94,20 @@
    TaskWaitGroup.Wait()
}
func CreateTask(conf *models.Subscribe) {
    logger.Debug("添加订阅任务,%s", conf.Id)
    ctx, cancel := context.WithCancel(context.Background())
    newTask := &SubscribeTask{
        ctx:  ctx,
        conf: conf,
    }
    TaskProcMap.Store(conf.Id, TaskProcInfo{
        cancel: cancel,
        task:   newTask,
func AddFaceNotification(face *vo.FaceObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.addFace(face)
        return true
    })
    go newTask.Start()
    logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
}
type SubscribeTask struct {
    ctx             context.Context
    conf            *models.Subscribe
    faceList        []*vo.FaceObject
    mutex           sync.Mutex
    lastApeExecTime int64
}
func AddPersonNotification(person *vo.PersonObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.addPerson(person)
        return true
    })
func (task *SubscribeTask) Start() {
    TaskWaitGroup.Add(1)
    Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
    for {
        select {
        case <-task.ctx.Done():
            TaskProcMap.Delete(task.conf.Id)
            TaskWaitGroup.Done()
            logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
            return
        case <-Timer.C:
            task.Notify()
        }
    }
}
func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
    if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
        return
    }
    task.mutex.Lock()
    defer task.mutex.Unlock()
    task.faceList = append(task.faceList, face)
}
func (task *SubscribeTask) Notify() {
    subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
    for _, subType := range subDetails {
        triggerTime := time.Now().Format("20060102150405")
        // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新,
        if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 {
            task.lastApeExecTime = time.Now().Unix()
            var notification = vo.DeviceNotification{
                NotificationID:   triggerTime + snowflake.GenerateIdStr(),
                SubscribeID:      task.conf.Id,
                Title:            task.conf.Ext.Title,
                TriggerTime:      triggerTime,
                ExecuteOperation: 1,
            }
            var ids []string
            var apeMod models.Ape
            apeList, err := apeMod.FindAll()
            if err != nil {
                logger.Warn(err.Error())
                continue
            } else {
                for idx, _ := range apeList {
                    ids = append(ids, apeList[idx].Id)
                    notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
                }
            }
            notification.InfoIDs = strings.Join(ids, ";")
            var req vo.RequestSubscribeNotification
            req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
            b, _ := json.Marshal(req)
            client.Notify(task.conf.Ext.ReceiveAddr, b)
            continue
        }
        // 上报人脸
        if subType == vo.SubscribeFace {
            if len(task.faceList) == 0 {
                return
            }
            var notification = vo.FaceNotification{
                NotificationID:   triggerTime + snowflake.GenerateIdStr(),
                SubscribeID:      task.conf.Id,
                Title:            task.conf.Ext.Title,
                TriggerTime:      triggerTime,
                ExecuteOperation: 1,
            }
            var ids []string
            for idx, _ := range task.faceList {
                // 按海康示例填充修改字段
                task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
                task.faceList[idx].EntryTime = triggerTime
                for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
                    task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
                }
                ids = append(ids, task.faceList[idx].FaceID)
                notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
            }
            notification.InfoIDs = strings.Join(ids, ";")
            task.mutex.Lock()
            task.faceList = []*vo.FaceObject{}
            task.mutex.Unlock()
            var req vo.RequestSubscribeNotification
            req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
            b, _ := json.Marshal(req)
            client.Notify(task.conf.Ext.ReceiveAddr, b)
        }
    }
    logger.Debug("Add Person Notification. personId: %s", person.PersonID)
}