From 332fc6ad5edca596ecd23876aa9db7452b45f804 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期三, 29 五月 2024 02:58:21 +0800 Subject: [PATCH] 添加人员抓拍处理 --- controller/captureCtl.go | 58 +++-- service/subscribe.go | 215 ++++-------------- service/resend.go | 8 client/faces.go | 27 ++ repository/captureRepo.go | 121 ++++++++++ client/notify.go | 7 main.go | 4 service/notification.go | 160 ++++++++++++++ repository/subscribeRepo.go | 6 9 files changed, 407 insertions(+), 199 deletions(-) diff --git a/client/faces.go b/client/faces.go index c4f1f17..42e3827 100644 --- a/client/faces.go +++ b/client/faces.go @@ -11,7 +11,8 @@ ) const ( - FacesUrI = "/VIID/Faces" + FacesUrI = "/VIID/Faces" + PersonsUrI = "/VIID/Persons" ) func FaceCapture(msg []byte) int { @@ -36,3 +37,27 @@ logger.Debug("Post faces success.") return stat.StatusCode } + +func PersonCapture(msg []byte) int { + if clientStatus != vo.StatusSuccess { + return clientStatus + } + + url := fmt.Sprintf("%s://%s:%s%s", config.ClientConf.Proto, config.ClientConf.ServerAddr, config.ClientConf.ServerPort, PersonsUrI) + rsp, err := util.HttpPost(url, headers, msg) + if err != nil { + logger.Warn("Post person failed, %s", err.Error()) + return vo.StatusOtherError + } + + var stat vo.ResponseStatus + err = json.Unmarshal(rsp, &stat) + if err != nil { + logger.Warn("Post person response unmarshal failed, %s", err.Error()) + return vo.StatusOtherError + } + + logger.Debug("Post person success.") + + return stat.StatusCode +} diff --git a/client/notify.go b/client/notify.go index e1a9336..66e9584 100644 --- a/client/notify.go +++ b/client/notify.go @@ -49,7 +49,7 @@ return vo.StatusOtherError } - logger.Debug("Post notification success.") + logger.Debug("Post subscribe success.") return vo.StatusSuccess } @@ -69,7 +69,7 @@ return vo.StatusOtherError } - logger.Debug("Post notification success.") + logger.Debug("put subscribe success.") return vo.StatusSuccess } @@ -77,8 +77,7 @@ func GetSubscribes(url string) ([]byte, error) { rsp, err := util.HttpGet(url, headers) if err != nil { - logger.Warn("Put subscribe failed, %s", err.Error()) - + logger.Warn("Get subscribe failed, %s", err.Error()) } return rsp, err diff --git a/controller/captureCtl.go b/controller/captureCtl.go index a6fc5cb..8f3d784 100644 --- a/controller/captureCtl.go +++ b/controller/captureCtl.go @@ -2,14 +2,17 @@ import ( "encoding/base64" - "gat1400Exchange/config" - "gat1400Exchange/pkg" - "gat1400Exchange/service" + "fmt" + "math/rand" "net/http" + "strconv" "time" + "gat1400Exchange/config" + "gat1400Exchange/pkg" "gat1400Exchange/pkg/logger" "gat1400Exchange/repository" + "gat1400Exchange/service" "gat1400Exchange/vo" "github.com/gin-gonic/gin" @@ -45,7 +48,7 @@ // 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒 if config.ClientConf.Enable && config.ServeConf.Role == "agent" { - go a.Repository.VIIDMsgForward(&req) + go a.Repository.VIIDFaceMsgForward(&req) } else if config.ServeConf.Role == "cascade" { go service.AddFaceNotification(&face) } @@ -80,15 +83,17 @@ videoLabel := req.VideoLabelListObject.VideoLabelObject[0] logger.Debug("Receive new message, Id:%s Ip:%s ", videoLabel.VideoLabelID, c.RemoteIP()) - // 杞汉鑴告秷鎭� - var face vo.FaceObject - face.FaceID = videoLabel.VideoLabelID - face.InfoKind = 1 - face.SourceID = videoLabel.VideoImageID - face.DeviceID = videoLabel.IVADeviceID - face.LocationMarkTime = videoLabel.CreateTimeAbs - face.FaceAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime - face.FaceDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime + // 杞汉鍛樻秷鎭� + var person vo.PersonObject + person.PersonID = videoLabel.VideoLabelID[0:41] + "01" + videoLabel.VideoLabelID[43:48] + person.InfoKind = "1" + person.DeviceID = videoLabel.IVADeviceID + person.SourceID = videoLabel.VideoImageID + person.LocationMarkTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime + person.PersonAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime + person.PersonDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime + person.IsDriver = 2 + person.IsCriminalInvolved = 2 var hasTargetImage bool var bgImageWith, bgImageHeight int @@ -104,8 +109,8 @@ bgImage = &videoLabel.SubImageList.SubImageInfoObject[idx] } - face.SubImageList.SubImageInfoObject = append( - face.SubImageList.SubImageInfoObject, + person.SubImageList.SubImageInfoObject = append( + person.SubImageList.SubImageInfoObject, videoLabel.SubImageList.SubImageInfoObject[idx], ) } @@ -129,8 +134,15 @@ return } + imageId := bgImage.ImageID[37:41] + imageNumber, err := strconv.Atoi(imageId) + if err != nil { + imageNumber = rand.Intn(90000) + 10000 + } else { + imageNumber++ + } var subImageInfo = vo.SubImageInfoObject{ - ImageID: bgImage.ImageID + "1", + ImageID: fmt.Sprintf("%s%d", bgImage.ImageID[0:37], imageNumber), EventSort: 10, DeviceID: bgImage.DeviceID, StoragePath: "", @@ -141,23 +153,23 @@ Height: subRect.Top - subRect.Bottom, Data: base64.StdEncoding.EncodeToString(subImage), } - face.SubImageList.SubImageInfoObject = append( - face.SubImageList.SubImageInfoObject, + person.SubImageList.SubImageInfoObject = append( + person.SubImageList.SubImageInfoObject, subImageInfo, ) } // 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒 if config.ClientConf.Enable && config.ServeConf.Role == "agent" { - var faceObjList vo.RequestFaceList - faceObjList.FaceListObject.FaceObject = append(faceObjList.FaceListObject.FaceObject, face) - go a.Repository.VIIDMsgForward(&faceObjList) + var personList vo.RequestPersonList + personList.PersonListObject.PersonObject = append(personList.PersonListObject.PersonObject, person) + go a.Repository.VIIDPersonMsgForward(&personList) } else if config.ServeConf.Role == "cascade" { - go service.AddFaceNotification(&face) + go service.AddPersonNotification(&person) } if config.ForwardConf.SyncServer != "" { - go a.Repository.FaceForward([]vo.FaceObject{face}) + go a.Repository.PersonForward([]vo.PersonObject{person}) } rspMsg := vo.ResponseStatus{ diff --git a/main.go b/main.go index d055edf..8130d67 100644 --- a/main.go +++ b/main.go @@ -44,7 +44,7 @@ // 鍚姩缃戠粶瑙嗛瀛楃鍙犲姞鍣ㄦ湇鍔� go service.StartNVCSServer() - go service.InitSubscribeTask() + go service.InitSubscribeNotificationTasks() // 鍚姩瀹氭椂浠诲姟 cron.Init() @@ -77,7 +77,7 @@ logger.Error("Server forced to shutdown:", err) } - service.StopSubscribeTask() + service.StopNotificationTasks() logger.Info("Server exiting!") } diff --git a/repository/captureRepo.go b/repository/captureRepo.go index d0702b5..8572735 100644 --- a/repository/captureRepo.go +++ b/repository/captureRepo.go @@ -121,6 +121,91 @@ return } +func (c CaptureRepository) PersonForward(personList []vo.PersonObject) { + var err error + + if personList == nil || len(personList) == 0 { + logger.Warn("FaceList is nil") + return + } + + for _, v := range personList { + if v.SubImageList.SubImageInfoObject == nil { + logger.Warn("SubImageInfoObject is nil") + continue + } + var deviceId = v.DeviceID + var targetId = v.PersonID + var bgImageStr string + var bgImageBytes, faceImageBytes []byte = nil, nil + + // 鑾峰彇澶у浘, 鐩墠娴峰悍鐨勫皬鍥惧垎杈ㄧ巼澶綆 + for _, image := range v.SubImageList.SubImageInfoObject { + if image.Type != "14" { + continue + } + + if len(image.Data) > 0 { + if len(image.Data) > len(bgImageStr) { + bgImageStr = image.Data + } + } else if image.StoragePath != "" { + imgData, err := util.ImageDownload(image.StoragePath, nil) + if err != nil { + logger.Warn("Image download failure, %s", err.Error()) + } else { + bgImageBytes = imgData + bgImageStr = "///" + } + } + } + + if bgImageBytes == nil { + bgImageBytes, err = base64.StdEncoding.DecodeString(bgImageStr) + if err != nil { + logger.Warn("Decode Image Base64 String failure, %s", err.Error()) + continue + } + } + + // 杞彂鍥惧儚 + logger.Debug("Prepare forward person image, deviceId:%s, image len:%d, server:%s", deviceId, len(bgImageBytes), config.ForwardConf.SyncServer) + if deviceId != "" && bgImageStr != "" && config.ForwardConf.SyncServer != "" { + pd := c.PackPushDataV2(deviceId, targetId, v.PersonAppearTime, bgImageBytes, faceImageBytes) + if pd == nil { + return + } + + // 澶勭悊姊帶濉啓鐨勬ゼ灞備俊鎭� 鏆傛椂浣跨敤oherFeature瀛楁 + if v.BehaviorDescription != "" { + pd.CameraFloor = v.BehaviorDescription + } + + // 灏濊瘯浠巉aceId鎻愬彇妤煎眰 + if pd.CameraFloor == "" && config.ClientConf.AddFloorToFaceId { + pd.CameraFloor, _ = pkg.ParseFloorFromId(v.PersonID) + } + //logger.Debug("device %s, CameraFloor:%s", deviceId, pd.CameraFloor) + + payload, err := json.Marshal(pd) + if err != nil { + logger.Warn("Marshal error, %s", err.Error()) + return + } + + if !util.SendData(payload, config.ForwardConf.SyncServer) { + cacheItem, _ := json.Marshal(pd) + c.CacheData(cacheItem, "basic") + logger.Warn("The data forwarding failed, adding to local cache.") + } else { + logger.Debug("The data forwarding successful. deviceId:%s", deviceId) + } + } + } + + return +} + func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 { var pd = new(vo.PushDataInfoV2) var floor string @@ -165,7 +250,7 @@ cacheItem.Save() } -func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) { +func (c CaptureRepository) VIIDFaceMsgForward(msg *vo.RequestFaceList) { faceInfo := msg.FaceListObject.FaceObject[0] // 鍖归厤妤煎眰 faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local) @@ -190,7 +275,39 @@ b, _ := json.Marshal(msg) if client.FaceCapture(b) != vo.StatusSuccess { cacheItem, _ := json.Marshal(msg) - c.CacheData(cacheItem, "1400") + c.CacheData(cacheItem, "1400-face") + logger.Warn("The data forwarding failed, adding to local cache.") + } + + return +} + +func (c CaptureRepository) VIIDPersonMsgForward(msg *vo.RequestPersonList) { + person := msg.PersonListObject.PersonObject[0] + // 鍖归厤妤煎眰 + appearTime, err := time.ParseInLocation("20060102150405", person.PersonAppearTime, time.Local) + if err != nil { + logger.Warn("Parse face appear time error, %s", err.Error()) + appearTime = time.Now() + } + + var devPos models.Positions + _ = devPos.FindPositionByTime(appearTime.Unix() + 5) // 鍔�5绉掔數姊叧闂ㄧ殑鏃堕棿 + if devPos.Pos == "" { + devPos.Pos = "1F" + } + + for idx, v := range msg.PersonListObject.PersonObject { + msg.PersonListObject.PersonObject[idx].BehaviorDescription = devPos.Pos + if config.ClientConf.AddFloorToFaceId { + msg.PersonListObject.PersonObject[idx].PersonID = pkg.GenerateFaceIdContainFloor(v.PersonID, devPos.Pos) + } + } + + b, _ := json.Marshal(msg) + if client.PersonCapture(b) != vo.StatusSuccess { + cacheItem, _ := json.Marshal(msg) + c.CacheData(cacheItem, "1400-person") logger.Warn("The data forwarding failed, adding to local cache.") } diff --git a/repository/subscribeRepo.go b/repository/subscribeRepo.go index 7652195..5f3d256 100644 --- a/repository/subscribeRepo.go +++ b/repository/subscribeRepo.go @@ -148,7 +148,7 @@ return err } - service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) + service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub) return err } @@ -163,7 +163,7 @@ sub.Status = subscribe.SubscribeStatus sub.Ext = *subscribe - service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) + service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub) return sub.Save() } @@ -175,7 +175,7 @@ return err } - service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil) + service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil) return err } diff --git a/service/notification.go b/service/notification.go new file mode 100644 index 0000000..1605025 --- /dev/null +++ b/service/notification.go @@ -0,0 +1,160 @@ +package service + +import ( + "context" + "encoding/json" + "gat1400Exchange/pkg/logger" + "strings" + "sync" + "time" + + "gat1400Exchange/client" + "gat1400Exchange/models" + "gat1400Exchange/pkg/snowflake" + "gat1400Exchange/vo" +) + +type NotificationTask struct { + ctx context.Context + conf *models.Subscribe + faceList []*vo.FaceObject + personList []*vo.PersonObject + mutex sync.Mutex + lastApeExecTime int64 +} + +func (task *NotificationTask) start() { + TaskWaitGroup.Add(1) + Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) + + for { + select { + case <-task.ctx.Done(): + TaskProcMap.Delete(task.conf.Id) + TaskWaitGroup.Done() + logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) + return + case <-Timer.C: + task.notify() + } + } +} + +func (task *NotificationTask) addFace(face *vo.FaceObject) { + if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { + return + } + + task.mutex.Lock() + defer task.mutex.Unlock() + + task.faceList = append(task.faceList, face) +} + +func (task *NotificationTask) addPerson(person *vo.PersonObject) { + if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) { + return + } + + task.mutex.Lock() + defer task.mutex.Unlock() + + task.personList = append(task.personList, person) +} + +func (task *NotificationTask) notify() { + subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") + for _, subType := range subDetails { + var msg *vo.RequestSubscribeNotification + switch subType { + case vo.SubscribeApe: + msg = task.packDeviceNotificationMsg() + case vo.SubscribeFace: + msg = task.packFaceNotificationMsg() + case vo.SubscribePerson: + } + + if msg != nil { + b, _ := json.Marshal(msg) + client.Notify(task.conf.Ext.ReceiveAddr, b) + } + } +} + +func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification { + triggerTime := time.Now().Format("20060102150405") + if len(task.faceList) == 0 { + return nil + } + + var notification = vo.FaceNotification{ + NotificationID: triggerTime + snowflake.GenerateIdStr(), + SubscribeID: task.conf.Id, + Title: task.conf.Ext.Title, + TriggerTime: triggerTime, + ExecuteOperation: 1, + } + + var ids []string + for idx, _ := range task.faceList { + // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈� + task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime + task.faceList[idx].EntryTime = triggerTime + for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { + task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 + } + + ids = append(ids, task.faceList[idx].FaceID) + notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) + } + + notification.InfoIDs = strings.Join(ids, ";") + + task.mutex.Lock() + task.faceList = []*vo.FaceObject{} + task.mutex.Unlock() + + var req vo.RequestSubscribeNotification + req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification + + return &req +} + +func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification { + // 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�, + if time.Now().Unix()-task.lastApeExecTime < 60*10 { + return nil + } + + triggerTime := time.Now().Format("20060102150405") + + task.lastApeExecTime = time.Now().Unix() + + var notification = vo.DeviceNotification{ + NotificationID: triggerTime + snowflake.GenerateIdStr(), + SubscribeID: task.conf.Id, + Title: task.conf.Ext.Title, + TriggerTime: triggerTime, + ExecuteOperation: 1, + } + + var ids []string + var apeMod models.Ape + apeList, err := apeMod.FindAll() + if err != nil { + logger.Warn(err.Error()) + return nil + } else { + for idx, _ := range apeList { + ids = append(ids, apeList[idx].Id) + notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) + } + } + + notification.InfoIDs = strings.Join(ids, ";") + + var req vo.RequestSubscribeNotification + req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification + + return &req +} diff --git a/service/resend.go b/service/resend.go index ade60a8..667b237 100644 --- a/service/resend.go +++ b/service/resend.go @@ -29,12 +29,18 @@ logger.Error(err.Error()) return } - if c.Type == "1400" { + if c.Type == "1400-face" { if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess { c.UpdateRetryCount() logger.Warn("The data resend failed. retry count %d", c.Retry+1) return } + } else if c.Type == "1400-person" { + if client.PersonCapture([]byte(c.Data)) != vo.StatusSuccess { + c.UpdateRetryCount() + logger.Warn("The data resend failed. retry count %d", c.Retry+1) + return + } } else { if !util.SendData([]byte(c.Data), config.ForwardConf.SyncServer) { c.UpdateRetryCount() diff --git a/service/subscribe.go b/service/subscribe.go index 70853cf..cbdeb97 100644 --- a/service/subscribe.go +++ b/service/subscribe.go @@ -2,16 +2,11 @@ import ( "context" - "encoding/json" - "gat1400Exchange/config" - "strings" "sync" - "time" - "gat1400Exchange/client" + "gat1400Exchange/config" "gat1400Exchange/models" "gat1400Exchange/pkg/logger" - "gat1400Exchange/pkg/snowflake" "gat1400Exchange/vo" ) @@ -20,68 +15,76 @@ type TaskProcInfo struct { cancel context.CancelFunc - task *SubscribeTask + task *NotificationTask } -func (t *TaskProcInfo) stop() { +func (t TaskProcInfo) stop() { t.cancel() t.task = nil } -func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) { - logger.Debug("Receive sub msg: %s ", subId) +func InitSubscribeNotificationTasks() { + var s models.Subscribe + subList, err := s.FindByFromId(config.ClientConf.ServerId) + if err != nil { + logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error()) + return + } + + for idx := range subList { + if subList[idx].Status != 0 { + continue + } + + CreateNotificationTask(&subList[idx]) + } + + return +} + +func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) { + logger.Debug("Receive update notify task msg: %s %d", subId, msgType) proc, isExist := TaskProcMap.Load(subId) switch msgType { case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe: if isExist { - logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�") - proc.(TaskProcInfo).cancel() + logger.Debug("Update notify task, restart after exit") + proc.(TaskProcInfo).stop() } - CreateTask(conf) + CreateNotificationTask(conf) case vo.Msg_Type_Delete_Subscribe: if !isExist { return } // 鍏抽棴浠诲姟, 骞跺垹闄� - proc.(TaskProcInfo).cancel() + proc.(TaskProcInfo).stop() default: - logger.Warn("鏈煡鐨勬秷鎭被鍨�") + logger.Warn("Invalid msg type %d", msgType) } } -func InitSubscribeTask() { - var s models.Subscribe - subList, err := s.FindByFromId(config.ClientConf.ServerId) - if err != nil { - logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err) - return +func CreateNotificationTask(conf *models.Subscribe) { + logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title) + + ctx, cancel := context.WithCancel(context.Background()) + task := &NotificationTask{ + ctx: ctx, + conf: conf, } - for idx, _ := range subList { - if subList[idx].Status != 0 { - continue - } - - CreateTask(&subList[idx]) - } - - return -} - -func AddFaceNotification(face *vo.FaceObject) { - TaskProcMap.Range(func(key, value interface{}) bool { - value.(TaskProcInfo).task.AddFace(face) - return true + TaskProcMap.Store(conf.Id, TaskProcInfo{ + cancel: cancel, + task: task, }) - logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature) + go task.start() } -func StopSubscribeTask() { +func StopNotificationTasks() { TaskProcMap.Range(func(key, value interface{}) bool { value.(TaskProcInfo).cancel() return true @@ -91,134 +94,20 @@ TaskWaitGroup.Wait() } -func CreateTask(conf *models.Subscribe) { - logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id) - - ctx, cancel := context.WithCancel(context.Background()) - newTask := &SubscribeTask{ - ctx: ctx, - conf: conf, - } - - TaskProcMap.Store(conf.Id, TaskProcInfo{ - cancel: cancel, - task: newTask, +func AddFaceNotification(face *vo.FaceObject) { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).task.addFace(face) + return true }) - go newTask.Start() + logger.Debug("Add Face Notification. faceId: %s", face.FaceID) } -type SubscribeTask struct { - ctx context.Context - conf *models.Subscribe - faceList []*vo.FaceObject - mutex sync.Mutex - lastApeExecTime int64 -} +func AddPersonNotification(person *vo.PersonObject) { + TaskProcMap.Range(func(key, value interface{}) bool { + value.(TaskProcInfo).task.addPerson(person) + return true + }) -func (task *SubscribeTask) Start() { - TaskWaitGroup.Add(1) - Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second) - - for { - select { - case <-task.ctx.Done(): - TaskProcMap.Delete(task.conf.Id) - TaskWaitGroup.Done() - logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id) - return - case <-Timer.C: - task.Notify() - } - } -} - -func (task *SubscribeTask) AddFace(face *vo.FaceObject) { - if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) { - return - } - - task.mutex.Lock() - defer task.mutex.Unlock() - - task.faceList = append(task.faceList, face) -} - -func (task *SubscribeTask) Notify() { - subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",") - for _, subType := range subDetails { - triggerTime := time.Now().Format("20060102150405") - - // 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�, - if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 { - task.lastApeExecTime = time.Now().Unix() - - var notification = vo.DeviceNotification{ - NotificationID: triggerTime + snowflake.GenerateIdStr(), - SubscribeID: task.conf.Id, - Title: task.conf.Ext.Title, - TriggerTime: triggerTime, - ExecuteOperation: 1, - } - - var ids []string - var apeMod models.Ape - apeList, err := apeMod.FindAll() - if err != nil { - logger.Warn(err.Error()) - continue - } else { - for idx, _ := range apeList { - ids = append(ids, apeList[idx].Id) - notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext) - } - } - - notification.InfoIDs = strings.Join(ids, ";") - - var req vo.RequestSubscribeNotification - req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) - b, _ := json.Marshal(req) - client.Notify(task.conf.Ext.ReceiveAddr, b) - continue - } - - // 涓婃姤浜鸿劯 - if subType == vo.SubscribeFace { - if len(task.faceList) == 0 { - return - } - - var notification = vo.FaceNotification{ - NotificationID: triggerTime + snowflake.GenerateIdStr(), - SubscribeID: task.conf.Id, - Title: task.conf.Ext.Title, - TriggerTime: triggerTime, - ExecuteOperation: 1, - } - var ids []string - for idx, _ := range task.faceList { - // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈� - task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime - task.faceList[idx].EntryTime = triggerTime - for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject { - task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2 - } - - ids = append(ids, task.faceList[idx].FaceID) - notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx]) - } - - notification.InfoIDs = strings.Join(ids, ";") - - task.mutex.Lock() - task.faceList = []*vo.FaceObject{} - task.mutex.Unlock() - - var req vo.RequestSubscribeNotification - req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification) - b, _ := json.Marshal(req) - client.Notify(task.conf.Ext.ReceiveAddr, b) - } - } + logger.Debug("Add Person Notification. personId: %s", person.PersonID) } -- Gitblit v1.8.0