package middleware import ( "context" "fmt" "sync" "sync/atomic" "time" ) // RefreshUserManager 定时拉取活跃用户的信息(包含下属id) type RefreshUserManager struct { RunFlag int32 StartRemoveUserFlag int32 StartRefreshUserFlag int32 Users map[string]*CurrentActiveUser mu sync.RWMutex ctx context.Context expireSecond int64 intervalRemoveUserMinute int intervalRefreshUserMinute int cancel func() } type CurrentActiveUser struct { UserID string lastActiveTime int64 } var defaultRefreshUserManager *RefreshUserManager func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) { if intervalRefreshUserMinute == 0 { intervalRefreshUserMinute = 5 } if intervalRemoveMinute > 0 && expireSecond == 0 { expireSecond = int64(intervalRemoveMinute * 60 * 12) } ctx, cancel := context.WithCancel(context.Background()) defaultRefreshUserManager = &RefreshUserManager{ expireSecond: expireSecond, intervalRemoveUserMinute: intervalRemoveMinute, intervalRefreshUserMinute: intervalRefreshUserMinute, ctx: ctx, cancel: cancel, } } func RunRefreshUser() { if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) { return } go defaultRefreshUserManager.refreshUserInfo() go defaultRefreshUserManager.removeInActiveUser() } func StopRefreshUser() { defaultRefreshUserManager.cancel() } func SetActiveTime(userID string) { defaultRefreshUserManager.setActiveTime(userID) } // RefreshActiveTime 更新用户活跃时间 func (r *RefreshUserManager) setActiveTime(userID string) { r.mu.Lock() defer r.mu.Unlock() if r.Users[userID] == nil { r.Users[userID] = &CurrentActiveUser{ UserID: userID, lastActiveTime: time.Now().Unix(), } } else { r.Users[userID].lastActiveTime = time.Now().Unix() } } func (r *RefreshUserManager) removeInActiveUser() { if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) { return } if r.intervalRemoveUserMinute == 0 { //不清理 return } ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute)) for { select { case <-ticker.C: nowTs := time.Now().Unix() fmt.Println("it is time to RemoveInActiveUser:", nowTs) r.mu.RLock() for userID, user := range r.Users { if nowTs-user.lastActiveTime > r.expireSecond { r.mu.Lock() delete(r.Users, userID) r.mu.Unlock() } } r.mu.RUnlock() case <-r.ctx.Done(): fmt.Println("stop RemoveInActiveUser.") return } } } func (r *RefreshUserManager) refreshUserInfo() { if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) { return } ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute)) for { select { case <-ticker.C: nowTs := time.Now().Unix() fmt.Println("it is time to RefreshUserInfo:", nowTs) r.mu.RLock() userIds := make([]string, len(r.Users)) for userID := range r.Users { userIds = append(userIds, userID) } r.mu.RUnlock() if len(userIds) != 0 { SyncUserInfo(userIds) } case <-r.ctx.Done(): fmt.Println("stop RefreshUserInfo.") return } } }