package service import ( "context" "encoding/json" "gat1400Exchange/config" "strings" "sync" "time" "gat1400Exchange/client" "gat1400Exchange/models" "gat1400Exchange/pkg/logger" "gat1400Exchange/pkg/snowflake" "gat1400Exchange/vo" ) var TaskProcMap sync.Map var TaskWaitGroup = &sync.WaitGroup{} type TaskProcInfo struct { cancel context.CancelFunc task *SubscribeTask } func (t *TaskProcInfo) stop() { t.cancel() t.task = nil } func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { logger.Debug("Receive sub msg: %s ", subId) proc, isExist := TaskProcMap.Load(subId) switch msgType { case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: if isExist { logger.Debug("更新任务, 退出重新开始") proc.(TaskProcInfo).cancel() } CreateTask(conf) case vo.Msg_Type_Delete_Subscribe: if !isExist { return } // 关闭任务, 并删除 proc.(TaskProcInfo).cancel() default: logger.Warn("未知的消息类型") } } func InitSubscribeTask() { var s models.Subscribe subList, err := s.FindByFromId(config.ClientConf.ServerId) if err != nil { logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err) return } for idx, _ := range subList { if subList[idx].Status != 0 { continue } CreateTask(&subList[idx]) } return } func AddFaceNotification(face *vo.FaceObject) { TaskProcMap.Range(func(key, value interface{}) bool { value.(TaskProcInfo).task.AddFace(face) return true }) logger.Debug("Add Face Notification.") } func StopSubscribeTask() { TaskProcMap.Range(func(key, value interface{}) bool { value.(TaskProcInfo).cancel() return true }) logger.Debug("等待所有任务退出.") TaskWaitGroup.Wait() } func CreateTask(conf *models.Subscribe) { logger.Debug("添加订阅任务,%s", conf.Id) ctx, cancel := context.WithCancel(context.Background()) newTask := &SubscribeTask{ ctx: ctx, conf: conf, } TaskProcMap.Store(conf.Id, TaskProcInfo{ cancel: cancel, task: newTask, }) go newTask.Start() } type SubscribeTask struct { ctx context.Context conf *models.Subscribe faceList []*vo.FaceObject mutex sync.Mutex } func (task *SubscribeTask) Start() { TaskWaitGroup.Add(1) Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) for { select { case <-task.ctx.Done(): TaskProcMap.Delete(task.conf.Id) TaskWaitGroup.Done() logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) return case <-Timer.C: task.Notify() } } } func (task *SubscribeTask) AddFace(face *vo.FaceObject) { if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { return } task.mutex.Lock() defer task.mutex.Unlock() task.faceList = append(task.faceList, face) } func (task *SubscribeTask) Notify() { subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") for _, subType := range subDetails { triggerTime := time.Now().Format("20060102150405") // 上报设备 if subType == vo.SubscribeApe { var notification = vo.DeviceNotification{ NotificationID: triggerTime + snowflake.GenerateIdStr(), SubscribeID: task.conf.Id, Title: task.conf.Ext.Title, TriggerTime: triggerTime, ExecuteOperation: 1, } var ids []string var apeMod models.Ape apeList, err := apeMod.FindAll() if err != nil { logger.Warn(err.Error()) continue } else { for idx, _ := range apeList { ids = append(ids, apeList[idx].Id) notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) } } notification.InfoIDs = strings.Join(ids, ";") var req vo.RequestSubscribeNotification req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) b, _ := json.Marshal(req) client.Notify(task.conf.Ext.ReceiveAddr, b) continue } // 上报人脸 if subType == vo.SubscribeFace { if len(task.faceList) == 0 { return } var notification = vo.FaceNotification{ NotificationID: triggerTime + snowflake.GenerateIdStr(), SubscribeID: task.conf.Id, Title: task.conf.Ext.Title, TriggerTime: triggerTime, ExecuteOperation: 1, } var ids []string for idx, _ := range task.faceList { // 按海康示例填充修改字段 task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime task.faceList[idx].EntryTime = triggerTime for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 } ids = append(ids, task.faceList[idx].FaceID) notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) } notification.InfoIDs = strings.Join(ids, ";") task.mutex.Lock() task.faceList = []*vo.FaceObject{} task.mutex.Unlock() var req vo.RequestSubscribeNotification req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) b, _ := json.Marshal(req) client.Notify(task.conf.Ext.ReceiveAddr, b) } } }