package service
|
|
import (
|
"context"
|
"encoding/json"
|
"gat1400Exchange/pkg/logger"
|
"strings"
|
"sync"
|
"time"
|
|
"gat1400Exchange/client"
|
"gat1400Exchange/models"
|
"gat1400Exchange/pkg/snowflake"
|
"gat1400Exchange/vo"
|
)
|
|
type NotificationTask struct {
|
ctx context.Context
|
conf *models.Subscribe
|
faceList []*vo.FaceObject
|
personList []*vo.PersonObject
|
mutex sync.Mutex
|
lastApeExecTime int64
|
}
|
|
func (task *NotificationTask) start() {
|
TaskWaitGroup.Add(1)
|
Timer := time.NewTicker(time.Duration(task.conf.Ext.ReportInterval) * time.Second)
|
|
for {
|
select {
|
case <-task.ctx.Done():
|
TaskProcMap.Delete(task.conf.Id)
|
TaskWaitGroup.Done()
|
logger.Warn("Subscribe task stop!!! id:%s", task.conf.Id)
|
return
|
case <-Timer.C:
|
task.notify()
|
}
|
}
|
}
|
|
func (task *NotificationTask) addFace(face *vo.FaceObject) {
|
if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribeFace) {
|
return
|
}
|
|
task.mutex.Lock()
|
defer task.mutex.Unlock()
|
|
task.faceList = append(task.faceList, face)
|
}
|
|
func (task *NotificationTask) addPerson(person *vo.PersonObject) {
|
if !strings.Contains(task.conf.Ext.SubscribeDetail, vo.SubscribePerson) {
|
return
|
}
|
|
task.mutex.Lock()
|
defer task.mutex.Unlock()
|
|
task.personList = append(task.personList, person)
|
}
|
|
func (task *NotificationTask) notify() {
|
subDetails := strings.Split(task.conf.Ext.SubscribeDetail, ",")
|
for _, subType := range subDetails {
|
var msg *vo.RequestSubscribeNotification
|
switch subType {
|
case vo.SubscribeApe:
|
msg = task.packDeviceNotificationMsg()
|
case vo.SubscribeFace:
|
msg = task.packFaceNotificationMsg()
|
case vo.SubscribePerson:
|
}
|
|
if msg != nil {
|
b, _ := json.Marshal(msg)
|
client.Notify(task.conf.Ext.ReceiveAddr, b)
|
}
|
}
|
}
|
|
func (task *NotificationTask) packFaceNotificationMsg() *vo.RequestSubscribeNotification {
|
triggerTime := time.Now().Format("20060102150405")
|
if len(task.faceList) == 0 {
|
return nil
|
}
|
|
var notification = vo.FaceNotification{
|
NotificationID: triggerTime + snowflake.GenerateIdStr(),
|
SubscribeID: task.conf.Id,
|
Title: task.conf.Ext.Title,
|
TriggerTime: triggerTime,
|
ExecuteOperation: 1,
|
}
|
|
var ids []string
|
for idx, _ := range task.faceList {
|
// 按海康示例填充修改字段
|
task.faceList[idx].ShotTime = task.faceList[idx].FaceAppearTime
|
task.faceList[idx].EntryTime = triggerTime
|
for i, _ := range task.faceList[idx].SubImageList.SubImageInfoObject {
|
task.faceList[idx].SubImageList.SubImageInfoObject[i].EventSort = 2
|
}
|
|
ids = append(ids, task.faceList[idx].FaceID)
|
notification.FaceObjectList.FaceObject = append(notification.FaceObjectList.FaceObject, *task.faceList[idx])
|
}
|
|
notification.InfoIDs = strings.Join(ids, ";")
|
|
task.mutex.Lock()
|
task.faceList = []*vo.FaceObject{}
|
task.mutex.Unlock()
|
|
var req vo.RequestSubscribeNotification
|
req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
|
|
return &req
|
}
|
|
func (task *NotificationTask) packDeviceNotificationMsg() *vo.RequestSubscribeNotification {
|
// 上报设备 控制设备通知控制频率, 避免频繁, 上级一般会下发秒级的任务, 但设备不会频繁更新,
|
if time.Now().Unix()-task.lastApeExecTime < 60*10 {
|
return nil
|
}
|
|
triggerTime := time.Now().Format("20060102150405")
|
|
task.lastApeExecTime = time.Now().Unix()
|
|
var notification = vo.DeviceNotification{
|
NotificationID: triggerTime + snowflake.GenerateIdStr(),
|
SubscribeID: task.conf.Id,
|
Title: task.conf.Ext.Title,
|
TriggerTime: triggerTime,
|
ExecuteOperation: 1,
|
}
|
|
var ids []string
|
var apeMod models.Ape
|
apeList, err := apeMod.FindAll()
|
if err != nil {
|
logger.Warn(err.Error())
|
return nil
|
} else {
|
for idx, _ := range apeList {
|
ids = append(ids, apeList[idx].Id)
|
notification.DeviceList.APEObject = append(notification.DeviceList.APEObject, apeList[idx].Ext)
|
}
|
}
|
|
notification.InfoIDs = strings.Join(ids, ";")
|
|
var req vo.RequestSubscribeNotification
|
req.SubscribeNotificationListObject.SubscribeNotificationObject[0] = notification
|
|
return &req
|
}
|