package service
|
|
import (
|
"context"
|
"encoding/json"
|
"gat1400Exchange/config"
|
"strings"
|
"sync"
|
"time"
|
|
"gat1400Exchange/client"
|
"gat1400Exchange/models"
|
"gat1400Exchange/pkg/logger"
|
"gat1400Exchange/pkg/snowflake"
|
"gat1400Exchange/vo"
|
)
|
|
// Package-level bookkeeping shared by all subscription tasks.
var (
	// TaskProcMap holds one TaskProcInfo per running task, keyed by the
	// subscription id the task was created with.
	TaskProcMap sync.Map

	// TaskWaitGroup tracks the running task goroutines so that shutdown
	// can block until every one of them has returned.
	TaskWaitGroup = new(sync.WaitGroup)
)
|
|
type TaskProcInfo struct {
|
cancel context.CancelFunc
|
task *SubscribeTask
|
}
|
|
func (t *TaskProcInfo) stop() {
|
t.cancel()
|
t.task = nil
|
}
|
|
func UpdateTaskProcs(subId string, msgType int, conf *models.Subscribe) {
|
logger.Debug("Receive sub msg: %s ", subId)
|
|
proc, isExist := TaskProcMap.Load(subId)
|
|
switch msgType {
|
case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
|
if isExist {
|
logger.Debug("更新任务, 退出重新开始")
|
proc.(TaskProcInfo).cancel()
|
}
|
|
CreateTask(conf)
|
case vo.Msg_Type_Delete_Subscribe:
|
if !isExist {
|
return
|
}
|
|
// 关闭任务, 并删除
|
proc.(TaskProcInfo).cancel()
|
default:
|
logger.Warn("未知的消息类型")
|
}
|
}
|
|
func InitSubscribeTask() {
|
var s models.Subscribe
|
subList, err := s.FindByFromId(config.ClientConf.ServerId)
|
if err != nil {
|
logger.Error("Find subscribe error, server id %s, %v", config.ClientConf.ServerId, err)
|
return
|
}
|
|
for idx, _ := range subList {
|
if subList[idx].Status != 0 {
|
continue
|
}
|
|
CreateTask(&subList[idx])
|
}
|
|
return
|
}
|
|
func AddFaceNotification(face *vo.FaceObject) {
|
TaskProcMap.Range(func(key, value interface{}) bool {
|
value.(TaskProcInfo).task.AddFace(face)
|
return true
|
})
|
|
logger.Debug("Add Face Notification.")
|
}
|
|
func StopSubscribeTask() {
|
TaskProcMap.Range(func(key, value interface{}) bool {
|
value.(TaskProcInfo).cancel()
|
return true
|
})
|
|
logger.Debug("等待所有任务退出.")
|
TaskWaitGroup.Wait()
|
}
|
|
func CreateTask(conf *models.Subscribe) {
|
logger.Debug("添加订阅任务,%s", conf.Id)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
newTask := &SubscribeTask{
|
ctx: ctx,
|
conf: conf,
|
}
|
|
TaskProcMap.Store(conf.Id, TaskProcInfo{
|
cancel: cancel,
|
task: newTask,
|
})
|
|
go newTask.Start()
|
}
|
|
type SubscribeTask struct {
|
ctx context.Context
|
conf *models.Subscribe
|
faceList []*vo.FaceObject
|
mutex sync.Mutex
|
}
|
|
func (task *SubscribeTask) Start() {
|
TaskWaitGroup.Add(1)
|
Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
|
|
for {
|
select {
|
case <-task.ctx.Done():
|
TaskProcMap.Delete(task.conf.Id)
|
TaskWaitGroup.Done()
|
logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
|
return
|
case <-Timer.C:
|
task.Notify()
|
}
|
}
|
}
|
|
func (task *SubscribeTask) AddFace(face *vo.FaceObject) {
|
if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
|
return
|
}
|
|
task.mutex.Lock()
|
defer task.mutex.Unlock()
|
|
task.faceList = append(task.faceList, face)
|
}
|
|
func (task *SubscribeTask) Notify() {
|
subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
|
for _, subType := range subDetails {
|
triggerTime := time.Now().Format("20060102150405")
|
|
// 上报设备
|
if subType == vo.SubscribeApe {
|
var notification = vo.DeviceNotification{
|
NotificationID: triggerTime + snowflake.GenerateIdStr(),
|
SubscribeID: task.conf.Id,
|
Title: task.conf.Ext.Title,
|
TriggerTime: triggerTime,
|
ExecuteOperation: 1,
|
}
|
|
var ids []string
|
var apeMod models.Ape
|
apeList, err := apeMod.FindAll()
|
if err != nil {
|
logger.Warn(err.Error())
|
continue
|
} else {
|
for idx, _ := range apeList {
|
ids = append(ids, apeList[idx].Id)
|
notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
|
}
|
}
|
|
notification.InfoIDs = strings.Join(ids, ";")
|
|
var req vo.RequestSubscribeNotification
|
req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
|
b, _ := json.Marshal(req)
|
client.Notify(task.conf.Ext.ReceiveAddr, b)
|
continue
|
}
|
|
// 上报人脸
|
if subType == vo.SubscribeFace {
|
if len(task.faceList) == 0 {
|
return
|
}
|
|
var notification = vo.FaceNotification{
|
NotificationID: triggerTime + snowflake.GenerateIdStr(),
|
SubscribeID: task.conf.Id,
|
Title: task.conf.Ext.Title,
|
TriggerTime: triggerTime,
|
ExecuteOperation: 1,
|
}
|
var ids []string
|
for idx, _ := range task.faceList {
|
// 按海康示例填充修改字段
|
task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
|
task.faceList[idx].EntryTime = triggerTime
|
for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
|
task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
|
}
|
|
ids = append(ids, task.faceList[idx].FaceID)
|
notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
|
}
|
|
notification.InfoIDs = strings.Join(ids, ";")
|
|
task.mutex.Lock()
|
task.faceList = []*vo.FaceObject{}
|
task.mutex.Unlock()
|
|
var req vo.RequestSubscribeNotification
|
req.SubscribeNotificationListObject.SubscribeNotificationObject = append(req.SubscribeNotificationListObject.SubscribeNotificationObject, notification)
|
b, _ := json.Marshal(req)
|
client.Notify(task.conf.Ext.ReceiveAddr, b)
|
}
|
}
|
}
|