package service

import (
	"context"
	"sync"

	"gat1400Exchange/config"
	"gat1400Exchange/models"
	"gat1400Exchange/pkg/logger"
	"gat1400Exchange/vo"
)

// TaskProcMap maps a subscription ID (string) to the TaskProcInfo of the
// notification task created for that subscription.
var TaskProcMap sync.Map

// TaskWaitGroup is waited on by StopNotificationTasks before it returns.
// NOTE(review): presumably each NotificationTask registers itself with
// Add/Done while running; that code is not in this file — confirm.
var TaskWaitGroup = &sync.WaitGroup{}

// TaskProcInfo bundles a notification task with the cancel function of the
// context the task was started with.
type TaskProcInfo struct {
	cancel context.CancelFunc
	task   *NotificationTask
}

// stop cancels the task's context.
//
// The receiver is a value because entries are stored by value in TaskProcMap
// and stop is invoked on the (non-addressable) result of a type assertion.
// Clearing fields here would only touch a copy, so removal of the entry is
// left to the caller via TaskProcMap.Delete.
func (t TaskProcInfo) stop() {
	t.cancel()
}

// InitSubscribeNotificationTasks loads the subscriptions whose "from" ID is
// the configured client server ID and creates a notification task for every
// one whose Status is 0. Lookup failures are logged and no task is created.
func InitSubscribeNotificationTasks() {
	var s models.Subscribe

	subList, err := s.FindByFromId(config.ClientConf.ServerId)
	if err != nil {
		logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
		return
	}

	for idx := range subList {
		// Only subscriptions with Status == 0 get a task.
		if subList[idx].Status != 0 {
			continue
		}
		// Pass a pointer to the slice element, not to a loop variable copy.
		CreateNotificationTask(&subList[idx])
	}
}

// UpdateNotificationTask applies a subscription change message to the set of
// notification tasks.
//
//   - Create/Update: any task registered under subId is stopped and removed,
//     then a new task is created from conf. A nil conf is rejected before the
//     existing task is touched.
//   - Delete: the task registered under subId, if any, is stopped and its
//     entry is removed from TaskProcMap.
//   - Any other msgType is logged as invalid.
func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
	logger.Debug("Receive update notify task msg: %s %d", subId, msgType)

	proc, isExist := TaskProcMap.Load(subId)

	switch msgType {
	case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
		// Validate first so a malformed message cannot stop a working task
		// and then fail while creating its replacement.
		if conf == nil {
			logger.Warn("Ignore notify task msg %d for %s: subscribe config is nil", msgType, subId)
			return
		}
		if isExist {
			logger.Debug("Update notify task, restart after exit")
			proc.(TaskProcInfo).stop()
			// Drop the old entry explicitly: the new one is stored under
			// conf.Id, which would not overwrite it if the IDs differ.
			TaskProcMap.Delete(subId)
		}
		CreateNotificationTask(conf)

	case vo.Msg_Type_Delete_Subscribe:
		if !isExist {
			return
		}
		// Stop the task and remove it, so later notifications are no longer
		// dispatched to a cancelled task.
		proc.(TaskProcInfo).stop()
		TaskProcMap.Delete(subId)

	default:
		logger.Warn("Invalid msg type %d", msgType)
	}
}

// CreateNotificationTask builds a NotificationTask for conf with a fresh
// cancellable context, registers it in TaskProcMap under conf.Id (replacing
// any entry with the same ID) and starts it in a new goroutine.
// A nil conf is logged and ignored.
func CreateNotificationTask(conf *models.Subscribe) {
	if conf == nil {
		logger.Warn("Skip creating notification task: subscribe config is nil")
		return
	}

	logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)

	ctx, cancel := context.WithCancel(context.Background())
	task := &NotificationTask{
		ctx:  ctx,
		conf: conf,
	}

	TaskProcMap.Store(conf.Id, TaskProcInfo{
		cancel: cancel,
		task:   task,
	})

	go task.start()
}

// StopNotificationTasks cancels the context of every registered task and then
// blocks until TaskWaitGroup is released.
func StopNotificationTasks() {
	TaskProcMap.Range(func(key, value interface{}) bool {
		value.(TaskProcInfo).cancel()
		return true
	})

	logger.Debug("等待所有任务退出.")
	TaskWaitGroup.Wait()
}

// AddFaceNotification hands face to every task registered in TaskProcMap.
func AddFaceNotification(face *vo.FaceObject) {
	TaskProcMap.Range(func(key, value interface{}) bool {
		value.(TaskProcInfo).task.addFace(face)
		return true
	})
}

// AddPersonNotification hands person to every task registered in TaskProcMap.
func AddPersonNotification(person *vo.PersonObject) {
	TaskProcMap.Range(func(key, value interface{}) bool {
		value.(TaskProcInfo).task.addPerson(person)
		return true
	})
}