From 6f26cb4297ebaab4394e05e1a498e347ce290bb5 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期四, 22 八月 2024 19:36:28 +0800
Subject: [PATCH] 修复运行方向的bug
---
service/subscribe.go | 211 ++++++++++++----------------------------------------
1 files changed, 50 insertions(+), 161 deletions(-)
diff --git a/service/subscribe.go b/service/subscribe.go
index 556043e..7867ab7 100644
--- a/service/subscribe.go
+++ b/service/subscribe.go
@@ -2,15 +2,11 @@
import (
"context"
- "encoding/json"
- "strings"
"sync"
- "time"
- "gat1400Exchange/client"
+ "gat1400Exchange/config"
"gat1400Exchange/models"
"gat1400Exchange/pkg/logger"
- "gat1400Exchange/pkg/snowflake"
"gat1400Exchange/vo"
)
@@ -19,69 +15,76 @@
type TaskProcInfo struct {
cancel context.CancelFunc
- task *SubscribeTask
+ task *NotificationTask
}
-func (t *TaskProcInfo) stop() {
+func (t TaskProcInfo) stop() {
t.cancel()
t.task = nil
}
-func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
- logger.Debug("Receive sub msg: %s ", subId)
+func InitSubscribeNotificationTasks() {
+ var s models.Subscribe
+ subList, err := s.FindByFromId(config.ClientConf.ServerId)
+ if err != nil {
+ logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
+ return
+ }
+
+ for idx := range subList {
+ if subList[idx].Status != 0 {
+ continue
+ }
+
+ CreateNotificationTask(&subList[idx])
+ }
+
+ return
+}
+
+func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
+ logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
proc, isExist := TaskProcMap.Load(subId)
switch msgType {
case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
if isExist {
- logger.Debug("鏇存柊浠诲姟, 閫�鍑洪噸鏂板紑濮�")
- proc.(TaskProcInfo).cancel()
+ logger.Debug("Update notify task, restart after exit")
+ proc.(TaskProcInfo).stop()
}
- CreateTask(conf)
+ CreateNotificationTask(conf)
case vo.Msg_Type_Delete_Subscribe:
if !isExist {
return
}
// 鍏抽棴浠诲姟, 骞跺垹闄�
- proc.(TaskProcInfo).cancel()
+ proc.(TaskProcInfo).stop()
default:
- logger.Warn("鏈煡鐨勬秷鎭被鍨�")
+ logger.Warn("Invalid msg type %d", msgType)
}
}
-func InitSubscribeTask() error {
- var s models.Subscribe
- subList, err := s.FindAll()
- if err != nil {
- logger.Error("Find account by channel error:%v", err)
- return err
+func CreateNotificationTask(conf *models.Subscribe) {
+ logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
+
+ ctx, cancel := context.WithCancel(context.Background())
+ task := &NotificationTask{
+ ctx: ctx,
+ conf: conf,
}
- for idx, _ := range subList {
- if subList[idx].Status != 0 {
- continue
- }
-
- CreateTask(&subList[idx])
- }
-
- return nil
-}
-
-func AddFaceCapture(face *vo.FaceObject) {
- TaskProcMap.Range(func(key, value interface{}) bool {
- value.(TaskProcInfo).task.AddFace(face)
- return true
+ TaskProcMap.Store(conf.Id, TaskProcInfo{
+ cancel: cancel,
+ task: task,
})
- logger.Debug("娣诲姞浜鸿劯.")
- //TaskWaitGroup.Wait()
+ go task.start()
}
-func StopSubscribeTask() {
+func StopNotificationTasks() {
TaskProcMap.Range(func(key, value interface{}) bool {
value.(TaskProcInfo).cancel()
return true
@@ -91,131 +94,17 @@
TaskWaitGroup.Wait()
}
-func CreateTask(conf *models.Subscribe) {
- logger.Debug("娣诲姞璁㈤槄浠诲姟,%s", conf.Id)
-
- ctx, cancel := context.WithCancel(context.Background())
- newTask := &SubscribeTask{
- ctx: ctx,
- conf: conf,
- }
-
- TaskProcMap.Store(conf.Id, TaskProcInfo{
- cancel: cancel,
- task: newTask,
+func AddFaceNotification(face *vo.FaceObject) {
+ TaskProcMap.Range(func(key, value interface{}) bool {
+ value.(TaskProcInfo).task.addFace(face)
+ return true
})
- go newTask.Start()
}
-type SubscribeTask struct {
- ctx context.Context
- conf *models.Subscribe
- faceList []*vo.FaceObject
- mutex sync.Mutex
-}
-
-func (task *SubscribeTask) Start() {
- TaskWaitGroup.Add(1)
- Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
-
- for {
- select {
- case <-task.ctx.Done():
- TaskProcMap.Delete(task.conf.Id)
- TaskWaitGroup.Done()
- logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
- return
- case <-Timer.C:
- task.Notify()
- }
- }
-}
-
-func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
- if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
- return
- }
-
- task.mutex.Lock()
- defer task.mutex.Unlock()
-
- task.faceList = append(task.faceList, face)
-}
-
-func (task *SubscribeTask) Notify() {
- subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
- for _, subType := range subDetails {
- triggerTime := time.Now().Format("20060102150405")
-
- // 涓婃姤璁惧
- if subType == vo.SubscribeApe {
- var notification = vo.DeviceNotification{
- NotificationID: triggerTime + snowflake.GenerateIdStr(),
- SubscribeID: task.conf.Id,
- Title: task.conf.Ext.Title,
- TriggerTime: triggerTime,
- ExecuteOperation: 1,
- }
-
- var ids []string
- var apeMod models.Ape
- apeList, err := apeMod.FindAll()
- if err != nil {
- logger.Warn(err.Error())
- continue
- } else {
- for idx, _ := range apeList {
- ids = append(ids, apeList[idx].Id)
- notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
- }
- }
-
- notification.InfoIDs = strings.Join(ids, ";")
-
- var req vo.RequestSubscribeNotification
- req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
- b, _ := json.Marshal(req)
- client.Notify(task.conf.Ext.ReceiveAddr, b)
- continue
- }
-
- // 涓婃姤浜鸿劯
- if subType == vo.SubscribeFace {
- if len(task.faceList) == 0 {
- return
- }
-
- var notification = vo.FaceNotification{
- NotificationID: triggerTime + snowflake.GenerateIdStr(),
- SubscribeID: task.conf.Id,
- Title: task.conf.Ext.Title,
- TriggerTime: triggerTime,
- ExecuteOperation: 1,
- }
- var ids []string
- for idx, _ := range task.faceList {
- // 鎸夋捣搴风ず渚嬪~鍏呬慨鏀瑰瓧娈�
- task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
- task.faceList[idx].EntryTime = triggerTime
- for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
- task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
- }
-
- ids = append(ids, task.faceList[idx].FaceID)
- notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
- }
-
- notification.InfoIDs = strings.Join(ids, ";")
-
- task.mutex.Lock()
- task.faceList = []*vo.FaceObject{}
- task.mutex.Unlock()
-
- var req vo.RequestSubscribeNotification
- req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
- b, _ := json.Marshal(req)
- client.Notify(task.conf.Ext.ReceiveAddr, b)
- }
- }
+func AddPersonNotification(person *vo.PersonObject) {
+ TaskProcMap.Range(func(key, value interface{}) bool {
+ value.(TaskProcInfo).task.addPerson(person)
+ return true
+ })
}
--
Gitblit v1.8.0