From 332fc6ad5edca596ecd23876aa9db7452b45f804 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期三, 29 五月 2024 02:58:21 +0800 Subject: [PATCH] 添加人员抓拍处理 --- service/subscribe.go | 215 +++++++++++++---------------------------------------- 1 files changed, 52 insertions(+), 163 deletions(-) diff --git a/service/subscribe.go b/service/subscribe.go index 70853cf..cbdeb97 100644 --- a/service/subscribe.go +++ b/service/subscribe.go @@ -2,16 +2,11 @@ import ( "context" - "encoding/json" - "gat1400Exchange/config" - "strings" "sync" - "time" - "gat1400Exchange/client" + "gat1400Exchange/config" "gat1400Exchange/models" "gat1400Exchange/pkg/logger" - "gat1400Exchange/pkg/snowflake" "gat1400Exchange/vo" ) @@ -20,68 +15,76 @@ type TaskProcInfo struct { cancel context.CancelFunc - task *SubscribeTask + task *NotificationTask } -func (t *TaskProcInfo) stop() { +func (t TaskProcInfo) stop() { t.cancel() t.task = nil } -func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { - logger.Debug("Receive sub msg: %s ", subId) +func InitSubscribeNotificationTasks() { + var s models.Subscribe + subList, err := s.FindByFromId(config.ClientConf.ServerId) + if err != nil { + logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error()) + return + } + + for idx := range subList { + if subList[idx].Status != 0 { + continue + } + + CreateNotificationTask(&subList[idx]) + } + + return +} + +func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) { + logger.Debug("Receive update notify task msg: %s %d", subId, msgType) proc, isExist := TaskProcMap.Load(subId) switch msgType { case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: if isExist { - logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�") - proc.(TaskProcInfo).cancel() + logger.Debug("Update notify task, restart after exit") + proc.(TaskProcInfo).stop() } - CreateTask(conf) + CreateNotificationTask(conf) case vo.Msg_Type_Delete_Subscribe: if !isExist { return } // 鍏抽棴浠诲姟, 骞跺垹闄� - proc.(TaskProcInfo).cancel() + proc.(TaskProcInfo).stop() default: - logger.Warn("鏈煡鐨勬秷鎭被鍨�") + logger.Warn("Invalid msg type %d", msgType) } } -func InitSubscribeTask() { - var s models.Subscribe - subList, err := s.FindByFromId(config.ClientConf.ServerId) - if err != nil { - logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err) - return +func CreateNotificationTask(conf *models.Subscribe) { + logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title) + + ctx, cancel := context.WithCancel(context.Background()) + task := &NotificationTask{ + ctx: ctx, + conf: conf, } - for idx, _ := range subList { - if subList[idx].Status != 0 { - continue - } - - CreateTask(&subList[idx]) - } - - return -} - -func AddFaceNotification(face *vo.FaceObject) { - TaskProcMap.Range(func(key, value interface{}) bool { - value.(TaskProcInfo).task.AddFace(face) - return true + TaskProcMap.Store(conf.Id, TaskProcInfo{ + cancel: cancel, + task: task, }) - logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature) + go task.start() } -func StopSubscribeTask() { +func StopNotificationTasks() { TaskProcMap.Range(func(key, value interface{}) bool { value.(TaskProcInfo).cancel() return true @@ -91,134 +94,20 @@ TaskWaitGroup.Wait() } -func CreateTask(conf *models.Subscribe) { - logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id) - - ctx, cancel := context.WithCancel(context.Background()) - newTask := &SubscribeTask{ - ctx: ctx, - conf: conf, - } - - TaskProcMap.Store(conf.Id, TaskProcInfo{ - cancel: cancel, - task: newTask, +func AddFaceNotification(face *vo.FaceObject) { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).task.addFace(face) + return true }) - go newTask.Start() + logger.Debug("Add Face Notification. faceId: %s", face.FaceID) } -type SubscribeTask struct { - ctx context.Context - conf *models.Subscribe - faceList []*vo.FaceObject - mutex sync.Mutex - lastApeExecTime int64 -} +func AddPersonNotification(person *vo.PersonObject) { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).task.addPerson(person) + return true + }) -func (task *SubscribeTask) Start() { - TaskWaitGroup.Add(1) - Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) - - for { - select { - case <-task.ctx.Done(): - TaskProcMap.Delete(task.conf.Id) - TaskWaitGroup.Done() - logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) - return - case <-Timer.C: - task.Notify() - } - } -} - -func (task *SubscribeTask) AddFace(face *vo.FaceObject) { - if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { - return - } - - task.mutex.Lock() - defer task.mutex.Unlock() - - task.faceList = append(task.faceList, face) -} - -func (task *SubscribeTask) Notify() { - subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") - for _, subType := range subDetails { - triggerTime := time.Now().Format("20060102150405") - - // 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�, - if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 { - task.lastApeExecTime = time.Now().Unix() - - var notification = vo.DeviceNotification{ - NotificationID: triggerTime + snowflake.GenerateIdStr(), - SubscribeID: task.conf.Id, - Title: task.conf.Ext.Title, - TriggerTime: triggerTime, - ExecuteOperation: 1, - } - - var ids []string - var apeMod models.Ape - apeList, err := apeMod.FindAll() - if err != nil { - logger.Warn(err.Error()) - continue - } else { - for idx, _ := range apeList { - ids = append(ids, apeList[idx].Id) - notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) - } - } - - notification.InfoIDs = strings.Join(ids, ";") - - var req vo.RequestSubscribeNotification - req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) - b, _ := json.Marshal(req) - client.Notify(task.conf.Ext.ReceiveAddr, b) - continue - } - - // 涓婃姤浜鸿劯 - if subType == vo.SubscribeFace { - if len(task.faceList) == 0 { - return - } - - var notification = vo.FaceNotification{ - NotificationID: triggerTime + snowflake.GenerateIdStr(), - SubscribeID: task.conf.Id, - Title: task.conf.Ext.Title, - TriggerTime: triggerTime, - ExecuteOperation: 1, - } - var ids []string - for idx, _ := range task.faceList { - // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈� - task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime - task.faceList[idx].EntryTime = triggerTime - for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { - task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 - } - - ids = append(ids, task.faceList[idx].FaceID) - notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) - } - - notification.InfoIDs = strings.Join(ids, ";") - - task.mutex.Lock() - task.faceList = []*vo.FaceObject{} - task.mutex.Unlock() - - var req vo.RequestSubscribeNotification - req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) - b, _ := json.Marshal(req) - client.Notify(task.conf.Ext.ReceiveAddr, b) - } - } + logger.Debug("Add Person Notification. personId: %s", person.PersonID) } -- Gitblit v1.8.0