zhangzengfei
2024-05-29 332fc6ad5edca596ecd23876aa9db7452b45f804
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package service
 
import (
    "context"
    "sync"
 
    "gat1400Exchange/config"
    "gat1400Exchange/models"
    "gat1400Exchange/pkg/logger"
    "gat1400Exchange/vo"
)
 
var TaskProcMap sync.Map
var TaskWaitGroup = &sync.WaitGroup{}
 
type TaskProcInfo struct {
    cancel context.CancelFunc
    task   *NotificationTask
}
 
func (t TaskProcInfo) stop() {
    t.cancel()
    t.task = nil
}
 
func InitSubscribeNotificationTasks() {
    var s models.Subscribe
    subList, err := s.FindByFromId(config.ClientConf.ServerId)
    if err != nil {
        logger.Error("Get subscribe info error, server id %s, %s", config.ClientConf.ServerId, err.Error())
        return
    }
 
    for idx := range subList {
        if subList[idx].Status != 0 {
            continue
        }
 
        CreateNotificationTask(&subList[idx])
    }
 
    return
}
 
func UpdateNotificationTask(subId string, msgType int, conf *models.Subscribe) {
    logger.Debug("Receive update notify task msg: %s %d", subId, msgType)
 
    proc, isExist := TaskProcMap.Load(subId)
 
    switch msgType {
    case vo.Msg_Type_Create_Subscribe, vo.Msg_Type_Update_Subscribe:
        if isExist {
            logger.Debug("Update notify task, restart after exit")
            proc.(TaskProcInfo).stop()
        }
 
        CreateNotificationTask(conf)
    case vo.Msg_Type_Delete_Subscribe:
        if !isExist {
            return
        }
 
        // 关闭任务, 并删除
        proc.(TaskProcInfo).stop()
    default:
        logger.Warn("Invalid msg type %d", msgType)
    }
}
 
func CreateNotificationTask(conf *models.Subscribe) {
    logger.Debug("Add subscribe notification task, %s, %s", conf.Id, conf.Ext.Title)
 
    ctx, cancel := context.WithCancel(context.Background())
    task := &NotificationTask{
        ctx:  ctx,
        conf: conf,
    }
 
    TaskProcMap.Store(conf.Id, TaskProcInfo{
        cancel: cancel,
        task:   task,
    })
 
    go task.start()
}
 
func StopNotificationTasks() {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).cancel()
        return true
    })
 
    logger.Debug("等待所有任务退出.")
    TaskWaitGroup.Wait()
}
 
func AddFaceNotification(face *vo.FaceObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.addFace(face)
        return true
    })
 
    logger.Debug("Add Face Notification. faceId: %s", face.FaceID)
}
 
func AddPersonNotification(person *vo.PersonObject) {
    TaskProcMap.Range(func(key, value interface{}) bool {
        value.(TaskProcInfo).task.addPerson(person)
        return true
    })
 
    logger.Debug("Add Person Notification. personId: %s", person.PersonID)
}