package service

import (
	"context"
	"encoding/json"
	"strings"
	"sync"
	"time"

	"gat1400Exchange/client"
	"gat1400Exchange/models"
	"gat1400Exchange/pkg/logger"
	"gat1400Exchange/pkg/snowflake"
	"gat1400Exchange/vo"
)

// NotificationTask buffers face and person objects for a single subscription
// and periodically packs them into notifications that are sent to the
// subscription's receive address.
type NotificationTask struct {
	// ctx stops the task loop in start when it is cancelled.
	ctx context.Context
	// conf is the subscription this task reports for.
	conf *models.Subscribe
	// faceList holds faces queued since the last report; guarded by mutex.
	faceList []*vo.FaceObject
	// personList holds persons queued since the last report; guarded by mutex.
	personList []*vo.PersonObject
	// mutex guards faceList and personList.
	mutex sync.Mutex
	// lastApeExecTime is the Unix time (seconds) of the last device report
	// attempt; it is only touched by packDeviceNotificationMsg.
	lastApeExecTime int64
}

// start runs the report loop: it calls notify on every tick of the configured
// report interval until the task context is cancelled, at which point it
// removes the task from TaskProcMap and releases its TaskWaitGroup slot.
//
// NOTE(review): TaskWaitGroup.Add(1) happens inside start; if callers launch
// start with `go`, a concurrent TaskWaitGroup.Wait may not observe it — confirm
// against the caller.
func (task *NotificationTask) start() {
	TaskWaitGroup.Add(1)

	// time.NewTicker panics on a non-positive duration, so fall back to one
	// second when the subscription carries no usable report interval.
	interval := time.Duration(task.conf.Ext.ReportInterval) * time.Second
	if interval <= 0 {
		interval = time.Second
	}
	ticker := time.NewTicker(interval)
	// Release the ticker's resources once the loop exits.
	defer ticker.Stop()

	for {
		select {
		case <-task.ctx.Done():
			TaskProcMap.Delete(task.conf.Id)
			TaskWaitGroup.Done()
			logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
			return
		case <-ticker.C:
			task.notify()
		}
	}
}

// addFace queues a face for the next report. The face is ignored when the
// subscription detail does not include the face category.
func (task *NotificationTask) addFace(face *vo.FaceObject) {
	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
		return
	}

	task.mutex.Lock()
	defer task.mutex.Unlock()

	logger.Debug("Add Face Notification %s. faceId: %s", task.conf.Ext.Title, face.FaceID)
	task.faceList = append(task.faceList, face)
}

// addPerson queues a person for the next report. The person is ignored when
// the subscription detail does not include the person category.
func (task *NotificationTask) addPerson(person *vo.PersonObject) {
	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) {
		return
	}

	task.mutex.Lock()
	defer task.mutex.Unlock()

	logger.Debug("Add Person Notification %s. personId: %s", task.conf.Ext.Title, person.PersonID)
	task.personList = append(task.personList, person)
}

// notify builds one notification per subscribed category (the comma-separated
// entries of SubscribeDetail) and sends each non-empty one, JSON-encoded, to
// the subscription's receive address.
func (task *NotificationTask) notify() {
	for _, subType := range strings.Split(task.conf.Ext.SubscribeDetail, ",") {
		var msg *vo.RequestSubscribeNotification
		switch subType {
		case vo.SubscribeApe:
			msg = task.packDeviceNotificationMsg()
		case vo.SubscribeFace:
			msg = task.packFaceNotificationMsg()
		case vo.SubscribePerson:
			msg = task.packPersonNotificationMsg()
		}
		if msg == nil {
			continue
		}

		body, err := json.Marshal(msg)
		if err != nil {
			// Do not send an empty body when encoding fails.
			logger.Warn("Marshal notification failed. id:%s, err:%s", task.conf.Id, err.Error())
			continue
		}
		client.Notify(task.conf.Ext.ReceiveAddr, body)
	}
}

// packFaceNotificationMsg drains the queued faces into a single face
// notification request. It returns nil when no faces are queued.
func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification {
	// Take ownership of the pending batch under the lock so that faces added
	// concurrently by addFace are neither raced on nor dropped; they stay
	// queued for the next report.
	task.mutex.Lock()
	faces := task.faceList
	if len(faces) > 0 {
		task.faceList = []*vo.FaceObject{}
	}
	task.mutex.Unlock()

	if len(faces) == 0 {
		return nil
	}

	triggerTime := time.Now().Format("20060102150405")
	notification := vo.FaceNotification{
		NotificationID:   triggerTime + snowflake.GenerateIdStr(),
		SubscribeID:      task.conf.Id,
		Title:            task.conf.Ext.Title,
		TriggerTime:      triggerTime,
		ExecuteOperation: 1,
	}

	ids := make([]string, 0, len(faces))
	for _, face := range faces {
		// Fill in / override fields following the Hikvision example.
		face.ShotTime = face.FaceAppearTime
		face.EntryTime = triggerTime
		for i := range face.SubImageList.SubImageInfoObject {
			face.SubImageList.SubImageInfoObject[i].EventSort = 2
		}

		ids = append(ids, face.FaceID)
		notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *face)
	}
	notification.InfoIDs = strings.Join(ids, ";")

	var req vo.RequestSubscribeNotification
	req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
	return &req
}

// packPersonNotificationMsg drains the queued persons into a single person
// notification request. It returns nil when no persons are queued.
func (task *NotificationTask) packPersonNotificationMsg() *vo.RequestSubscribeNotification {
	// Take ownership of the pending batch under the lock so that persons added
	// concurrently by addPerson are neither raced on nor dropped; they stay
	// queued for the next report.
	task.mutex.Lock()
	persons := task.personList
	if len(persons) > 0 {
		task.personList = []*vo.PersonObject{}
	}
	task.mutex.Unlock()

	if len(persons) == 0 {
		return nil
	}

	triggerTime := time.Now().Format("20060102150405")
	notification := vo.PersonNotification{
		NotificationID:   triggerTime + snowflake.GenerateIdStr(),
		SubscribeID:      task.conf.Id,
		Title:            task.conf.Ext.Title,
		TriggerTime:      triggerTime,
		ExecuteOperation: 1,
	}

	ids := make([]string, 0, len(persons))
	for _, person := range persons {
		// Fill in / override fields following the Hikvision example.
		for i := range person.SubImageList.SubImageInfoObject {
			person.SubImageList.SubImageInfoObject[i].EventSort = 2
		}

		ids = append(ids, person.PersonID)
		notification.PersonObjectList.PersonObject = append(notification.PersonObjectList.PersonObject, *person)
	}
	notification.InfoIDs = strings.Join(ids, ";")

	var req vo.RequestSubscribeNotification
	req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
	return &req
}

// packDeviceNotificationMsg builds a device notification request listing every
// device returned by models.Ape.FindAll. It returns nil when the previous
// attempt was less than ten minutes ago or when the device lookup fails.
func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification {
	// Throttle device reports: the upstream platform usually issues tasks with
	// second-level intervals, but the device list rarely changes.
	now := time.Now()
	if now.Unix()-task.lastApeExecTime < 60*10 {
		return nil
	}
	// Recorded before the lookup, so a failed lookup also waits out the
	// throttle window before the next attempt.
	task.lastApeExecTime = now.Unix()

	triggerTime := now.Format("20060102150405")
	notification := vo.DeviceNotification{
		NotificationID:   triggerTime + snowflake.GenerateIdStr(),
		SubscribeID:      task.conf.Id,
		Title:            task.conf.Ext.Title,
		TriggerTime:      triggerTime,
		ExecuteOperation: 1,
	}

	var apeMod models.Ape
	apeList, err := apeMod.FindAll()
	if err != nil {
		// Pass the error as an argument, never as the format string.
		logger.Warn("Find ape list failed: %s", err.Error())
		return nil
	}

	ids := make([]string, 0, len(apeList))
	for idx := range apeList {
		ids = append(ids, apeList[idx].Id)
		notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
	}
	notification.InfoIDs = strings.Join(ids, ";")

	var req vo.RequestSubscribeNotification
	req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
	return &req
}