liujiandao
2023-11-13 b3a47cb555076c25c64d83dd455a472509291245
middleware/refresh_user.go
New file
@@ -0,0 +1,138 @@
package middleware
import (
   "aps_crm/pkg/logx"
   "context"
   "fmt"
   "sync"
   "sync/atomic"
   "time"
)
// RefreshUserManager 定时拉取活跃用户的信息(包含下属id)
type RefreshUserManager struct {
   RunFlag                   int32
   StartRemoveUserFlag       int32
   StartRefreshUserFlag      int32
   Users                     map[string]*CurrentActiveUser
   mu                        sync.RWMutex
   ctx                       context.Context
   expireSecond              int64
   intervalRemoveUserMinute  int
   intervalRefreshUserMinute int
   cancel                    func()
}
type CurrentActiveUser struct {
   UserID         string
   lastActiveTime int64
}
var defaultRefreshUserManager *RefreshUserManager
func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) {
   if intervalRefreshUserMinute == 0 {
      intervalRefreshUserMinute = 5
   }
   if intervalRemoveMinute > 0 && expireSecond == 0 {
      expireSecond = int64(intervalRemoveMinute * 60 * 12)
   }
   ctx, cancel := context.WithCancel(context.Background())
   defaultRefreshUserManager = &RefreshUserManager{
      expireSecond:              expireSecond,
      intervalRemoveUserMinute:  intervalRemoveMinute,
      intervalRefreshUserMinute: intervalRefreshUserMinute,
      ctx:                       ctx,
      cancel:                    cancel,
      Users:                     map[string]*CurrentActiveUser{},
   }
}
func RunRefreshUser() {
   if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) {
      return
   }
   go defaultRefreshUserManager.refreshUserInfo()
   go defaultRefreshUserManager.removeInActiveUser()
}
func StopRefreshUser() {
   defaultRefreshUserManager.cancel()
}
func SetActiveTime(userID string) {
   defaultRefreshUserManager.setActiveTime(userID)
}
// RefreshActiveTime 更新用户活跃时间
func (r *RefreshUserManager) setActiveTime(userID string) {
   r.mu.Lock()
   defer r.mu.Unlock()
   if r.Users[userID] == nil {
      r.Users[userID] = &CurrentActiveUser{
         UserID:         userID,
         lastActiveTime: time.Now().Unix(),
      }
      logx.Infof("add active user :%+v", r.Users[userID])
   } else {
      r.Users[userID].lastActiveTime = time.Now().Unix()
      logx.Infof("refresh active time, user:%+v", r.Users[userID])
   }
}
func (r *RefreshUserManager) removeInActiveUser() {
   if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) {
      return
   }
   if r.intervalRemoveUserMinute == 0 { //不清理
      return
   }
   ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute))
   for {
      select {
      case <-ticker.C:
         nowTs := time.Now().Unix()
         var users []*CurrentActiveUser
         r.mu.RLock()
         for _, user := range r.Users {
            users = append(users, user)
         }
         r.mu.RUnlock()
         for _, user := range users {
            if nowTs-user.lastActiveTime > r.expireSecond {
               r.mu.Lock()
               logx.Infof("removed in active user:%+v", user)
               delete(r.Users, user.UserID)
               r.mu.Unlock()
            }
         }
      case <-r.ctx.Done():
         fmt.Println("stop RemoveInActiveUser.")
         return
      }
   }
}
func (r *RefreshUserManager) refreshUserInfo() {
   if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) {
      return
   }
   ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute))
   for {
      select {
      case <-ticker.C:
         r.mu.RLock()
         userIds := make([]string, len(r.Users))
         for userID := range r.Users {
            userIds = append(userIds, userID)
         }
         r.mu.RUnlock()
         if len(userIds) != 0 {
            logx.Infof("RefreshUserInfo, user ids: ", userIds)
            SyncUserInfo(userIds)
         }
      case <-r.ctx.Done():
         fmt.Println("stop RefreshUserInfo.")
         return
      }
   }
}