From 332fc6ad5edca596ecd23876aa9db7452b45f804 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 29 五月 2024 02:58:21 +0800
Subject: [PATCH] 添加人员抓拍处理

---
 controller/captureCtl.go    |   58 +++--
 service/subscribe.go        |  215 ++++--------------
 service/resend.go           |    8 
 client/faces.go             |   27 ++
 repository/captureRepo.go   |  121 ++++++++++
 client/notify.go            |    7 
 main.go                     |    4 
 service/notification.go     |  160 ++++++++++++++
 repository/subscribeRepo.go |    6 
 9 files changed, 407 insertions(+), 199 deletions(-)

diff --git a/client/faces.go b/client/faces.go
index c4f1f17..42e3827 100644
--- a/client/faces.go
+++ b/client/faces.go
@@ -11,7 +11,8 @@
 )
 
 const (
-	FacesUrI = "/VIID/Faces"
+	FacesUrI   = "/VIID/Faces"
+	PersonsUrI = "/VIID/Persons"
 )
 
 func FaceCapture(msg []byte) int {
@@ -36,3 +37,27 @@
 	logger.Debug("Post faces success.")
 	return stat.StatusCode
 }
+
+func PersonCapture(msg []byte) int {
+	if clientStatus != vo.StatusSuccess {
+		return clientStatus
+	}
+
+	url := fmt.Sprintf("%s://%s:%s%s", config.ClientConf.Proto, config.ClientConf.ServerAddr, config.ClientConf.ServerPort, PersonsUrI)
+	rsp, err := util.HttpPost(url, headers, msg)
+	if err != nil {
+		logger.Warn("Post person failed, %s", err.Error())
+		return vo.StatusOtherError
+	}
+
+	var stat vo.ResponseStatus
+	err = json.Unmarshal(rsp, &stat)
+	if err != nil {
+		logger.Warn("Post person response unmarshal failed, %s", err.Error())
+		return vo.StatusOtherError
+	}
+
+	logger.Debug("Post person success.")
+
+	return stat.StatusCode
+}
diff --git a/client/notify.go b/client/notify.go
index e1a9336..66e9584 100644
--- a/client/notify.go
+++ b/client/notify.go
@@ -49,7 +49,7 @@
 		return vo.StatusOtherError
 	}
 
-	logger.Debug("Post notification success.")
+	logger.Debug("Post subscribe success.")
 
 	return vo.StatusSuccess
 }
@@ -69,7 +69,7 @@
 		return vo.StatusOtherError
 	}
 
-	logger.Debug("Post notification success.")
+	logger.Debug("put subscribe success.")
 
 	return vo.StatusSuccess
 }
@@ -77,8 +77,7 @@
 func GetSubscribes(url string) ([]byte, error) {
 	rsp, err := util.HttpGet(url, headers)
 	if err != nil {
-		logger.Warn("Put subscribe failed, %s", err.Error())
-
+		logger.Warn("Get subscribe failed, %s", err.Error())
 	}
 
 	return rsp, err
diff --git a/controller/captureCtl.go b/controller/captureCtl.go
index a6fc5cb..8f3d784 100644
--- a/controller/captureCtl.go
+++ b/controller/captureCtl.go
@@ -2,14 +2,17 @@
 
 import (
 	"encoding/base64"
-	"gat1400Exchange/config"
-	"gat1400Exchange/pkg"
-	"gat1400Exchange/service"
+	"fmt"
+	"math/rand"
 	"net/http"
+	"strconv"
 	"time"
 
+	"gat1400Exchange/config"
+	"gat1400Exchange/pkg"
 	"gat1400Exchange/pkg/logger"
 	"gat1400Exchange/repository"
+	"gat1400Exchange/service"
 	"gat1400Exchange/vo"
 
 	"github.com/gin-gonic/gin"
@@ -45,7 +48,7 @@
 
 	// 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒
 	if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
-		go a.Repository.VIIDMsgForward(&req)
+		go a.Repository.VIIDFaceMsgForward(&req)
 	} else if config.ServeConf.Role == "cascade" {
 		go service.AddFaceNotification(&face)
 	}
@@ -80,15 +83,17 @@
 	videoLabel := req.VideoLabelListObject.VideoLabelObject[0]
 	logger.Debug("Receive new message, Id:%s Ip:%s ", videoLabel.VideoLabelID, c.RemoteIP())
 
-	// 杞汉鑴告秷鎭�
-	var face vo.FaceObject
-	face.FaceID = videoLabel.VideoLabelID
-	face.InfoKind = 1
-	face.SourceID = videoLabel.VideoImageID
-	face.DeviceID = videoLabel.IVADeviceID
-	face.LocationMarkTime = videoLabel.CreateTimeAbs
-	face.FaceAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
-	face.FaceDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
+	// 杞汉鍛樻秷鎭�
+	var person vo.PersonObject
+	person.PersonID = videoLabel.VideoLabelID[0:41] + "01" + videoLabel.VideoLabelID[43:48]
+	person.InfoKind = "1"
+	person.DeviceID = videoLabel.IVADeviceID
+	person.SourceID = videoLabel.VideoImageID
+	person.LocationMarkTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
+	person.PersonAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorBeginTime
+	person.PersonDisAppearTime = videoLabel.BehaviorAnalysisObject.BehaviorEndTime
+	person.IsDriver = 2
+	person.IsCriminalInvolved = 2
 
 	var hasTargetImage bool
 	var bgImageWith, bgImageHeight int
@@ -104,8 +109,8 @@
 			bgImage = &videoLabel.SubImageList.SubImageInfoObject[idx]
 		}
 
-		face.SubImageList.SubImageInfoObject = append(
-			face.SubImageList.SubImageInfoObject,
+		person.SubImageList.SubImageInfoObject = append(
+			person.SubImageList.SubImageInfoObject,
 			videoLabel.SubImageList.SubImageInfoObject[idx],
 		)
 	}
@@ -129,8 +134,15 @@
 			return
 		}
 
+		imageId := bgImage.ImageID[37:41]
+		imageNumber, err := strconv.Atoi(imageId)
+		if err != nil {
+			imageNumber = rand.Intn(90000) + 10000
+		} else {
+			imageNumber++
+		}
 		var subImageInfo = vo.SubImageInfoObject{
-			ImageID:     bgImage.ImageID + "1",
+			ImageID:     fmt.Sprintf("%s%d", bgImage.ImageID[0:37], imageNumber),
 			EventSort:   10,
 			DeviceID:    bgImage.DeviceID,
 			StoragePath: "",
@@ -141,23 +153,23 @@
 			Height:      subRect.Top - subRect.Bottom,
 			Data:        base64.StdEncoding.EncodeToString(subImage),
 		}
-		face.SubImageList.SubImageInfoObject = append(
-			face.SubImageList.SubImageInfoObject,
+		person.SubImageList.SubImageInfoObject = append(
+			person.SubImageList.SubImageInfoObject,
 			subImageInfo,
 		)
 	}
 
 	// 濡傛灉寮�鍚簡涓嬬骇, 韬唤搴旇鏄秷鎭唬鐞�, 涓嶅啀杞彂鍒版湇鍔″櫒
 	if config.ClientConf.Enable && config.ServeConf.Role == "agent" {
-		var faceObjList vo.RequestFaceList
-		faceObjList.FaceListObject.FaceObject = append(faceObjList.FaceListObject.FaceObject, face)
-		go a.Repository.VIIDMsgForward(&faceObjList)
+		var personList vo.RequestPersonList
+		personList.PersonListObject.PersonObject = append(personList.PersonListObject.PersonObject, person)
+		go a.Repository.VIIDPersonMsgForward(&personList)
 	} else if config.ServeConf.Role == "cascade" {
-		go service.AddFaceNotification(&face)
+		go service.AddPersonNotification(&person)
 	}
 
 	if config.ForwardConf.SyncServer != "" {
-		go a.Repository.FaceForward([]vo.FaceObject{face})
+		go a.Repository.PersonForward([]vo.PersonObject{person})
 	}
 
 	rspMsg := vo.ResponseStatus{
diff --git a/main.go b/main.go
index d055edf..8130d67 100644
--- a/main.go
+++ b/main.go
@@ -44,7 +44,7 @@
 	// 鍚姩缃戠粶瑙嗛瀛楃鍙犲姞鍣ㄦ湇鍔�
 	go service.StartNVCSServer()
 
-	go service.InitSubscribeTask()
+	go service.InitSubscribeNotificationTasks()
 
 	// 鍚姩瀹氭椂浠诲姟
 	cron.Init()
@@ -77,7 +77,7 @@
 		logger.Error("Server forced to shutdown:", err)
 	}
 
-	service.StopSubscribeTask()
+	service.StopNotificationTasks()
 
 	logger.Info("Server exiting!")
 }
diff --git a/repository/captureRepo.go b/repository/captureRepo.go
index d0702b5..8572735 100644
--- a/repository/captureRepo.go
+++ b/repository/captureRepo.go
@@ -121,6 +121,91 @@
 	return
 }
 
+func (c CaptureRepository) PersonForward(personList []vo.PersonObject) {
+	var err error
+
+	if personList == nil || len(personList) == 0 {
+		logger.Warn("FaceList is nil")
+		return
+	}
+
+	for _, v := range personList {
+		if v.SubImageList.SubImageInfoObject == nil {
+			logger.Warn("SubImageInfoObject is nil")
+			continue
+		}
+		var deviceId = v.DeviceID
+		var targetId = v.PersonID
+		var bgImageStr string
+		var bgImageBytes, faceImageBytes []byte = nil, nil
+
+		// 鑾峰彇澶у浘, 鐩墠娴峰悍鐨勫皬鍥惧垎杈ㄧ巼澶綆
+		for _, image := range v.SubImageList.SubImageInfoObject {
+			if image.Type != "14" {
+				continue
+			}
+
+			if len(image.Data) > 0 {
+				if len(image.Data) > len(bgImageStr) {
+					bgImageStr = image.Data
+				}
+			} else if image.StoragePath != "" {
+				imgData, err := util.ImageDownload(image.StoragePath, nil)
+				if err != nil {
+					logger.Warn("Image download failure, %s", err.Error())
+				} else {
+					bgImageBytes = imgData
+					bgImageStr = "///"
+				}
+			}
+		}
+
+		if bgImageBytes == nil {
+			bgImageBytes, err = base64.StdEncoding.DecodeString(bgImageStr)
+			if err != nil {
+				logger.Warn("Decode Image Base64 String failure, %s", err.Error())
+				continue
+			}
+		}
+
+		// 杞彂鍥惧儚
+		logger.Debug("Prepare forward person image, deviceId:%s, image len:%d, server:%s", deviceId, len(bgImageBytes), config.ForwardConf.SyncServer)
+		if deviceId != "" && bgImageStr != "" && config.ForwardConf.SyncServer != "" {
+			pd := c.PackPushDataV2(deviceId, targetId, v.PersonAppearTime, bgImageBytes, faceImageBytes)
+			if pd == nil {
+				return
+			}
+
+			// 澶勭悊姊帶濉啓鐨勬ゼ灞備俊鎭� 鏆傛椂浣跨敤oherFeature瀛楁
+			if v.BehaviorDescription != "" {
+				pd.CameraFloor = v.BehaviorDescription
+			}
+
+			// 灏濊瘯浠巉aceId鎻愬彇妤煎眰
+			if pd.CameraFloor == "" && config.ClientConf.AddFloorToFaceId {
+				pd.CameraFloor, _ = pkg.ParseFloorFromId(v.PersonID)
+			}
+			//logger.Debug("device %s, CameraFloor:%s", deviceId, pd.CameraFloor)
+
+			payload, err := json.Marshal(pd)
+			if err != nil {
+				logger.Warn("Marshal error, %s", err.Error())
+				return
+			}
+
+			if !util.SendData(payload, config.ForwardConf.SyncServer) {
+				cacheItem, _ := json.Marshal(pd)
+				c.CacheData(cacheItem, "basic")
+				logger.Warn("The data forwarding failed, adding to local cache.")
+			} else {
+				logger.Debug("The data forwarding successful. deviceId:%s", deviceId)
+			}
+		}
+	}
+
+	return
+}
+
 func (c CaptureRepository) PackPushDataV2(deviceId, faceId, appearTime string, bgImgBytes, faceImgBytes []byte) *vo.PushDataInfoV2 {
 	var pd = new(vo.PushDataInfoV2)
 	var floor string
@@ -165,7 +250,7 @@
 	cacheItem.Save()
 }
 
-func (c CaptureRepository) VIIDMsgForward(msg *vo.RequestFaceList) {
+func (c CaptureRepository) VIIDFaceMsgForward(msg *vo.RequestFaceList) {
 	faceInfo := msg.FaceListObject.FaceObject[0]
 	// 鍖归厤妤煎眰
 	faceAppearTime, err := time.ParseInLocation("20060102150405", faceInfo.FaceAppearTime, time.Local)
@@ -190,7 +275,39 @@
 	b, _ := json.Marshal(msg)
 	if client.FaceCapture(b) != vo.StatusSuccess {
 		cacheItem, _ := json.Marshal(msg)
-		c.CacheData(cacheItem, "1400")
+		c.CacheData(cacheItem, "1400-face")
+		logger.Warn("The data forwarding failed, adding to local cache.")
+	}
+
+	return
+}
+
+func (c CaptureRepository) VIIDPersonMsgForward(msg *vo.RequestPersonList) {
+	person := msg.PersonListObject.PersonObject[0]
+	// 鍖归厤妤煎眰
+	appearTime, err := time.ParseInLocation("20060102150405", person.PersonAppearTime, time.Local)
+	if err != nil {
+		logger.Warn("Parse face appear time error, %s", err.Error())
+		appearTime = time.Now()
+	}
+
+	var devPos models.Positions
+	_ = devPos.FindPositionByTime(appearTime.Unix() + 5) // 鍔�5绉掔數姊叧闂ㄧ殑鏃堕棿
+	if devPos.Pos == "" {
+		devPos.Pos = "1F"
+	}
+
+	for idx, v := range msg.PersonListObject.PersonObject {
+		msg.PersonListObject.PersonObject[idx].BehaviorDescription = devPos.Pos
+		if config.ClientConf.AddFloorToFaceId {
+			msg.PersonListObject.PersonObject[idx].PersonID = pkg.GenerateFaceIdContainFloor(v.PersonID, devPos.Pos)
+		}
+	}
+
+	b, _ := json.Marshal(msg)
+	if client.PersonCapture(b) != vo.StatusSuccess {
+		cacheItem, _ := json.Marshal(msg)
+		c.CacheData(cacheItem, "1400-person")
 		logger.Warn("The data forwarding failed, adding to local cache.")
 	}
 
diff --git a/repository/subscribeRepo.go b/repository/subscribeRepo.go
index 7652195..5f3d256 100644
--- a/repository/subscribeRepo.go
+++ b/repository/subscribeRepo.go
@@ -148,7 +148,7 @@
 		return err
 	}
 
-	service.UpdateTaskProcs(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
+	service.UpdateNotificationTask(sub.Id, vo.Msg_Type_Create_Subscribe, &sub)
 
 	return err
 }
@@ -163,7 +163,7 @@
 	sub.Status = subscribe.SubscribeStatus
 	sub.Ext = *subscribe
 
-	service.UpdateTaskProcs(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
+	service.UpdateNotificationTask(subscribe.SubscribeID, vo.Msg_Type_Update_Subscribe, &sub)
 
 	return sub.Save()
 }
@@ -175,7 +175,7 @@
 		return err
 	}
 
-	service.UpdateTaskProcs(id, vo.Msg_Type_Delete_Subscribe, nil)
+	service.UpdateNotificationTask(id, vo.Msg_Type_Delete_Subscribe, nil)
 
 	return err
 }
diff --git a/service/notification.go b/service/notification.go
new file mode 100644
index 0000000..1605025
--- /dev/null
+++ b/service/notification.go
@@ -0,0 +1,160 @@
+package service
+
+import (
+	"context"
+	"encoding/json"
+	"gat1400Exchange/pkg/logger"
+	"strings"
+	"sync"
+	"time"
+
+	"gat1400Exchange/client"
+	"gat1400Exchange/models"
+	"gat1400Exchange/pkg/snowflake"
+	"gat1400Exchange/vo"
+)
+
+type NotificationTask struct {
+	ctx             context.Context
+	conf            *models.Subscribe
+	faceList        []*vo.FaceObject
+	personList      []*vo.PersonObject
+	mutex           sync.Mutex
+	lastApeExecTime int64
+}
+
+func (task *NotificationTask) start() {
+	TaskWaitGroup.Add(1)
+	Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
+
+	for {
+		select {
+		case <-task.ctx.Done():
+			TaskProcMap.Delete(task.conf.Id)
+			TaskWaitGroup.Done()
+			logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
+			return
+		case <-Timer.C:
+			task.notify()
+		}
+	}
+}
+
+func (task *NotificationTask) addFace(face *vo.FaceObject) {
+	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
+		return
+	}
+
+	task.mutex.Lock()
+	defer task.mutex.Unlock()
+
+	task.faceList = append(task.faceList, face)
+}
+
+func (task *NotificationTask) addPerson(person *vo.PersonObject) {
+	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) {
+		return
+	}
+
+	task.mutex.Lock()
+	defer task.mutex.Unlock()
+
+	task.personList = append(task.personList, person)
+}
+
+func (task *NotificationTask) notify() {
+	subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
+	for _, subType := range subDetails {
+		var msg *vo.RequestSubscribeNotification
+		switch subType {
+		case vo.SubscribeApe:
+			msg = task.packDeviceNotificationMsg()
+		case vo.SubscribeFace:
+			msg = task.packFaceNotificationMsg()
+		case vo.SubscribePerson:
+		}
+
+		if msg != nil {
+			b, _ := json.Marshal(msg)
+			client.Notify(task.conf.Ext.ReceiveAddr, b)
+		}
+	}
+}
+
+func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification {
+	triggerTime := time.Now().Format("20060102150405")
+	if len(task.faceList) == 0 {
+		return nil
+	}
+
+	var notification = vo.FaceNotification{
+		NotificationID:   triggerTime + snowflake.GenerateIdStr(),
+		SubscribeID:      task.conf.Id,
+		Title:            task.conf.Ext.Title,
+		TriggerTime:      triggerTime,
+		ExecuteOperation: 1,
+	}
+
+	var ids []string
+	for idx, _ := range task.faceList {
+		// 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
+		task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
+		task.faceList[idx].EntryTime = triggerTime
+		for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
+			task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
+		}
+
+		ids = append(ids, task.faceList[idx].FaceID)
+		notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
+	}
+
+	notification.InfoIDs = strings.Join(ids, ";")
+
+	task.mutex.Lock()
+	task.faceList = []*vo.FaceObject{}
+	task.mutex.Unlock()
+
+	var req vo.RequestSubscribeNotification
+	req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
+
+	return &req
+}
+
+func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification {
+	// 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�,
+	if time.Now().Unix()-task.lastApeExecTime < 60*10 {
+		return nil
+	}
+
+	triggerTime := time.Now().Format("20060102150405")
+
+	task.lastApeExecTime = time.Now().Unix()
+
+	var notification = vo.DeviceNotification{
+		NotificationID:   triggerTime + snowflake.GenerateIdStr(),
+		SubscribeID:      task.conf.Id,
+		Title:            task.conf.Ext.Title,
+		TriggerTime:      triggerTime,
+		ExecuteOperation: 1,
+	}
+
+	var ids []string
+	var apeMod models.Ape
+	apeList, err := apeMod.FindAll()
+	if err != nil {
+		logger.Warn(err.Error())
+		return nil
+	} else {
+		for idx, _ := range apeList {
+			ids = append(ids, apeList[idx].Id)
+			notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
+		}
+	}
+
+	notification.InfoIDs = strings.Join(ids, ";")
+
+	var req vo.RequestSubscribeNotification
+	req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
+
+	return &req
+}
diff --git a/service/resend.go b/service/resend.go
index ade60a8..667b237 100644
--- a/service/resend.go
+++ b/service/resend.go
@@ -29,12 +29,18 @@
 			logger.Error(err.Error())
 			return
 		}
-		if c.Type == "1400" {
+		if c.Type == "1400-face" {
 			if client.FaceCapture([]byte(c.Data)) != vo.StatusSuccess {
 				c.UpdateRetryCount()
 				logger.Warn("The data resend failed. retry count %d", c.Retry+1)
 				return
 			}
+		} else if c.Type == "1400-person" {
+			if client.PersonCapture([]byte(c.Data)) != vo.StatusSuccess {
+				c.UpdateRetryCount()
+				logger.Warn("The data resend failed. retry count %d", c.Retry+1)
+				return
+			}
 		} else {
 			if !util.SendData([]byte(c.Data), config.ForwardConf.SyncServer) {
 				c.UpdateRetryCount()
diff --git a/service/subscribe.go b/service/subscribe.go
index 70853cf..cbdeb97 100644
--- a/service/subscribe.go
+++ b/service/subscribe.go
@@ -2,16 +2,11 @@
 
 import (
 	"context"
-	"encoding/json"
-	"gat1400Exchange/config"
-	"strings"
 	"sync"
-	"time"
 
-	"gat1400Exchange/client"
+	"gat1400Exchange/config"
 	"gat1400Exchange/models"
 	"gat1400Exchange/pkg/logger"
-	"gat1400Exchange/pkg/snowflake"
 	"gat1400Exchange/vo"
 )
 
@@ -20,68 +15,76 @@
 
 type TaskProcInfo struct {
 	cancel context.CancelFunc
-	task   *SubscribeTask
+	task   *NotificationTask
 }
 
-func (t *TaskProcInfo) stop() {
+func (t TaskProcInfo) stop() {
 	t.cancel()
 	t.task = nil
 }
 
-func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
-	logger.Debug("Receive sub msg: %s ", subId)
+func InitSubscribeNotificationTasks() {
+	var s models.Subscribe
+	subList, err := s.FindByFromId(config.ClientConf.ServerId)
+	if err != nil {
+		logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
+		return
+	}
+
+	for idx := range subList {
+		if subList[idx].Status != 0 {
+			continue
+		}
+
+		CreateNotificationTask(&subList[idx])
+	}
+
+	return
+}
+
+func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
+	logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
 
 	proc, isExist := TaskProcMap.Load(subId)
 
 	switch msgType {
 	case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
 		if isExist {
-			logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�")
-			proc.(TaskProcInfo).cancel()
+			logger.Debug("Update notify task, restart after exit")
+			proc.(TaskProcInfo).stop()
 		}
 
-		CreateTask(conf)
+		CreateNotificationTask(conf)
 	case vo.Msg_Type_Delete_Subscribe:
 		if !isExist {
 			return
 		}
 
 		// 鍏抽棴浠诲姟, 骞跺垹闄�
-		proc.(TaskProcInfo).cancel()
+		proc.(TaskProcInfo).stop()
 	default:
-		logger.Warn("鏈煡鐨勬秷鎭被鍨�")
+		logger.Warn("Invalid msg type %d", msgType)
 	}
 }
 
-func InitSubscribeTask() {
-	var s models.Subscribe
-	subList, err := s.FindByFromId(config.ClientConf.ServerId)
-	if err != nil {
-		logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
-		return
+func CreateNotificationTask(conf *models.Subscribe) {
+	logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	task := &NotificationTask{
+		ctx:  ctx,
+		conf: conf,
 	}
 
-	for idx, _ := range subList {
-		if subList[idx].Status != 0 {
-			continue
-		}
-
-		CreateTask(&subList[idx])
-	}
-
-	return
-}
-
-func AddFaceNotification(face *vo.FaceObject) {
-	TaskProcMap.Range(func(key, value interface{}) bool {
-		value.(TaskProcInfo).task.AddFace(face)
-		return true
+	TaskProcMap.Store(conf.Id, TaskProcInfo{
+		cancel: cancel,
+		task:   task,
 	})
 
-	logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature)
+	go task.start()
 }
 
-func StopSubscribeTask() {
+func StopNotificationTasks() {
 	TaskProcMap.Range(func(key, value interface{}) bool {
 		value.(TaskProcInfo).cancel()
 		return true
@@ -91,134 +94,20 @@
 	TaskWaitGroup.Wait()
 }
 
-func CreateTask(conf *models.Subscribe) {
-	logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	newTask := &SubscribeTask{
-		ctx:  ctx,
-		conf: conf,
-	}
-
-	TaskProcMap.Store(conf.Id, TaskProcInfo{
-		cancel: cancel,
-		task:   newTask,
+func AddFaceNotification(face *vo.FaceObject) {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).task.addFace(face)
+		return true
 	})
 
-	go newTask.Start()
+	logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
 }
 
-type SubscribeTask struct {
-	ctx             context.Context
-	conf            *models.Subscribe
-	faceList        []*vo.FaceObject
-	mutex           sync.Mutex
-	lastApeExecTime int64
-}
+func AddPersonNotification(person *vo.PersonObject) {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).task.addPerson(person)
+		return true
+	})
 
-func (task *SubscribeTask) Start() {
-	TaskWaitGroup.Add(1)
-	Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
-
-	for {
-		select {
-		case <-task.ctx.Done():
-			TaskProcMap.Delete(task.conf.Id)
-			TaskWaitGroup.Done()
-			logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
-			return
-		case <-Timer.C:
-			task.Notify()
-		}
-	}
-}
-
-func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
-	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
-		return
-	}
-
-	task.mutex.Lock()
-	defer task.mutex.Unlock()
-
-	task.faceList = append(task.faceList, face)
-}
-
-func (task *SubscribeTask) Notify() {
-	subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
-	for _, subType := range subDetails {
-		triggerTime := time.Now().Format("20060102150405")
-
-		// 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�,
-		if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 {
-			task.lastApeExecTime = time.Now().Unix()
-
-			var notification = vo.DeviceNotification{
-				NotificationID:   triggerTime + snowflake.GenerateIdStr(),
-				SubscribeID:      task.conf.Id,
-				Title:            task.conf.Ext.Title,
-				TriggerTime:      triggerTime,
-				ExecuteOperation: 1,
-			}
-
-			var ids []string
-			var apeMod models.Ape
-			apeList, err := apeMod.FindAll()
-			if err != nil {
-				logger.Warn(err.Error())
-				continue
-			} else {
-				for idx, _ := range apeList {
-					ids = append(ids, apeList[idx].Id)
-					notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
-				}
-			}
-
-			notification.InfoIDs = strings.Join(ids, ";")
-
-			var req vo.RequestSubscribeNotification
-			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
-			b, _ := json.Marshal(req)
-			client.Notify(task.conf.Ext.ReceiveAddr, b)
-			continue
-		}
-
-		// 涓婃姤浜鸿劯
-		if subType == vo.SubscribeFace {
-			if len(task.faceList) == 0 {
-				return
-			}
-
-			var notification = vo.FaceNotification{
-				NotificationID:   triggerTime + snowflake.GenerateIdStr(),
-				SubscribeID:      task.conf.Id,
-				Title:            task.conf.Ext.Title,
-				TriggerTime:      triggerTime,
-				ExecuteOperation: 1,
-			}
-			var ids []string
-			for idx, _ := range task.faceList {
-				// 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
-				task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
-				task.faceList[idx].EntryTime = triggerTime
-				for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
-					task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
-				}
-
-				ids = append(ids, task.faceList[idx].FaceID)
-				notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
-			}
-
-			notification.InfoIDs = strings.Join(ids, ";")
-
-			task.mutex.Lock()
-			task.faceList = []*vo.FaceObject{}
-			task.mutex.Unlock()
-
-			var req vo.RequestSubscribeNotification
-			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
-			b, _ := json.Marshal(req)
-			client.Notify(task.conf.Ext.ReceiveAddr, b)
-		}
-	}
+	logger.Debug("Add Person Notification. personId: %s", person.PersonID)
 }

--
Gitblit v1.8.0