package service
|
|
import (
|
"context"
|
"sync"
|
|
"gat1400Exchange/config"
|
"gat1400Exchange/models"
|
"gat1400Exchange/pkg/logger"
|
"gat1400Exchange/vo"
|
)
|
|
// Package-level bookkeeping shared by the notification task functions.
var (
	// TaskProcMap holds one TaskProcInfo per notification task. Entries are
	// stored by CreateNotificationTask under the subscription's Id.
	TaskProcMap sync.Map

	// TaskWaitGroup is waited on by StopNotificationTasks during shutdown.
	// NOTE(review): nothing in the visible code calls Add/Done on it;
	// presumably the tasks register themselves — confirm in NotificationTask.
	TaskWaitGroup = &sync.WaitGroup{}
)
|
|
type TaskProcInfo struct {
|
cancel context.CancelFunc
|
task *NotificationTask
|
}
|
|
func (t TaskProcInfo) stop() {
|
t.cancel()
|
t.task = nil
|
}
|
|
func InitSubscribeNotificationTasks() {
|
var s models.Subscribe
|
subList, err := s.FindByFromId(config.ClientConf.ServerId)
|
if err != nil {
|
logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
|
return
|
}
|
|
for idx := range subList {
|
if subList[idx].Status != 0 {
|
continue
|
}
|
|
CreateNotificationTask(&subList[idx])
|
}
|
|
return
|
}
|
|
func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
|
logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
|
|
proc, isExist := TaskProcMap.Load(subId)
|
|
switch msgType {
|
case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
|
if isExist {
|
logger.Debug("Update notify task, restart after exit")
|
proc.(TaskProcInfo).stop()
|
}
|
|
CreateNotificationTask(conf)
|
case vo.Msg_Type_Delete_Subscribe:
|
if !isExist {
|
return
|
}
|
|
// 关闭任务, 并删除
|
proc.(TaskProcInfo).stop()
|
default:
|
logger.Warn("Invalid msg type %d", msgType)
|
}
|
}
|
|
func CreateNotificationTask(conf *models.Subscribe) {
|
logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
task := &NotificationTask{
|
ctx: ctx,
|
conf: conf,
|
}
|
|
TaskProcMap.Store(conf.Id, TaskProcInfo{
|
cancel: cancel,
|
task: task,
|
})
|
|
go task.start()
|
}
|
|
func StopNotificationTasks() {
|
TaskProcMap.Range(func(key, value interface{}) bool {
|
value.(TaskProcInfo).cancel()
|
return true
|
})
|
|
logger.Debug("等待所有任务退出.")
|
TaskWaitGroup.Wait()
|
}
|
|
func AddFaceNotification(face *vo.FaceObject) {
|
TaskProcMap.Range(func(key, value interface{}) bool {
|
value.(TaskProcInfo).task.addFace(face)
|
return true
|
})
|
|
logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
|
}
|
|
func AddPersonNotification(person *vo.PersonObject) {
|
TaskProcMap.Range(func(key, value interface{}) bool {
|
value.(TaskProcInfo).task.addPerson(person)
|
return true
|
})
|
|
logger.Debug("Add Person Notification. personId: %s", person.PersonID)
|
}
|