From 332fc6ad5edca596ecd23876aa9db7452b45f804 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 29 五月 2024 02:58:21 +0800
Subject: [PATCH] 添加人员抓拍处理

---
 service/subscribe.go |  215 +++++++++++++----------------------------------------
 1 files changed, 52 insertions(+), 163 deletions(-)

diff --git a/service/subscribe.go b/service/subscribe.go
index 70853cf..cbdeb97 100644
--- a/service/subscribe.go
+++ b/service/subscribe.go
@@ -2,16 +2,11 @@
 
 import (
 	"context"
-	"encoding/json"
-	"gat1400Exchange/config"
-	"strings"
 	"sync"
-	"time"
 
-	"gat1400Exchange/client"
+	"gat1400Exchange/config"
 	"gat1400Exchange/models"
 	"gat1400Exchange/pkg/logger"
-	"gat1400Exchange/pkg/snowflake"
 	"gat1400Exchange/vo"
 )
 
@@ -20,68 +15,76 @@
 
 type TaskProcInfo struct {
 	cancel context.CancelFunc
-	task   *SubscribeTask
+	task   *NotificationTask
 }
 
-func (t *TaskProcInfo) stop() {
+func (t TaskProcInfo) stop() {
 	t.cancel()
 	t.task = nil
 }
 
-func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
-	logger.Debug("Receive sub msg: %s ", subId)
+func InitSubscribeNotificationTasks() {
+	var s models.Subscribe
+	subList, err := s.FindByFromId(config.ClientConf.ServerId)
+	if err != nil {
+		logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
+		return
+	}
+
+	for idx := range subList {
+		if subList[idx].Status != 0 {
+			continue
+		}
+
+		CreateNotificationTask(&subList[idx])
+	}
+
+	return
+}
+
+func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
+	logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
 
 	proc, isExist := TaskProcMap.Load(subId)
 
 	switch msgType {
 	case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
 		if isExist {
-			logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�")
-			proc.(TaskProcInfo).cancel()
+			logger.Debug("Update notify task, restart after exit")
+			proc.(TaskProcInfo).stop()
 		}
 
-		CreateTask(conf)
+		CreateNotificationTask(conf)
 	case vo.Msg_Type_Delete_Subscribe:
 		if !isExist {
 			return
 		}
 
 		// 鍏抽棴浠诲姟, 骞跺垹闄�
-		proc.(TaskProcInfo).cancel()
+		proc.(TaskProcInfo).stop()
 	default:
-		logger.Warn("鏈煡鐨勬秷鎭被鍨�")
+		logger.Warn("Invalid msg type %d", msgType)
 	}
 }
 
-func InitSubscribeTask() {
-	var s models.Subscribe
-	subList, err := s.FindByFromId(config.ClientConf.ServerId)
-	if err != nil {
-		logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
-		return
+func CreateNotificationTask(conf *models.Subscribe) {
+	logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	task := &NotificationTask{
+		ctx:  ctx,
+		conf: conf,
 	}
 
-	for idx, _ := range subList {
-		if subList[idx].Status != 0 {
-			continue
-		}
-
-		CreateTask(&subList[idx])
-	}
-
-	return
-}
-
-func AddFaceNotification(face *vo.FaceObject) {
-	TaskProcMap.Range(func(key, value interface{}) bool {
-		value.(TaskProcInfo).task.AddFace(face)
-		return true
+	TaskProcMap.Store(conf.Id, TaskProcInfo{
+		cancel: cancel,
+		task:   task,
 	})
 
-	logger.Debug("Add Face Notification.faceId: %s, faceFeath: %s", face.IDNumber, face.OtherFeature)
+	go task.start()
 }
 
-func StopSubscribeTask() {
+func StopNotificationTasks() {
 	TaskProcMap.Range(func(key, value interface{}) bool {
 		value.(TaskProcInfo).cancel()
 		return true
@@ -91,134 +94,20 @@
 	TaskWaitGroup.Wait()
 }
 
-func CreateTask(conf *models.Subscribe) {
-	logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	newTask := &SubscribeTask{
-		ctx:  ctx,
-		conf: conf,
-	}
-
-	TaskProcMap.Store(conf.Id, TaskProcInfo{
-		cancel: cancel,
-		task:   newTask,
+func AddFaceNotification(face *vo.FaceObject) {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).task.addFace(face)
+		return true
 	})
 
-	go newTask.Start()
+	logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
 }
 
-type SubscribeTask struct {
-	ctx             context.Context
-	conf            *models.Subscribe
-	faceList        []*vo.FaceObject
-	mutex           sync.Mutex
-	lastApeExecTime int64
-}
+func AddPersonNotification(person *vo.PersonObject) {
+	TaskProcMap.Range(func(key, value interface{}) bool {
+		value.(TaskProcInfo).task.addPerson(person)
+		return true
+	})
 
-func (task *SubscribeTask) Start() {
-	TaskWaitGroup.Add(1)
-	Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
-
-	for {
-		select {
-		case <-task.ctx.Done():
-			TaskProcMap.Delete(task.conf.Id)
-			TaskWaitGroup.Done()
-			logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
-			return
-		case <-Timer.C:
-			task.Notify()
-		}
-	}
-}
-
-func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
-	if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
-		return
-	}
-
-	task.mutex.Lock()
-	defer task.mutex.Unlock()
-
-	task.faceList = append(task.faceList, face)
-}
-
-func (task *SubscribeTask) Notify() {
-	subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
-	for _, subType := range subDetails {
-		triggerTime := time.Now().Format("20060102150405")
-
-		// 涓婃姤璁惧 鎺у埗璁惧閫氱煡鎺у埗棰戠巼, 閬垮厤棰戠箒, 涓婄骇涓�鑸細涓嬪彂绉掔骇鐨勪换鍔�, 浣嗚澶囦笉浼氶绻佹洿鏂�,
-		if subType == vo.SubscribeApe && time.Now().Unix()-task.lastApeExecTime > 60*10 {
-			task.lastApeExecTime = time.Now().Unix()
-
-			var notification = vo.DeviceNotification{
-				NotificationID:   triggerTime + snowflake.GenerateIdStr(),
-				SubscribeID:      task.conf.Id,
-				Title:            task.conf.Ext.Title,
-				TriggerTime:      triggerTime,
-				ExecuteOperation: 1,
-			}
-
-			var ids []string
-			var apeMod models.Ape
-			apeList, err := apeMod.FindAll()
-			if err != nil {
-				logger.Warn(err.Error())
-				continue
-			} else {
-				for idx, _ := range apeList {
-					ids = append(ids, apeList[idx].Id)
-					notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
-				}
-			}
-
-			notification.InfoIDs = strings.Join(ids, ";")
-
-			var req vo.RequestSubscribeNotification
-			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
-			b, _ := json.Marshal(req)
-			client.Notify(task.conf.Ext.ReceiveAddr, b)
-			continue
-		}
-
-		// 涓婃姤浜鸿劯
-		if subType == vo.SubscribeFace {
-			if len(task.faceList) == 0 {
-				return
-			}
-
-			var notification = vo.FaceNotification{
-				NotificationID:   triggerTime + snowflake.GenerateIdStr(),
-				SubscribeID:      task.conf.Id,
-				Title:            task.conf.Ext.Title,
-				TriggerTime:      triggerTime,
-				ExecuteOperation: 1,
-			}
-			var ids []string
-			for idx, _ := range task.faceList {
-				// 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
-				task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
-				task.faceList[idx].EntryTime = triggerTime
-				for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
-					task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
-				}
-
-				ids = append(ids, task.faceList[idx].FaceID)
-				notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
-			}
-
-			notification.InfoIDs = strings.Join(ids, ";")
-
-			task.mutex.Lock()
-			task.faceList = []*vo.FaceObject{}
-			task.mutex.Unlock()
-
-			var req vo.RequestSubscribeNotification
-			req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
-			b, _ := json.Marshal(req)
-			client.Notify(task.conf.Ext.ReceiveAddr, b)
-		}
-	}
+	logger.Debug("Add Person Notification. personId: %s", person.PersonID)
 }

--
Gitblit v1.8.0