New file |
| | |
| | | package middleware |
| | | |
| | | import ( |
| | | "aps_crm/pkg/logx" |
| | | "context" |
| | | "fmt" |
| | | "sync" |
| | | "sync/atomic" |
| | | "time" |
| | | ) |
| | | |
| | | // RefreshUserManager 定时拉取活跃用户的信息(包含下属id) |
| | | type RefreshUserManager struct { |
| | | RunFlag int32 |
| | | StartRemoveUserFlag int32 |
| | | StartRefreshUserFlag int32 |
| | | Users map[string]*CurrentActiveUser |
| | | mu sync.RWMutex |
| | | ctx context.Context |
| | | expireSecond int64 |
| | | intervalRemoveUserMinute int |
| | | intervalRefreshUserMinute int |
| | | cancel func() |
| | | } |
| | | |
| | | type CurrentActiveUser struct { |
| | | UserID string |
| | | lastActiveTime int64 |
| | | } |
| | | |
| | | var defaultRefreshUserManager *RefreshUserManager |
| | | |
| | | func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) { |
| | | if intervalRefreshUserMinute == 0 { |
| | | intervalRefreshUserMinute = 5 |
| | | } |
| | | if intervalRemoveMinute > 0 && expireSecond == 0 { |
| | | expireSecond = int64(intervalRemoveMinute * 60 * 12) |
| | | } |
| | | ctx, cancel := context.WithCancel(context.Background()) |
| | | defaultRefreshUserManager = &RefreshUserManager{ |
| | | expireSecond: expireSecond, |
| | | intervalRemoveUserMinute: intervalRemoveMinute, |
| | | intervalRefreshUserMinute: intervalRefreshUserMinute, |
| | | ctx: ctx, |
| | | cancel: cancel, |
| | | Users: map[string]*CurrentActiveUser{}, |
| | | } |
| | | } |
| | | |
| | | func RunRefreshUser() { |
| | | if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) { |
| | | return |
| | | } |
| | | go defaultRefreshUserManager.refreshUserInfo() |
| | | go defaultRefreshUserManager.removeInActiveUser() |
| | | } |
| | | func StopRefreshUser() { |
| | | defaultRefreshUserManager.cancel() |
| | | } |
| | | |
| | | func SetActiveTime(userID string) { |
| | | defaultRefreshUserManager.setActiveTime(userID) |
| | | } |
| | | |
| | | // RefreshActiveTime 更新用户活跃时间 |
| | | func (r *RefreshUserManager) setActiveTime(userID string) { |
| | | r.mu.Lock() |
| | | defer r.mu.Unlock() |
| | | if r.Users[userID] == nil { |
| | | r.Users[userID] = &CurrentActiveUser{ |
| | | UserID: userID, |
| | | lastActiveTime: time.Now().Unix(), |
| | | } |
| | | logx.Infof("add active user :%+v", r.Users[userID]) |
| | | } else { |
| | | r.Users[userID].lastActiveTime = time.Now().Unix() |
| | | logx.Infof("refresh active time, user:%+v", r.Users[userID]) |
| | | } |
| | | } |
| | | |
| | | func (r *RefreshUserManager) removeInActiveUser() { |
| | | if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) { |
| | | return |
| | | } |
| | | if r.intervalRemoveUserMinute == 0 { //不清理 |
| | | return |
| | | } |
| | | ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute)) |
| | | for { |
| | | select { |
| | | case <-ticker.C: |
| | | nowTs := time.Now().Unix() |
| | | var users []*CurrentActiveUser |
| | | r.mu.RLock() |
| | | for _, user := range r.Users { |
| | | users = append(users, user) |
| | | } |
| | | r.mu.RUnlock() |
| | | for _, user := range users { |
| | | if nowTs-user.lastActiveTime > r.expireSecond { |
| | | r.mu.Lock() |
| | | logx.Infof("removed in active user:%+v", user) |
| | | delete(r.Users, user.UserID) |
| | | r.mu.Unlock() |
| | | } |
| | | } |
| | | case <-r.ctx.Done(): |
| | | fmt.Println("stop RemoveInActiveUser.") |
| | | return |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (r *RefreshUserManager) refreshUserInfo() { |
| | | if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) { |
| | | return |
| | | } |
| | | ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute)) |
| | | for { |
| | | select { |
| | | case <-ticker.C: |
| | | r.mu.RLock() |
| | | userIds := make([]string, len(r.Users)) |
| | | for userID := range r.Users { |
| | | userIds = append(userIds, userID) |
| | | } |
| | | r.mu.RUnlock() |
| | | if len(userIds) != 0 { |
| | | logx.Infof("RefreshUserInfo, user ids: ", userIds) |
| | | SyncUserInfo(userIds) |
| | | } |
| | | case <-r.ctx.Done(): |
| | | fmt.Println("stop RefreshUserInfo.") |
| | | return |
| | | } |
| | | } |
| | | } |