package middleware
|
|
import (
|
"aps_crm/pkg/logx"
|
"context"
|
"fmt"
|
"sync"
|
"sync/atomic"
|
"time"
|
)
|
|
// RefreshUserManager 定时拉取活跃用户的信息(包含下属id)
|
type RefreshUserManager struct {
|
RunFlag int32
|
StartRemoveUserFlag int32
|
StartRefreshUserFlag int32
|
Users map[string]*CurrentActiveUser
|
mu sync.RWMutex
|
ctx context.Context
|
expireSecond int64
|
intervalRemoveUserMinute int
|
intervalRefreshUserMinute int
|
cancel func()
|
}
|
|
// CurrentActiveUser is the activity record kept for a single tracked user.
type CurrentActiveUser struct {
	// UserID identifies the user.
	UserID string
	// lastActiveTime is the Unix timestamp, in seconds, of the user's most
	// recent recorded activity.
	lastActiveTime int64
}
|
|
// defaultRefreshUserManager is the package-level manager behind RunRefreshUser,
// StopRefreshUser and SetActiveTime. It is nil until InitRefreshUserManager
// assigns it.
var defaultRefreshUserManager *RefreshUserManager
|
|
func InitRefreshUserManager(intervalRefreshUserMinute, intervalRemoveMinute int, expireSecond int64) {
|
if intervalRefreshUserMinute == 0 {
|
intervalRefreshUserMinute = 5
|
}
|
if intervalRemoveMinute > 0 && expireSecond == 0 {
|
expireSecond = int64(intervalRemoveMinute * 60 * 12)
|
}
|
ctx, cancel := context.WithCancel(context.Background())
|
defaultRefreshUserManager = &RefreshUserManager{
|
expireSecond: expireSecond,
|
intervalRemoveUserMinute: intervalRemoveMinute,
|
intervalRefreshUserMinute: intervalRefreshUserMinute,
|
ctx: ctx,
|
cancel: cancel,
|
Users: map[string]*CurrentActiveUser{},
|
}
|
}
|
|
func RunRefreshUser() {
|
if !atomic.CompareAndSwapInt32(&defaultRefreshUserManager.RunFlag, 0, 1) {
|
return
|
}
|
go defaultRefreshUserManager.refreshUserInfo()
|
go defaultRefreshUserManager.removeInActiveUser()
|
}
|
func StopRefreshUser() {
|
defaultRefreshUserManager.cancel()
|
}
|
|
func SetActiveTime(userID string) {
|
defaultRefreshUserManager.setActiveTime(userID)
|
}
|
|
// RefreshActiveTime 更新用户活跃时间
|
func (r *RefreshUserManager) setActiveTime(userID string) {
|
r.mu.Lock()
|
defer r.mu.Unlock()
|
if r.Users[userID] == nil {
|
r.Users[userID] = &CurrentActiveUser{
|
UserID: userID,
|
lastActiveTime: time.Now().Unix(),
|
}
|
logx.Infof("add active user :%+v", r.Users[userID])
|
} else {
|
r.Users[userID].lastActiveTime = time.Now().Unix()
|
logx.Infof("refresh active time, user:%+v", r.Users[userID])
|
}
|
}
|
|
func (r *RefreshUserManager) removeInActiveUser() {
|
if !atomic.CompareAndSwapInt32(&r.StartRemoveUserFlag, 0, 1) {
|
return
|
}
|
if r.intervalRemoveUserMinute == 0 { //不清理
|
return
|
}
|
ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRemoveUserMinute))
|
for {
|
select {
|
case <-ticker.C:
|
nowTs := time.Now().Unix()
|
var users []*CurrentActiveUser
|
r.mu.RLock()
|
for _, user := range r.Users {
|
users = append(users, user)
|
}
|
r.mu.RUnlock()
|
for _, user := range users {
|
if nowTs-user.lastActiveTime > r.expireSecond {
|
r.mu.Lock()
|
logx.Infof("removed in active user:%+v", user)
|
delete(r.Users, user.UserID)
|
r.mu.Unlock()
|
}
|
}
|
case <-r.ctx.Done():
|
fmt.Println("stop RemoveInActiveUser.")
|
return
|
}
|
}
|
}
|
|
func (r *RefreshUserManager) refreshUserInfo() {
|
if !atomic.CompareAndSwapInt32(&r.StartRefreshUserFlag, 0, 1) {
|
return
|
}
|
ticker := time.NewTicker(time.Minute * time.Duration(r.intervalRefreshUserMinute))
|
for {
|
select {
|
case <-ticker.C:
|
r.mu.RLock()
|
userIds := make([]string, len(r.Users))
|
for userID := range r.Users {
|
userIds = append(userIds, userID)
|
}
|
r.mu.RUnlock()
|
if len(userIds) != 0 {
|
logx.Infof("RefreshUserInfo, user ids: ", userIds)
|
SyncUserInfo(userIds)
|
}
|
case <-r.ctx.Done():
|
fmt.Println("stop RefreshUserInfo.")
|
return
|
}
|
}
|
}
|