package service

import (
	"encoding/json"
	"errors"
	"io"
	"io/ioutil"
	"os"
	"sync"
	"time"

	"basic.com/valib/go-aiot.git/aiotProto/aiot"
	"basic.com/valib/logger.git"

	"vamicro/config"
	"vamicro/saas-service/service/nodeService"
)

// Cache is the process-wide lock store. InitCache must be called before any
// of its methods are used, because the mutex and the record map are only
// allocated there.
var Cache cache

// cacheFile is the path of the JSON dump the records are persisted to.
const cacheFile = "../config/cache.dump"

// cache is an in-memory lock table that is periodically written to cacheFile.
type cache struct {
	lock    *sync.RWMutex     // guards records
	records map[string]record // lock records keyed by GetKeyName(nodeId, key)
	ch      chan bool         // allocated by InitCache; not used in this file
}

// record is a single persisted lock entry.
type record struct {
	Data   string `json:"data"`   // SetLock stores the owning node id here
	Expire int64  `json:"expire"` // Unix seconds; 0 means the record never expires
}

// LockRes is the JSON envelope decoded from the lock endpoints' bus responses.
type LockRes struct {
	Code    int    `json:"code"`
	Data    string `json:"data"`
	Msg     string `json:"msg"`
	Success bool   `json:"success"`
}

// InitCache allocates the global Cache, restores the records persisted in
// cacheFile and starts a background goroutine that drops expired records
// every 200ms and writes the records back to cacheFile every 500ms.
// Calling it again once the cache is initialised is a no-op.
func InitCache() {
	if Cache.records != nil {
		return
	}

	Cache = cache{
		lock:    &sync.RWMutex{},
		records: make(map[string]record),
		ch:      make(chan bool, 1),
	}

	// Restore the records from the previous run.
	Cache.file2Cache()

	// Periodically drop expired records.
	ticker := time.NewTicker(200 * time.Millisecond)
	// Periodically persist the records.
	stableTicker := time.NewTicker(500 * time.Millisecond)

	// The goroutine has no exit path, so the tickers are never stopped.
	go func() {
		for {
			select {
			case <-ticker.C:
				Cache.purgeExpired(time.Now().Unix())
			case <-stableTicker.C:
				Cache.cache2File()
			}
		}
	}()
}

// purgeExpired removes every record whose Expire is set (> 0) and lies before
// now (Unix seconds). The whole sweep runs under the write lock: ranging over
// the map while another goroutine writes to it is a data race.
func (c *cache) purgeExpired(now int64) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for k, rec := range c.records {
		if rec.Expire > 0 && rec.Expire < now {
			delete(c.records, k)
		}
	}
}

// file2Cache loads the records stored in cacheFile into the cache. Read and
// decode failures are logged and otherwise ignored.
func (c *cache) file2Cache() {
	byt, err := ioutil.ReadFile(cacheFile)
	if err != nil {
		logger.Error("can not read file:", cacheFile, err)
		return
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	if err := json.Unmarshal(byt, &c.records); err != nil {
		logger.Error("can not unmarshal file:", cacheFile, string(byt), err)
	}
	// A dump containing JSON null decodes to a nil map; writing to it would
	// panic and InitCache would treat the cache as uninitialised.
	if c.records == nil {
		c.records = make(map[string]record)
	}
}

// cache2File serialises the records as JSON and overwrites cacheFile with
// the result. Failures are logged; nothing is returned to the caller.
func (c *cache) cache2File() {
	// Marshal while holding the read lock: copying the map variable would
	// only copy the reference, leaving the encoder racing with writers.
	c.lock.RLock()
	byt, err := json.Marshal(c.records)
	c.lock.RUnlock()
	if err != nil {
		logger.Error("can not marshal cache:", err)
		return
	}
	content := string(byt)

	f, err := os.OpenFile(cacheFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		logger.Error("can not open file:", cacheFile, content, err)
		return
	}

	n, err := f.Write(byt)
	if err == nil && n < len(byt) {
		err = io.ErrShortWrite
	}
	// Close errors matter for a file that was just written.
	if cerr := f.Close(); err == nil {
		err = cerr
	}
	if err != nil {
		logger.Error("content write error:", err, content)
	}
}

// cacheMasterNodeID returns the NodeId of the first node in
// nodeService.Node.NodeLists whose DriftState is "master", or "" if there is
// none.
func cacheMasterNodeID() string {
	for _, n := range nodeService.Node.NodeLists {
		if n.DriftState == "master" {
			return n.NodeId
		}
	}
	return ""
}

// cacheLockBusReq posts {"key": key, "nodeId": nodeId} as JSON to path on
// node targetId through DoBusReq and decodes the reply into a LockRes.
// Request and decode failures are logged and returned.
func cacheLockBusReq(path, targetId, nodeId, key string) (*LockRes, error) {
	params := make(map[string]interface{})
	params["key"] = key
	params["nodeId"] = nodeId

	data, err := DoBusReq(path, targetId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
	if err != nil {
		logger.Error("fail to DoBusReq of "+path, targetId, params, err)
		return nil, err
	}

	busRes := &LockRes{}
	if err = json.Unmarshal(data, busRes); err != nil {
		logger.Error("fail to Unmarshal DoBusReq of "+path, targetId, params, err)
		return nil, err
	}
	return busRes, nil
}

// acquireIfFree stores a record for cacheKey owned by owner unless one is
// already present, and reports whether it stored it. The presence check and
// the store share one critical section so two callers cannot both succeed.
// The expiry is computed from timeOut[0] at the moment of acquisition;
// without a timeout the record never expires.
func (c *cache) acquireIfFree(cacheKey, owner string, timeOut []time.Duration) bool {
	c.lock.Lock()
	defer c.lock.Unlock()

	if _, held := c.records[cacheKey]; held {
		return false
	}

	var expire int64
	if len(timeOut) > 0 {
		expire = time.Now().Add(timeOut[0]).Unix()
	}
	c.records[cacheKey] = record{
		Data:   nodeIdOrOwner(owner),
		Expire: expire,
	}
	return true
}

// nodeIdOrOwner returns owner unchanged; it exists only to name the value
// stored in record.Data.
func nodeIdOrOwner(owner string) string {
	return owner
}

// SetLock acquires the lock identified by (nodeId, key).
//
// On a non-master node the request is forwarded to the master's
// /data/api-v/saas/setLock endpoint and the reply's Success flag is returned;
// request or decode failures yield false.
// NOTE(review): when no master node can be found this returns true, i.e. the
// caller is told the lock is held. Kept as in the original; confirm that
// failing open is intended.
//
// On the master node the lock is taken immediately if it is free. Otherwise
// the cache is polled every 100ms, for at most 31 polls (about 3.1s), and
// false is returned if the lock is still held after the last poll. An
// optional timeOut[0] sets the record's expiry relative to the time of
// acquisition.
func (c *cache) SetLock(nodeId string, key string, timeOut ...time.Duration) bool {
	cacheKey := c.GetKeyName(nodeId, key)

	if !CliSrv.IsMaster {
		masterId := cacheMasterNodeID()
		if masterId == "" {
			logger.Error("主节点获取失败,无法完成上锁动作")
			return true
		}
		busRes, err := cacheLockBusReq("/data/api-v/saas/setLock", masterId, nodeId, key)
		if err != nil {
			return false
		}
		return busRes.Success
	}

	if c.acquireIfFree(cacheKey, nodeId, timeOut) {
		return true
	}

	const (
		pollInterval = 100 * time.Millisecond
		maxPolls     = 3*1000/100 + 1
	)
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for i := 0; i < maxPolls; i++ {
		<-ticker.C
		if c.acquireIfFree(cacheKey, nodeId, timeOut) {
			return true
		}
	}
	return false
}

// GetLock reports whether the lock identified by (nodeId, key) exists and
// returns the data stored with it.
//
// On a non-master node the request is forwarded to the master's
// /data/api-v/saas/getLock endpoint and the reply's Success and Data fields
// are returned; a missing master or a request/decode failure yields
// (false, "").
func (c *cache) GetLock(nodeId string, key string) (bool, string) {
	cacheKey := c.GetKeyName(nodeId, key)

	if !CliSrv.IsMaster {
		masterId := cacheMasterNodeID()
		if masterId == "" {
			logger.Error("主节点获取失败,无法完成上锁动作")
			return false, ""
		}
		busRes, err := cacheLockBusReq("/data/api-v/saas/getLock", masterId, nodeId, key)
		if err != nil {
			return false, ""
		}
		return busRes.Success, busRes.Data
	}

	c.lock.RLock()
	rec, ok := c.records[cacheKey]
	c.lock.RUnlock()

	if !ok {
		return false, ""
	}
	return true, rec.Data
}

// GetAllLock returns a snapshot of all records. The result is a copy, so the
// caller can read it without racing with the cache's own writers.
func (c *cache) GetAllLock() map[string]record {
	// Before InitCache there is neither a lock nor any records to copy.
	if c.lock == nil {
		return c.records
	}

	c.lock.RLock()
	defer c.lock.RUnlock()

	snapshot := make(map[string]record, len(c.records))
	for k, rec := range c.records {
		snapshot[k] = rec
	}
	return snapshot
}

// DelLock releases the lock identified by (nodeId, key).
//
// On a non-master node the request is forwarded to the master's
// /data/api-v/saas/delLock endpoint and the reply's Success flag is returned;
// request or decode failures yield false.
// NOTE(review): when no master node can be found this returns true. Kept as
// in the original; confirm that this is intended.
//
// On the master node it returns true if a record existed and was removed,
// false if there was nothing to remove.
func (c *cache) DelLock(nodeId string, key string) bool {
	cacheKey := c.GetKeyName(nodeId, key)

	if !CliSrv.IsMaster {
		masterId := cacheMasterNodeID()
		if masterId == "" {
			logger.Error("主节点获取失败,无法完成上锁动作")
			return true
		}
		busRes, err := cacheLockBusReq("/data/api-v/saas/delLock", masterId, nodeId, key)
		if err != nil {
			return false
		}
		return busRes.Success
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	if _, ok := c.records[cacheKey]; !ok {
		return false
	}
	delete(c.records, cacheKey)
	return true
}

// GetKeyName builds the map key for a lock: nodeId and key joined by ":".
func (c *cache) GetKeyName(nodeId string, key string) string {
	return nodeId + ":" + key
}

// SetGlobalLock requests the lock named key through the
// /data/api-v/saas/setLock endpoint, using config.Server.AnalyServerId both
// as the bus target and as the requesting node id. It returns (true, nil) on
// success and (false, err) if the request fails, the reply cannot be decoded,
// or the reply reports failure.
func SetGlobalLock(key string) (bool, error) {
	busRes, err := cacheLockBusReq("/data/api-v/saas/setLock", config.Server.AnalyServerId, config.Server.AnalyServerId, key)
	if err != nil {
		return false, err
	}
	if !busRes.Success {
		return false, errors.New("锁设置失败")
	}
	return true, nil
}

// DelGlobalLock releases the lock named key through the
// /data/api-v/saas/delLock endpoint, using config.Server.AnalyServerId both
// as the bus target and as the requesting node id. It returns (true, nil) on
// success and (false, err) if the request fails, the reply cannot be decoded,
// or the reply reports failure.
func DelGlobalLock(key string) (bool, error) {
	busRes, err := cacheLockBusReq("/data/api-v/saas/delLock", config.Server.AnalyServerId, config.Server.AnalyServerId, key)
	if err != nil {
		return false, err
	}
	if !busRes.Success {
		return false, errors.New("锁删除失败")
	}
	return true, nil
}