| | |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "gat1400Exchange/config" |
| | | "strings" |
| | | "sync" |
| | | "time" |
| | | |
| | | "gat1400Exchange/client" |
| | | "gat1400Exchange/config" |
| | | "gat1400Exchange/models" |
| | | "gat1400Exchange/pkg/logger" |
| | | "gat1400Exchange/pkg/snowflake" |
| | | "gat1400Exchange/vo" |
| | | ) |
| | | |
| | |
| | | |
| | | type TaskProcInfo struct { |
| | | cancel context.CancelFunc |
| | | task *SubscribeTask |
| | | task *NotificationTask |
| | | } |
| | | |
| | | func (t *TaskProcInfo) stop() { |
| | | func (t TaskProcInfo) stop() { |
| | | t.cancel() |
| | | t.task = nil |
| | | } |
| | | |
| | | func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { |
| | | logger.Debug("Receive sub msg: %s ", subId) |
| | | func InitSubscribeNotificationTasks() { |
| | | var s models.Subscribe |
| | | subList, err := s.FindByFromId(config.ClientConf.ServerId) |
| | | if err != nil { |
| | | logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error()) |
| | | return |
| | | } |
| | | |
| | | for idx := range subList { |
| | | if subList[idx].Status != 0 { |
| | | continue |
| | | } |
| | | |
| | | CreateNotificationTask(&subList[idx]) |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) { |
| | | logger.Debug("Receive update notify task msg: %s %d", subId, msgType) |
| | | |
| | | proc, isExist := TaskProcMap.Load(subId) |
| | | |
| | | switch msgType { |
| | | case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: |
| | | if isExist { |
| | | logger.Debug("更新任务, 退出重新开始") |
| | | proc.(TaskProcInfo).cancel() |
| | | logger.Debug("Update notify task, restart after exit") |
| | | proc.(TaskProcInfo).stop() |
| | | } |
| | | |
| | | CreateTask(conf) |
| | | CreateNotificationTask(conf) |
| | | case vo.Msg_Type_Delete_Subscribe: |
| | | if !isExist { |
| | | return |
| | | } |
| | | |
| | | // 关闭任务, 并删除 |
| | | proc.(TaskProcInfo).cancel() |
| | | proc.(TaskProcInfo).stop() |
| | | default: |
| | | logger.Warn("未知的消息类型") |
| | | logger.Warn("Invalid msg type %d", msgType) |
| | | } |
| | | } |
| | | |
| | | func InitSubscribeTask() { |
| | | var s models.Subscribe |
| | | subList, err := s.FindByFromId(config.ClientConf.ServerId) |
| | | if err != nil { |
| | | logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err) |
| | | return |
| | | func CreateNotificationTask(conf *models.Subscribe) { |
| | | logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title) |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | task := &NotificationTask{ |
| | | ctx: ctx, |
| | | conf: conf, |
| | | } |
| | | |
| | | for idx, _ := range subList { |
| | | if subList[idx].Status != 0 { |
| | | continue |
| | | } |
| | | |
| | | CreateTask(&subList[idx]) |
| | | } |
| | | |
| | | return |
| | | } |
| | | |
| | | func AddFaceNotification(face *vo.FaceObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.AddFace(face) |
| | | return true |
| | | TaskProcMap.Store(conf.Id, TaskProcInfo{ |
| | | cancel: cancel, |
| | | task: task, |
| | | }) |
| | | |
| | | logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature) |
| | | go task.start() |
| | | } |
| | | |
| | | func StopSubscribeTask() { |
| | | func StopNotificationTasks() { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).cancel() |
| | | return true |
| | |
| | | TaskWaitGroup.Wait() |
| | | } |
| | | |
| | | func CreateTask(conf *models.Subscribe) { |
| | | logger.Debug("添加订阅任务,%s", conf.Id) |
| | | |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | newTask := &SubscribeTask{ |
| | | ctx: ctx, |
| | | conf: conf, |
| | | } |
| | | |
| | | TaskProcMap.Store(conf.Id, TaskProcInfo{ |
| | | cancel: cancel, |
| | | task: newTask, |
| | | func AddFaceNotification(face *vo.FaceObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.addFace(face) |
| | | return true |
| | | }) |
| | | |
| | | go newTask.Start() |
| | | logger.Debug("Add Face Notification. faceId: %s", face.FaceID) |
| | | } |
| | | |
| | | type SubscribeTask struct { |
| | | ctx context.Context |
| | | conf *models.Subscribe |
| | | faceList []*vo.FaceObject |
| | | mutex sync.Mutex |
| | | lastApeExecTime int64 |
| | | } |
| | | func AddPersonNotification(person *vo.PersonObject) { |
| | | TaskProcMap.Range(func(key, value interface{}) bool { |
| | | value.(TaskProcInfo).task.addPerson(person) |
| | | return true |
| | | }) |
| | | |
| | | func (task *SubscribeTask) Start() { |
| | | TaskWaitGroup.Add(1) |
| | | Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) |
| | | |
| | | for { |
| | | select { |
| | | case <-task.ctx.Done(): |
| | | TaskProcMap.Delete(task.conf.Id) |
| | | TaskWaitGroup.Done() |
| | | logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) |
| | | return |
| | | case <-Timer.C: |
| | | task.Notify() |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (task *SubscribeTask) AddFace(face *vo.FaceObject) { |
| | | if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { |
| | | return |
| | | } |
| | | |
| | | task.mutex.Lock() |
| | | defer task.mutex.Unlock() |
| | | |
| | | task.faceList = append(task.faceList, face) |
| | | } |
| | | |
| | | func (task *SubscribeTask) Notify() { |
| | | subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") |
| | | for _, subType := range subDetails { |
| | | triggerTime := time.Now().Format("20060102150405") |
| | | |
| | | // 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新, |
| | | if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 { |
| | | task.lastApeExecTime = time.Now().Unix() |
| | | |
| | | var notification = vo.DeviceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | |
| | | var ids []string |
| | | var apeMod models.Ape |
| | | apeList, err := apeMod.FindAll() |
| | | if err != nil { |
| | | logger.Warn(err.Error()) |
| | | continue |
| | | } else { |
| | | for idx, _ := range apeList { |
| | | ids = append(ids, apeList[idx].Id) |
| | | notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) |
| | | } |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) |
| | | b, _ := json.Marshal(req) |
| | | client.Notify(task.conf.Ext.ReceiveAddr, b) |
| | | continue |
| | | } |
| | | |
| | | // 上报人脸 |
| | | if subType == vo.SubscribeFace { |
| | | if len(task.faceList) == 0 { |
| | | return |
| | | } |
| | | |
| | | var notification = vo.FaceNotification{ |
| | | NotificationID: triggerTime + snowflake.GenerateIdStr(), |
| | | SubscribeID: task.conf.Id, |
| | | Title: task.conf.Ext.Title, |
| | | TriggerTime: triggerTime, |
| | | ExecuteOperation: 1, |
| | | } |
| | | var ids []string |
| | | for idx, _ := range task.faceList { |
| | | // 按海康示例填充修改字段 |
| | | task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime |
| | | task.faceList[idx].EntryTime = triggerTime |
| | | for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { |
| | | task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 |
| | | } |
| | | |
| | | ids = append(ids, task.faceList[idx].FaceID) |
| | | notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) |
| | | } |
| | | |
| | | notification.InfoIDs = strings.Join(ids, ";") |
| | | |
| | | task.mutex.Lock() |
| | | task.faceList = []*vo.FaceObject{} |
| | | task.mutex.Unlock() |
| | | |
| | | var req vo.RequestSubscribeNotification |
| | | req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) |
| | | b, _ := json.Marshal(req) |
| | | client.Notify(task.conf.Ext.ReceiveAddr, b) |
| | | } |
| | | } |
| | | logger.Debug("Add Person Notification. personId: %s", person.PersonID) |
| | | } |