package middleware

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"aps_crm/pkg/logx"
)

// RefreshUserManager tracks recently active users and periodically
// (a) passes their IDs to SyncUserInfo and (b) evicts users that have been
// inactive for longer than expireSecond.
type RefreshUserManager struct {
	// RunFlag is set to 1 (via CAS) the first time RunRefreshUser starts the
	// background goroutines; it is never reset.
	RunFlag int32
	// StartRemoveUserFlag guards removeInActiveUser against running twice.
	StartRemoveUserFlag int32
	// StartRefreshUserFlag guards refreshUserInfo against running twice.
	StartRefreshUserFlag int32
	// Users maps user ID -> activity record. Guarded by mu.
	Users map[string]*CurrentActiveUser

	mu  sync.RWMutex
	ctx context.Context
	// expireSecond is the inactivity threshold, in seconds, after which a
	// user is evicted by removeInActiveUser.
	expireSecond int64
	// intervalRemoveUserMinute is the eviction period in minutes; a value
	// <= 0 disables eviction.
	intervalRemoveUserMinute int
	// intervalRefreshUserMinute is the refresh period in minutes.
	intervalRefreshUserMinute int
	cancel                    func()
}

// CurrentActiveUser records when a user was last seen.
type CurrentActiveUser struct {
	UserID string
	// lastActiveTime is a Unix timestamp in seconds. It must only be read or
	// written while holding RefreshUserManager.mu.
	lastActiveTime int64
}

// defaultRefreshUserManager is the package-level manager used by the exported
// helper functions. It is nil until InitRefreshUserManager is called.
var defaultRefreshUserManager *RefreshUserManager

// InitRefreshUserManager creates the package-level manager. It must be called
// before RunRefreshUser, StopRefreshUser or SetActiveTime, all of which
// dereference the manager.
//
// intervalRefreshUserMinute is the refresh period in minutes; values <= 0
// fall back to 5. intervalRemoveMinute is the eviction period in minutes;
// values <= 0 disable eviction. expireSecond is the inactivity threshold in
// seconds; when it is 0 and eviction is enabled it defaults to twelve
// eviction periods (intervalRemoveMinute * 60 * 12 seconds).
//
// Calling it again replaces the manager without cancelling the previous one.
func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) {
	// time.NewTicker panics on a non-positive duration, so negative values
	// are treated the same as "unset".
	if intervalRefreshUserMinute <= 0 {
		intervalRefreshUserMinute = 5
	}
	if intervalRemoveMinute > 0 && expireSecond == 0 {
		expireSecond = int64(intervalRemoveMinute * 60 * 12)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defaultRefreshUserManager = &RefreshUserManager{
		expireSecond:              expireSecond,
		intervalRemoveUserMinute:  intervalRemoveMinute,
		intervalRefreshUserMinute: intervalRefreshUserMinute,
		ctx:                       ctx,
		cancel:                    cancel,
		Users:                     map[string]*CurrentActiveUser{},
	}
}

// RunRefreshUser starts the refresh and eviction goroutines of the
// package-level manager. Only the first call has any effect.
func RunRefreshUser() {
	if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) {
		return
	}
	go defaultRefreshUserManager.refreshUserInfo()
	go defaultRefreshUserManager.removeInActiveUser()
}

// StopRefreshUser cancels the manager's context, which makes both background
// goroutines return. The run flags are not reset, so the same manager cannot
// be started again.
func StopRefreshUser() {
	defaultRefreshUserManager.cancel()
}

// SetActiveTime marks userID as active now on the package-level manager.
func SetActiveTime(userID string) {
	defaultRefreshUserManager.setActiveTime(userID)
}

// setActiveTime records the current time as userID's last activity, adding
// the user to the active set if it is not tracked yet.
func (r *RefreshUserManager) setActiveTime(userID string) {
	now := time.Now().Unix()

	r.mu.Lock()
	defer r.mu.Unlock()

	user := r.Users[userID]
	if user == nil {
		user = &CurrentActiveUser{
			UserID:         userID,
			lastActiveTime: now,
		}
		r.Users[userID] = user
		logx.Infof("add active user :%+v", user)
		return
	}
	user.lastActiveTime = now
	logx.Infof("refresh active time, user:%+v", user)
}

// removeInActiveUser runs until the manager's context is cancelled, evicting
// expired users every intervalRemoveUserMinute minutes. It returns
// immediately if it has already been started or if eviction is disabled.
func (r *RefreshUserManager) removeInActiveUser() {
	if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) {
		return
	}
	// Eviction disabled. Negative values are rejected too because
	// time.NewTicker panics on a non-positive duration.
	if r.intervalRemoveUserMinute <= 0 {
		return
	}

	ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute))
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			r.removeExpiredUsers(time.Now().Unix())
		case <-r.ctx.Done():
			fmt.Println("stop RemoveInActiveUser.")
			return
		}
	}
}

// removeExpiredUsers deletes every user whose last activity is more than
// expireSecond seconds before nowTs.
//
// The check and the delete happen under a single write lock: lastActiveTime
// is written by setActiveTime under the same lock, so reading it without the
// lock would be a data race, and checking against a snapshot could evict a
// user that became active again in the meantime.
func (r *RefreshUserManager) removeExpiredUsers(nowTs int64) {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Deleting entries while ranging over a map is permitted in Go.
	for id, user := range r.Users {
		if nowTs-user.lastActiveTime > r.expireSecond {
			logx.Infof("removed in active user:%+v", user)
			delete(r.Users, id)
		}
	}
}

// refreshUserInfo runs until the manager's context is cancelled, passing the
// IDs of all tracked users to SyncUserInfo every intervalRefreshUserMinute
// minutes. It returns immediately if it has already been started.
func (r *RefreshUserManager) refreshUserInfo() {
	if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) {
		return
	}

	ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute))
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			userIDs := r.activeUserIDs()
			if len(userIDs) != 0 {
				logx.Infof("RefreshUserInfo, user ids: %v", userIDs)
				SyncUserInfo(userIDs)
			}
		case <-r.ctx.Done():
			fmt.Println("stop RefreshUserInfo.")
			return
		}
	}
}

// activeUserIDs returns a snapshot of the IDs of all tracked users.
func (r *RefreshUserManager) activeUserIDs() []string {
	r.mu.RLock()
	defer r.mu.RUnlock()

	// Length 0 with capacity n: a non-zero length would leave n empty strings
	// in front of the appended IDs.
	ids := make([]string, 0, len(r.Users))
	for id := range r.Users {
		ids = append(ids, id)
	}
	return ids
}