zhangzengfei
2024-05-29 332fc6ad5edca596ecd23876aa9db7452b45f804
service/subscribe.go
@@ -2,16 +2,11 @@
import (
   "context"
   "encoding/json"
   "gat1400Exchange/config"
   "strings"
   "sync"
   "time"
   "gat1400Exchange/client"
   "gat1400Exchange/config"
   "gat1400Exchange/models"
   "gat1400Exchange/pkg/logger"
   "gat1400Exchange/pkg/snowflake"
   "gat1400Exchange/vo"
)
@@ -20,68 +15,76 @@
// TaskProcInfo pairs a running task with the CancelFunc of the context that
// task was started with. Values of this type are what the type assertions on
// entries loaded from TaskProcMap in this file expect.
//
// NOTE(review): `task` is declared twice below with different pointer types.
// This looks like the before/after sides of one diff line
// (SubscribeTask -> NotificationTask); only one declaration can remain.
type TaskProcInfo struct {
   cancel context.CancelFunc // cancels the context handed to the task
   task   *SubscribeTask
   task   *NotificationTask
}
// stop cancels the task's context and then clears the task reference.
//
// NOTE(review): two receiver headers follow (pointer and value); this looks
// like the old and new side of a diff line. Callers in this file invoke it as
// proc.(TaskProcInfo).stop(), i.e. on a non-addressable value, which only
// works with the value receiver.
func (t *TaskProcInfo) stop() {
func (t TaskProcInfo) stop() {
   t.cancel()
   // NOTE(review): with the value receiver this assignment only changes the
   // local copy of t; the TaskProcInfo held by the caller keeps its task.
   t.task = nil
}
func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
   logger.Debug("Receive sub msg: %s ", subId)
func InitSubscribeNotificationTasks() {
   var s models.Subscribe
   subList, err := s.FindByFromId(config.ClientConf.ServerId)
   if err != nil {
      logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
      return
   }
   for idx := range subList {
      if subList[idx].Status != 0 {
         continue
      }
      CreateNotificationTask(&subList[idx])
   }
   return
}
// UpdateNotificationTask applies a subscription change message to the task
// registered under subId in TaskProcMap.
//
//   - subId:   key used to look the existing task up in TaskProcMap.
//   - msgType: create/update stops any existing task and creates one from conf;
//     delete stops the existing task (no-op when none is registered);
//     any other value is only logged as a warning.
//   - conf:    subscription used to create the task on create/update.
//
// NOTE(review): several statements below come in near-duplicate pairs
// (cancel/stop, CreateTask/CreateNotificationTask, two log lines). They look
// like the old and new side of diff lines; confirm which of each pair stays.
func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
   logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
   proc, isExist := TaskProcMap.Load(subId)
   switch msgType {
   case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
      if isExist {
         // An update is handled as "stop the running task, then start a new one".
         logger.Debug("更新任务, 退出重新开始")
         proc.(TaskProcInfo).cancel()
         logger.Debug("Update notify task, restart after exit")
         proc.(TaskProcInfo).stop()
      }
      CreateTask(conf)
      CreateNotificationTask(conf)
   case vo.Msg_Type_Delete_Subscribe:
      if !isExist {
         return
      }
      // Stop the task and delete it.
      // NOTE(review): the map entry is not removed here; presumably the task
      // goroutine deletes it when its context is cancelled — confirm.
      proc.(TaskProcInfo).cancel()
      proc.(TaskProcInfo).stop()
   default:
      logger.Warn("未知的消息类型")
      logger.Warn("Invalid msg type %d", msgType)
   }
}
func InitSubscribeTask() {
   var s models.Subscribe
   subList, err := s.FindByFromId(config.ClientConf.ServerId)
   if err != nil {
      logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
      return
func CreateNotificationTask(conf *models.Subscribe) {
   logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
   ctx, cancel := context.WithCancel(context.Background())
   task := &NotificationTask{
      ctx:  ctx,
      conf: conf,
   }
   for idx, _ := range subList {
      if subList[idx].Status != 0 {
         continue
      }
      CreateTask(&subList[idx])
   }
   return
}
func AddFaceNotification(face *vo.FaceObject) {
   TaskProcMap.Range(func(key, value interface{}) bool {
      value.(TaskProcInfo).task.AddFace(face)
      return true
   TaskProcMap.Store(conf.Id, TaskProcInfo{
      cancel: cancel,
      task:   task,
   })
   logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature)
   go task.start()
}
func StopSubscribeTask() {
func StopNotificationTasks() {
   TaskProcMap.Range(func(key, value interface{}) bool {
      value.(TaskProcInfo).cancel()
      return true
@@ -91,134 +94,20 @@
   TaskWaitGroup.Wait()
}
func CreateTask(conf *models.Subscribe) {
   logger.Debug("添加订阅任务,%s", conf.Id)
   ctx, cancel := context.WithCancel(context.Background())
   newTask := &SubscribeTask{
      ctx:  ctx,
      conf: conf,
   }
   TaskProcMap.Store(conf.Id, TaskProcInfo{
      cancel: cancel,
      task:   newTask,
func AddFaceNotification(face *vo.FaceObject) {
   TaskProcMap.Range(func(key, value interface{}) bool {
      value.(TaskProcInfo).task.addFace(face)
      return true
   })
   go newTask.Start()
   logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
}
type SubscribeTask struct {
   ctx             context.Context
   conf            *models.Subscribe
   faceList        []*vo.FaceObject
   mutex           sync.Mutex
   lastApeExecTime int64
}
func AddPersonNotification(person *vo.PersonObject) {
   TaskProcMap.Range(func(key, value interface{}) bool {
      value.(TaskProcInfo).task.addPerson(person)
      return true
   })
func (task *SubscribeTask) Start() {
   TaskWaitGroup.Add(1)
   Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
   for {
      select {
      case <-task.ctx.Done():
         TaskProcMap.Delete(task.conf.Id)
         TaskWaitGroup.Done()
         logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
         return
      case <-Timer.C:
         task.Notify()
      }
   }
}
func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
   if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
      return
   }
   task.mutex.Lock()
   defer task.mutex.Unlock()
   task.faceList = append(task.faceList, face)
}
func (task *SubscribeTask) Notify() {
   subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
   for _, subType := range subDetails {
      triggerTime := time.Now().Format("20060102150405")
      // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新,
      if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 {
         task.lastApeExecTime = time.Now().Unix()
         var notification = vo.DeviceNotification{
            NotificationID:   triggerTime + snowflake.GenerateIdStr(),
            SubscribeID:      task.conf.Id,
            Title:            task.conf.Ext.Title,
            TriggerTime:      triggerTime,
            ExecuteOperation: 1,
         }
         var ids []string
         var apeMod models.Ape
         apeList, err := apeMod.FindAll()
         if err != nil {
            logger.Warn(err.Error())
            continue
         } else {
            for idx, _ := range apeList {
               ids = append(ids, apeList[idx].Id)
               notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
            }
         }
         notification.InfoIDs = strings.Join(ids, ";")
         var req vo.RequestSubscribeNotification
         req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
         b, _ := json.Marshal(req)
         client.Notify(task.conf.Ext.ReceiveAddr, b)
         continue
      }
      // 上报人脸
      if subType == vo.SubscribeFace {
         if len(task.faceList) == 0 {
            return
         }
         var notification = vo.FaceNotification{
            NotificationID:   triggerTime + snowflake.GenerateIdStr(),
            SubscribeID:      task.conf.Id,
            Title:            task.conf.Ext.Title,
            TriggerTime:      triggerTime,
            ExecuteOperation: 1,
         }
         var ids []string
         for idx, _ := range task.faceList {
            // 按海康示例填充修改字段
            task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
            task.faceList[idx].EntryTime = triggerTime
            for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
               task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
            }
            ids = append(ids, task.faceList[idx].FaceID)
            notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
         }
         notification.InfoIDs = strings.Join(ids, ";")
         task.mutex.Lock()
         task.faceList = []*vo.FaceObject{}
         task.mutex.Unlock()
         var req vo.RequestSubscribeNotification
         req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
         b, _ := json.Marshal(req)
         client.Notify(task.conf.Ext.ReceiveAddr, b)
      }
   }
   logger.Debug("Add Person Notification. personId: %s", person.PersonID)
}