fix
zhangqian
2024-03-19 f281cfdc37b58493644175046abf6008016bf8cd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package middleware
 
import (
    "aps_crm/pkg/logx"
    "context"
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)
 
// RefreshUserManager 定时拉取活跃用户的信息(包含下属id)
type RefreshUserManager struct {
    RunFlag                   int32
    StartRemoveUserFlag       int32
    StartRefreshUserFlag      int32
    Users                     map[string]*CurrentActiveUser
    mu                        sync.RWMutex
    ctx                       context.Context
    expireSecond              int64
    intervalRemoveUserMinute  int
    intervalRefreshUserMinute int
    cancel                    func()
}
 
type CurrentActiveUser struct {
    UserID         string
    lastActiveTime int64
}
 
var defaultRefreshUserManager *RefreshUserManager
 
func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) {
    if intervalRefreshUserMinute == 0 {
        intervalRefreshUserMinute = 5
    }
    if intervalRemoveMinute > 0 && expireSecond == 0 {
        expireSecond = int64(intervalRemoveMinute * 60 * 12)
    }
    ctx, cancel := context.WithCancel(context.Background())
    defaultRefreshUserManager = &RefreshUserManager{
        expireSecond:              expireSecond,
        intervalRemoveUserMinute:  intervalRemoveMinute,
        intervalRefreshUserMinute: intervalRefreshUserMinute,
        ctx:                       ctx,
        cancel:                    cancel,
        Users:                     map[string]*CurrentActiveUser{},
    }
}
 
func RunRefreshUser() {
    if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) {
        return
    }
    go defaultRefreshUserManager.refreshUserInfo()
    go defaultRefreshUserManager.removeInActiveUser()
}
func StopRefreshUser() {
    defaultRefreshUserManager.cancel()
}
 
func SetActiveTime(userID string) {
    defaultRefreshUserManager.setActiveTime(userID)
}
 
// RefreshActiveTime 更新用户活跃时间
func (r *RefreshUserManager) setActiveTime(userID string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if r.Users[userID] == nil {
        r.Users[userID] = &CurrentActiveUser{
            UserID:         userID,
            lastActiveTime: time.Now().Unix(),
        }
        logx.Infof("add active user :%+v", r.Users[userID])
    } else {
        r.Users[userID].lastActiveTime = time.Now().Unix()
        logx.Infof("refresh active time, user:%+v", r.Users[userID])
    }
}
 
func (r *RefreshUserManager) removeInActiveUser() {
    if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) {
        return
    }
    if r.intervalRemoveUserMinute == 0 { //不清理
        return
    }
    ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute))
    for {
        select {
        case <-ticker.C:
            nowTs := time.Now().Unix()
            var users []*CurrentActiveUser
            r.mu.RLock()
            for _, user := range r.Users {
                users = append(users, user)
            }
            r.mu.RUnlock()
            for _, user := range users {
                if nowTs-user.lastActiveTime > r.expireSecond {
                    r.mu.Lock()
                    logx.Infof("removed in active user:%+v", user)
                    delete(r.Users, user.UserID)
                    r.mu.Unlock()
                }
            }
        case <-r.ctx.Done():
            fmt.Println("stop RemoveInActiveUser.")
            return
        }
    }
}
 
func (r *RefreshUserManager) refreshUserInfo() {
    if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) {
        return
    }
    ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute))
    for {
        select {
        case <-ticker.C:
            r.mu.RLock()
            userIds := make([]string, len(r.Users))
            for userID := range r.Users {
                userIds = append(userIds, userID)
            }
            r.mu.RUnlock()
            if len(userIds) != 0 {
                logx.Infof("RefreshUserInfo, user ids: ", userIds)
                SyncUserInfo(userIds)
            }
        case <-r.ctx.Done():
            fmt.Println("stop RefreshUserInfo.")
            return
        }
    }
}