From 332fc6ad5edca596ecd23876aa9db7452b45f804 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 29 五月 2024 02:58:21 +0800
Subject: [PATCH] 添加人员抓拍处理
---
controller/captureCtl.go | 58 +++--
service/subscribe.go | 215 ++++--------------
service/resend.go | 8
client/faces.go | 27 ++
repository/captureRepo.go | 121 ++++++++++
client/notify.go | 7
main.go | 4
service/notification.go | 160 ++++++++++++++
repository/subscribeRepo.go | 6
9 files changed, 407 insertions(+), 199 deletions(-)
diff --git a/client/faces.go b/client/faces.go
index c4f1f17..42e3827 100644
--- a/client/faces.go
+++ b/client/faces.go
@@ -11,7 +11,8 @@
)
const (
- FacesUrI = "/VIID/Faces"
+ FacesUrI = "/VIID/Faces"
+ PersonsUrI = "/VIID/Persons"
)
func FaceCapture(msg []byte) int {
@@ -36,3 +37,27 @@
logger.Debug("Post faces success.")
return stat.StatusCode
}
+
+func PersonCapture(msg []byte) int {
+ if clientStatus != vo.StatusSuccess {
+ return clientStatus
+ }
+
+ url := fmt.Sprintf("%s://%s:%s%s", config.ClientConf.Proto, config.ClientConf.ServerAddr, config.ClientConf.ServerPort, PersonsUrI)
+ rsp, err := util.HttpPost(url, headers, msg)
+ if err != nil {
+ logger.Warn("Post person failed, %s", err.Error())
+ return vo.StatusOtherError
+ }
+
+ var stat vo.ResponseStatus
+ err = json.Unmarshal(rsp, &stat)
+ if err != nil {
+ logger.Warn("Post person response unmarshal failed, %s", err.Error())
+ return vo.StatusOtherError
+ }
+
+ logger.Debug("Post person success.")
+
+ return stat.StatusCode
+}
diff --git a/client/notify.go b/client/notify.go
index e1a9336..66e9584 100644
--- a/client/notify.go
+++ b/client/notify.go
@@ -49,7 +49,7 @@
return vo.StatusOtherError
}
- logger.Debug("Post notification success.")
+ logger.Debug("Post subscribe success.")
return vo.StatusSuccess
}
@@ -69,7 +69,7 @@
return vo.StatusOtherError
}
- logger.Debug("Post notification success.")
+ logger.Debug("put subscribe success.")
return vo.StatusSuccess
}
@@ -77,8 +77,7 @@
func GetSubscribes(url string) ([]byte, error) {
rsp, err := util.HttpGet(url, headers)
if err != nil {
- logger.Warn("Put subscribe failed, %s", err.Error())
-
+ logger.Warn("Get subscribe failed, %s", err.Error())
}
return rsp, err
diff --git a/controller/captureCtl.go b/controller/captureCtl.go
index a6fc5cb..8f3d784 100644
--- a/controller/captureCtl.go
+++ b/controller/captureCtl.go
@@ -2,14 +2,17 @@
import (
"encoding/base64"
- "gat1400Exchange/config"
- "gat1400Exchange/pkg"
- "gat1400Exchange/service"
+ "fmt"
+ "math/rand"
"net/http"
+ "strconv"
"time"
+ "gat1400Exchange/config"
+ "gat1400Exchange/pkg"
"gat1400Exchange/pkg/logger"
"gat1400Exchange/repository"
+ "gat1400Exchange/service"
"gat1400Exchange/vo"
"github.com/gin-gonic/gin"
@@ -45,7 +48,7 @@
// 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒
if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
- go a.Repository.VIIDMsgForward(&req)
+ go a.Repository.VIIDFaceMsgForward(&req)
} else if config.ServeConf.Role == "cascade" {
go service.AddFaceNotification(&face)
}
@@ -80,15 +83,17 @@
videoLabel := req.VideoLabelListObject.VideoLabelObject[0]
logger.Debug("Receive new message, Id:%s Ip:%s ", videoLabel.VideoLabelID, c.RemoteIP())
- // 杞汉鑴告秷鎭�
- var face vo.FaceObject
- face.FaceID = videoLabel.VideoLabelID
- face.InfoKind = 1
- face.SourceID = videoLabel.VideoImageID
- face.DeviceID = videoLabel.IVADeviceID
- face.LocationMarkTime = videoLabel.CreateTimeAbs
- face.FaceAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
- face.FaceDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
+ // 杞汉鍛樻秷鎭�
+ var person vo.PersonObject
+ person.PersonID = videoLabel.VideoLabelID[0:41] + "01" + videoLabel.VideoLabelID[43:48]
+ person.InfoKind = "1"
+ person.DeviceID = videoLabel.IVADeviceID
+ person.SourceID = videoLabel.VideoImageID
+ person.LocationMarkTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
+ person.PersonAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
+ person.PersonDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
+ person.IsDriver = 2
+ person.IsCriminalInvolved = 2
var hasTargetImage bool
var bgImageWith, bgImageHeight int
@@ -104,8 +109,8 @@
bgImage = &videoLabel.SubImageList.SubImageInfoObject[idx]
}
- face.SubImageList.SubImageInfoObject = append(
- face.SubImageList.SubImageInfoObject,
+ person.SubImageList.SubImageInfoObject = append(
+ person.SubImageList.SubImageInfoObject,
videoLabel.SubImageList.SubImageInfoObject[idx],
)
}
@@ -129,8 +134,15 @@
return
}
+ imageId := bgImage.ImageID[37:41]
+ imageNumber, err := strconv.Atoi(imageId)
+ if err != nil {
+ imageNumber = rand.Intn(90000) + 10000
+ } else {
+ imageNumber++
+ }
var subImageInfo = vo.SubImageInfoObject{
- ImageID: bgImage.ImageID + "1",
+ ImageID: fmt.Sprintf("%s%d", bgImage.ImageID[0:37], imageNumber),
EventSort: 10,
DeviceID: bgImage.DeviceID,
StoragePath: "",
@@ -141,23 +153,23 @@
Height: subRect.Top - subRect.Bottom,
Data: base64.StdEncoding.EncodeToString(subImage),
}
- face.SubImageList.SubImageInfoObject = append(
- face.SubImageList.SubImageInfoObject,
+ person.SubImageList.SubImageInfoObject = append(
+ person.SubImageList.SubImageInfoObject,
subImageInfo,
)
}
// 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒
if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
- var faceObjList vo.RequestFaceList
- faceObjList.FaceListObject.FaceObject = append(faceObjList.FaceListObject.FaceObject, face)
- go a.Repository.VIIDMsgForward(&faceObjList)
+ var personList vo.RequestPersonList
+ personList.PersonListObject.PersonObject = append(personList.PersonListObject.PersonObject, person)
+ go a.Repository.VIIDPersonMsgForward(&personList)
} else if config.ServeConf.Role == "cascade" {
- go service.AddFaceNotification(&face)
+ go service.AddPersonNotification(&person)
}
if config.ForwardConf.SyncServer != "" {
- go a.Repository.FaceForward([]vo.FaceObject{face})
+ go a.Repository.PersonForward([]vo.PersonObject{person})
}
rspMsg := vo.ResponseStatus{
diff --git a/main.go b/main.go
index d055edf..8130d67 100644
--- a/main.go
+++ b/main.go
@@ -44,7 +44,7 @@
// 鍚姩缃戠粶瑙嗛瀛楃鍙犲姞鍣ㄦ湇鍔�
go service.StartNVCSServer()
- go service.InitSubscribeTask()
+ go service.InitSubscribeNotificationTasks()
// 鍚姩瀹氭椂浠诲姟
cron.Init()
@@ -77,7 +77,7 @@
logger.Error("Server forced to shutdown:", err)
}
- service.StopSubscribeTask()
+ service.StopNotificationTasks()
logger.Info("Server exiting!")
}
diff --git a/repository/captureRepo.go b/repository/captureRepo.go
index d0702b5..8572735 100644
--- a/repository/captureRepo.go
+++ b/repository/captureRepo.go
@@ -121,6 +121,91 @@
return
}
+func (c CaptureRepository) PersonForward(personList []vo.PersonObject) {
+ var err error
+
+ if personList == nil || len(personList) == 0 {
+ logger.Warn("FaceList is nil")
+ return
+ }
+
+ for _, v := range personList {
+ if v.SubImageList.SubImageInfoObject == nil {
+ logger.Warn("SubImageInfoObject is nil")
+ continue
+ }
+ var deviceId = v.DeviceID
+ var targetId = v.PersonID
+ var bgImageStr string
+ var bgImageBytes, faceImageBytes []byte = nil, nil
+
+ // 鑾峰彇澶у浘, 鐩墠娴峰悍鐨勫皬鍥惧垎杈ㄧ巼澶綆
+ for _, image := range v.SubImageList.SubImageInfoObject {
+ if image.Type != "14" {
+ continue
+ }
+
+ if len(image.Data) > 0 {
+ if len(image.Data) > len(bgImageStr) {
+ bgImageStr = image.Data
+ }
+ } else if image.StoragePath != "" {
+ imgData, err := util.ImageDownload(image.StoragePath, nil)
+ if err != nil {
+ logger.Warn("Image download failure, %s", err.Error())
+ } else {
+ bgImageBytes = imgData
+ bgImageStr = "///"
+ }
+ }
+ }
+
+ if bgImageBytes == nil {
+ bgImageBytes, err = base64.StdEncoding.DecodeString(bgImageStr)
+ if err != nil {
+ logger.Warn("Decode Image Base64 String failure, %s", err.Error())
+ continue
+ }
+ }
+
+ // 杞彂鍥惧儚
+ logger.Debug("Prepare forward person image, deviceId:%s, image len:%d, server:%s", deviceId, len(bgImageBytes), config.ForwardConf.SyncServer)
+ if deviceId != "" && bgImageStr != "" && config.ForwardConf.SyncServer != "" {
+ pd := c.PackPushDataV2(deviceId, targetId, v.PersonAppearTime, bgImageBytes, faceImageBytes)
+ if pd == nil {
+ return
+ }
+
+ // 澶勭悊姊帶濉啓鐨勬ゼ灞備俊鎭� 鏆傛椂浣跨敤oherFeature瀛楁
+ if v.BehaviorDescription != "" {
+ pd.CameraFloor = v.BehaviorDescription
+ }
+
+ // 灏濊瘯浠巉aceId鎻愬彇妤煎眰
+ if pd.CameraFloor == "" && config.ClientConf.AddFloorToFaceId {
+ pd.CameraFloor, _ = pkg.ParseFloorFromId(v.PersonID)
+ }
+ //logger.Debug("device %s, CameraFloor:%s", deviceId, pd.CameraFloor)
+
+ payload, err := json.Marshal(pd)
+ if err != nil {
+ logger.Warn("Marshal error, %s", err.Error())
+ return
+ }
+
+ if !util.SendData(payload, config.ForwardConf.SyncServer) {
+ cacheItem, _ := json.Marshal(pd)
+ c.CacheData(cacheItem, "basic")
+ logger.Warn("The data forwarding failed, adding to local cache.")
+ } else {
+ logger.Debug("The data forwarding successful. deviceId:%s", deviceId)
+ }
+ }
+ }
+
+ return
+}
+
func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 {
var pd = new(vo.PushDataInfoV2)
var floor string
@@ -165,7 +250,7 @@
cacheItem.Save()
}
-func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) {
+func (c CaptureRepository) VIIDFaceMsgForward(msg *vo.RequestFaceList) {
faceInfo := msg.FaceListObject.FaceObject[0]
// 鍖归厤妤煎眰
faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local)
@@ -190,7 +275,39 @@
b, _ := json.Marshal(msg)
if client.FaceCapture(b) != vo.StatusSuccess {
cacheItem, _ := json.Marshal(msg)
- c.CacheData(cacheItem, "1400")
+ c.CacheData(cacheItem, "1400-face")
+ logger.Warn("The data forwarding failed, adding to local cache.")
+ }
+
+ return
+}
+
+func (c CaptureRepository) VIIDPersonMsgForward(msg *vo.RequestPersonList) {
+ person := msg.PersonListObject.PersonObject[0]
+ // 鍖归厤妤煎眰
+ appearTime, err := time.ParseInLocation("20060102150405", person.PersonAppearTime, time.Local)
+ if err != nil {
+ logger.Warn("Parse face appear time error, %s", err.Error())
+ appearTime = time.Now()
+ }
+
+ var devPos models.Positions
+ _ = devPos.FindPositionByTime(appearTime.Unix() + 5) // 鍔�5绉掔數姊叧闂ㄧ殑鏃堕棿
+ if devPos.Pos == "" {
+ devPos.Pos = "1F"
+ }
+
+ for idx, v := range msg.PersonListObject.PersonObject {
+ msg.PersonListObject.PersonObject[idx].BehaviorDescription = devPos.Pos
+ if config.ClientConf.AddFloorToFaceId {
+ msg.PersonListObject.PersonObject[idx].PersonID = pkg.GenerateFaceIdContainFloor(v.PersonID, devPos.Pos)
+ }
+ }
+
+ b, _ := json.Marshal(msg)
+ if client.PersonCapture(b) != vo.StatusSuccess {
+ cacheItem, _ := json.Marshal(msg)
+ c.CacheData(cacheItem, "1400-person")
logger.Warn("The data forwarding failed, adding to local cache.")
}
diff --git a/repository/subscribeRepo.go b/repository/subscribeRepo.go
index 7652195..5f3d256 100644
--- a/repository/subscribeRepo.go
+++ b/repository/subscribeRepo.go
@@ -148,7 +148,7 @@
return err
}
- service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
+ service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
return err
}
@@ -163,7 +163,7 @@
sub.Status = subscribe.SubscribeStatus
sub.Ext = *subscribe
- service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
+ service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
return sub.Save()
}
@@ -175,7 +175,7 @@
return err
}
- service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil)
+ service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil)
return err
}
diff --git a/service/notification.go b/service/notification.go
new file mode 100644
index 0000000..1605025
--- /dev/null
+++ b/service/notification.go
@@ -0,0 +1,160 @@
+package service
+
+import (
+ "context"
+ "encoding/json"
+ "gat1400Exchange/pkg/logger"
+ "strings"
+ "sync"
+ "time"
+
+ "gat1400Exchange/client"
+ "gat1400Exchange/models"
+ "gat1400Exchange/pkg/snowflake"
+ "gat1400Exchange/vo"
+)
+
+type NotificationTask struct {
+ ctx context.Context
+ conf *models.Subscribe
+ faceList []*vo.FaceObject
+ personList []*vo.PersonObject
+ mutex sync.Mutex
+ lastApeExecTime int64
+}
+
+func (task *NotificationTask) start() {
+ TaskWaitGroup.Add(1)
+ Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
+
+ for {
+ select {
+ case <-task.ctx.Done():
+ TaskProcMap.Delete(task.conf.Id)
+ TaskWaitGroup.Done()
+ logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
+ return
+ case <-Timer.C:
+ task.notify()
+ }
+ }
+}
+
+func (task *NotificationTask) addFace(face *vo.FaceObject) {
+ if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
+ return
+ }
+
+ task.mutex.Lock()
+ defer task.mutex.Unlock()
+
+ task.faceList = append(task.faceList, face)
+}
+
+func (task *NotificationTask) addPerson(person *vo.PersonObject) {
+ if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) {
+ return
+ }
+
+ task.mutex.Lock()
+ defer task.mutex.Unlock()
+
+ task.personList = append(task.personList, person)
+}
+
+func (task *NotificationTask) notify() {
+ subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
+ for _, subType := range subDetails {
+ var msg *vo.RequestSubscribeNotification
+ switch subType {
+ case vo.SubscribeApe:
+ msg = task.packDeviceNotificationMsg()
+ case vo.SubscribeFace:
+ msg = task.packFaceNotificationMsg()
+ case vo.SubscribePerson:
+ }
+
+ if msg != nil {
+ b, _ := json.Marshal(msg)
+ client.Notify(task.conf.Ext.ReceiveAddr, b)
+ }
+ }
+}
+
+func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification {
+ triggerTime := time.Now().Format("20060102150405")
+ if len(task.faceList) == 0 {
+ return nil
+ }
+
+ var notification = vo.FaceNotification{
+ NotificationID: triggerTime + snowflake.GenerateIdStr(),
+ SubscribeID: task.conf.Id,
+ Title: task.conf.Ext.Title,
+ TriggerTime: triggerTime,
+ ExecuteOperation: 1,
+ }
+
+ var ids []string
+ for idx, _ := range task.faceList {
+ // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
+ task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
+ task.faceList[idx].EntryTime = triggerTime
+ for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
+ task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
+ }
+
+ ids = append(ids, task.faceList[idx].FaceID)
+ notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
+ }
+
+ notification.InfoIDs = strings.Join(ids, ";")
+
+ task.mutex.Lock()
+ task.faceList = []*vo.FaceObject{}
+ task.mutex.Unlock()
+
+ var req vo.RequestSubscribeNotification
+ req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
+
+ return &req
+}
+
+func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification {
+ // 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�,
+ if time.Now().Unix()-task.lastApeExecTime < 60*10 {
+ return nil
+ }
+
+ triggerTime := time.Now().Format("20060102150405")
+
+ task.lastApeExecTime = time.Now().Unix()
+
+ var notification = vo.DeviceNotification{
+ NotificationID: triggerTime + snowflake.GenerateIdStr(),
+ SubscribeID: task.conf.Id,
+ Title: task.conf.Ext.Title,
+ TriggerTime: triggerTime,
+ ExecuteOperation: 1,
+ }
+
+ var ids []string
+ var apeMod models.Ape
+ apeList, err := apeMod.FindAll()
+ if err != nil {
+ logger.Warn(err.Error())
+ return nil
+ } else {
+ for idx, _ := range apeList {
+ ids = append(ids, apeList[idx].Id)
+ notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
+ }
+ }
+
+ notification.InfoIDs = strings.Join(ids, ";")
+
+ var req vo.RequestSubscribeNotification
+ req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
+
+ return &req
+}
diff --git a/service/resend.go b/service/resend.go
index ade60a8..667b237 100644
--- a/service/resend.go
+++ b/service/resend.go
@@ -29,12 +29,18 @@
logger.Error(err.Error())
return
}
- if c.Type == "1400" {
+ if c.Type == "1400-face" {
if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess {
c.UpdateRetryCount()
logger.Warn("The data resend failed. retry count %d", c.Retry+1)
return
}
+ } else if c.Type == "1400-person" {
+ if client.PersonCapture([]byte(c.Data)) != vo.StatusSuccess {
+ c.UpdateRetryCount()
+ logger.Warn("The data resend failed. retry count %d", c.Retry+1)
+ return
+ }
} else {
if !util.SendData([]byte(c.Data), config.ForwardConf.SyncServer) {
c.UpdateRetryCount()
diff --git a/service/subscribe.go b/service/subscribe.go
index 70853cf..cbdeb97 100644
--- a/service/subscribe.go
+++ b/service/subscribe.go
@@ -2,16 +2,11 @@
import (
"context"
- "encoding/json"
- "gat1400Exchange/config"
- "strings"
"sync"
- "time"
- "gat1400Exchange/client"
+ "gat1400Exchange/config"
"gat1400Exchange/models"
"gat1400Exchange/pkg/logger"
- "gat1400Exchange/pkg/snowflake"
"gat1400Exchange/vo"
)
@@ -20,68 +15,76 @@
type TaskProcInfo struct {
cancel context.CancelFunc
- task *SubscribeTask
+ task *NotificationTask
}
-func (t *TaskProcInfo) stop() {
+func (t TaskProcInfo) stop() {
t.cancel()
t.task = nil
}
-func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
- logger.Debug("Receive sub msg: %s ", subId)
+func InitSubscribeNotificationTasks() {
+ var s models.Subscribe
+ subList, err := s.FindByFromId(config.ClientConf.ServerId)
+ if err != nil {
+ logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
+ return
+ }
+
+ for idx := range subList {
+ if subList[idx].Status != 0 {
+ continue
+ }
+
+ CreateNotificationTask(&subList[idx])
+ }
+
+ return
+}
+
+func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
+ logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
proc, isExist := TaskProcMap.Load(subId)
switch msgType {
case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
if isExist {
- logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�")
- proc.(TaskProcInfo).cancel()
+ logger.Debug("Update notify task, restart after exit")
+ proc.(TaskProcInfo).stop()
}
- CreateTask(conf)
+ CreateNotificationTask(conf)
case vo.Msg_Type_Delete_Subscribe:
if !isExist {
return
}
// 鍏抽棴浠诲姟, 骞跺垹闄�
- proc.(TaskProcInfo).cancel()
+ proc.(TaskProcInfo).stop()
default:
- logger.Warn("鏈煡鐨勬秷鎭被鍨�")
+ logger.Warn("Invalid msg type %d", msgType)
}
}
-func InitSubscribeTask() {
- var s models.Subscribe
- subList, err := s.FindByFromId(config.ClientConf.ServerId)
- if err != nil {
- logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
- return
+func CreateNotificationTask(conf *models.Subscribe) {
+ logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ task := &NotificationTask{
+ ctx: ctx,
+ conf: conf,
}
- for idx, _ := range subList {
- if subList[idx].Status != 0 {
- continue
- }
-
- CreateTask(&subList[idx])
- }
-
- return
-}
-
-func AddFaceNotification(face *vo.FaceObject) {
- TaskProcMap.Range(func(key, value interface{}) bool {
- value.(TaskProcInfo).task.AddFace(face)
- return true
+ TaskProcMap.Store(conf.Id, TaskProcInfo{
+ cancel: cancel,
+ task: task,
})
- logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature)
+ go task.start()
}
-func StopSubscribeTask() {
+func StopNotificationTasks() {
TaskProcMap.Range(func(key, value interface{}) bool {
value.(TaskProcInfo).cancel()
return true
@@ -91,134 +94,20 @@
TaskWaitGroup.Wait()
}
-func CreateTask(conf *models.Subscribe) {
- logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id)
-
- ctx, cancel := context.WithCancel(context.Background())
- newTask := &SubscribeTask{
- ctx: ctx,
- conf: conf,
- }
-
- TaskProcMap.Store(conf.Id, TaskProcInfo{
- cancel: cancel,
- task: newTask,
+func AddFaceNotification(face *vo.FaceObject) {
+ TaskProcMap.Range(func(key, value interface{}) bool {
+ value.(TaskProcInfo).task.addFace(face)
+ return true
})
- go newTask.Start()
+ logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
}
-type SubscribeTask struct {
- ctx context.Context
- conf *models.Subscribe
- faceList []*vo.FaceObject
- mutex sync.Mutex
- lastApeExecTime int64
-}
+func AddPersonNotification(person *vo.PersonObject) {
+ TaskProcMap.Range(func(key, value interface{}) bool {
+ value.(TaskProcInfo).task.addPerson(person)
+ return true
+ })
-func (task *SubscribeTask) Start() {
- TaskWaitGroup.Add(1)
- Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
-
- for {
- select {
- case <-task.ctx.Done():
- TaskProcMap.Delete(task.conf.Id)
- TaskWaitGroup.Done()
- logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
- return
- case <-Timer.C:
- task.Notify()
- }
- }
-}
-
-func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
- if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
- return
- }
-
- task.mutex.Lock()
- defer task.mutex.Unlock()
-
- task.faceList = append(task.faceList, face)
-}
-
-func (task *SubscribeTask) Notify() {
- subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
- for _, subType := range subDetails {
- triggerTime := time.Now().Format("20060102150405")
-
- // 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�,
- if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 {
- task.lastApeExecTime = time.Now().Unix()
-
- var notification = vo.DeviceNotification{
- NotificationID: triggerTime + snowflake.GenerateIdStr(),
- SubscribeID: task.conf.Id,
- Title: task.conf.Ext.Title,
- TriggerTime: triggerTime,
- ExecuteOperation: 1,
- }
-
- var ids []string
- var apeMod models.Ape
- apeList, err := apeMod.FindAll()
- if err != nil {
- logger.Warn(err.Error())
- continue
- } else {
- for idx, _ := range apeList {
- ids = append(ids, apeList[idx].Id)
- notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
- }
- }
-
- notification.InfoIDs = strings.Join(ids, ";")
-
- var req vo.RequestSubscribeNotification
- req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
- b, _ := json.Marshal(req)
- client.Notify(task.conf.Ext.ReceiveAddr, b)
- continue
- }
-
- // 涓婃姤浜鸿劯
- if subType == vo.SubscribeFace {
- if len(task.faceList) == 0 {
- return
- }
-
- var notification = vo.FaceNotification{
- NotificationID: triggerTime + snowflake.GenerateIdStr(),
- SubscribeID: task.conf.Id,
- Title: task.conf.Ext.Title,
- TriggerTime: triggerTime,
- ExecuteOperation: 1,
- }
- var ids []string
- for idx, _ := range task.faceList {
- // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
- task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
- task.faceList[idx].EntryTime = triggerTime
- for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
- task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
- }
-
- ids = append(ids, task.faceList[idx].FaceID)
- notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
- }
-
- notification.InfoIDs = strings.Join(ids, ";")
-
- task.mutex.Lock()
- task.faceList = []*vo.FaceObject{}
- task.mutex.Unlock()
-
- var req vo.RequestSubscribeNotification
- req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
- b, _ := json.Marshal(req)
- client.Notify(task.conf.Ext.ReceiveAddr, b)
- }
- }
+ logger.Debug("Add Person Notification. personId: %s", person.PersonID)
}
--
Gitblit v1.8.0