package service
|
|
import (
|
"basic.com/valib/go-aiot.git/aiotProto/aiot"
|
"basic.com/valib/logger.git"
|
"encoding/json"
|
"errors"
|
"io"
|
"io/ioutil"
|
"os"
|
"sync"
|
"time"
|
"vamicro/config"
|
"vamicro/saas-service/service/nodeService"
|
)
|
|
var Cache cache
|
|
const cacheFile = "../config/cache.dump"
|
|
type cache struct {
|
lock *sync.RWMutex
|
records map[string]record
|
ch chan bool
|
}
|
|
type record struct {
|
Data string `json:"data"`
|
Expire int64 `json:"expire"`
|
}
|
|
type LockRes struct {
|
Code int `json:"code"`
|
Data string `json:"data"`
|
Msg string `json:"msg"`
|
Success bool `json:"success"`
|
}
|
|
func InitCache() {
|
if Cache.records != nil {
|
return
|
}
|
Cache = cache{
|
lock: &sync.RWMutex{},
|
records: make(map[string]record),
|
ch: make(chan bool, 1),
|
}
|
// 恢复原有数据
|
Cache.file2Cache()
|
// 定期清理过期缓存
|
ticker := time.NewTicker(200 * time.Millisecond)
|
// 定期固定缓存
|
stableTicker := time.NewTicker(500 * time.Millisecond)
|
go func() {
|
for {
|
select {
|
case <-ticker.C:
|
for k := range Cache.records {
|
Cache.lock.RLock()
|
exp := Cache.records[k].Expire
|
Cache.lock.RUnlock()
|
if exp > 0 {
|
timeNow := time.Now().Unix()
|
if exp < timeNow {
|
Cache.lock.Lock()
|
delete(Cache.records, k)
|
Cache.lock.Unlock()
|
}
|
}
|
}
|
case <-stableTicker.C:
|
Cache.cache2File()
|
}
|
}
|
}()
|
}
|
func (c *cache) file2Cache() {
|
Cache.lock.Lock()
|
defer Cache.lock.Unlock()
|
byt, err := ioutil.ReadFile(cacheFile)
|
if err != nil {
|
logger.Error("can not read file:", cacheFile, err)
|
return
|
}
|
err = json.Unmarshal(byt, &c.records)
|
if err != nil {
|
logger.Error("can not read file:", cacheFile, string(byt))
|
}
|
}
|
|
func (c *cache) cache2File() {
|
Cache.lock.RLock()
|
records := Cache.records
|
Cache.lock.RUnlock()
|
byt, _ := json.Marshal(records)
|
content := string(byt)
|
f, err := os.OpenFile(cacheFile, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
|
if err != nil {
|
logger.Error("can not open file:", cacheFile, content)
|
return
|
}
|
defer f.Close()
|
n, err := f.Write(byt)
|
if err == nil && n < len(byt) {
|
logger.Error("content write error:", io.ErrShortWrite, content)
|
return
|
}
|
}
|
|
func (c *cache) SetLock(nodeId string, key string, timeOut ...time.Duration) bool {
|
cacheKey := c.GetKeyName(nodeId, key)
|
if !CliSrv.IsMaster {
|
masterId := ""
|
for _, n := range nodeService.Node.NodeLists {
|
if n.DriftState == "master" {
|
masterId = n.NodeId
|
break
|
}
|
}
|
if masterId == "" {
|
logger.Error("主节点获取失败,无法完成上锁动作")
|
return true
|
}
|
params := make(map[string]interface{})
|
params["key"] = key
|
params["nodeId"] = nodeId
|
data, err := DoBusReq("/data/api-v/saas/setLock", masterId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
|
if err != nil {
|
logger.Error("fail to DoBusReq of /data/api-v/saas/setLock", masterId, params, err)
|
return false
|
}
|
busRes := &LockRes{}
|
err = json.Unmarshal(data, busRes)
|
if err != nil {
|
logger.Error("fail to Unmarshal DoBusReq of /data/api-v/saas/setLock", masterId, params, err)
|
return false
|
}
|
return busRes.Success
|
}
|
Cache.lock.RLock()
|
_, ok := Cache.records[cacheKey]
|
Cache.lock.RUnlock()
|
if ok {
|
ch := make(chan bool, 1)
|
go func(c chan bool) {
|
ticker := time.NewTicker(100 * time.Millisecond)
|
i := 0
|
for {
|
select {
|
case <-ticker.C:
|
i++
|
Cache.lock.RLock()
|
_, ok := Cache.records[cacheKey]
|
Cache.lock.RUnlock()
|
if !ok {
|
ch <- true
|
return
|
}
|
if i > 3*1000/100 {
|
ch <- false
|
return
|
}
|
}
|
}
|
}(ch)
|
noExists := <-ch
|
if !noExists {
|
return false
|
}
|
}
|
var expire int64
|
if len(timeOut) > 0 {
|
expire = time.Now().Add(timeOut[0]).Unix()
|
} else {
|
expire = 0
|
}
|
|
Cache.lock.Lock()
|
defer Cache.lock.Unlock()
|
Cache.records[cacheKey] = record{
|
Data: nodeId,
|
Expire: expire,
|
}
|
return true
|
}
|
|
func (c *cache) GetLock(nodeId string, key string) (bool, string) {
|
cacheKey := c.GetKeyName(nodeId, key)
|
if !CliSrv.IsMaster {
|
masterId := ""
|
for _, n := range nodeService.Node.NodeLists {
|
if n.DriftState == "master" {
|
masterId = n.NodeId
|
break
|
}
|
}
|
if masterId == "" {
|
logger.Error("主节点获取失败,无法完成上锁动作")
|
return false, ""
|
}
|
params := make(map[string]interface{})
|
params["key"] = key
|
params["nodeId"] = nodeId
|
data, err := DoBusReq("/data/api-v/saas/getLock", masterId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
|
if err != nil {
|
logger.Error("fail to DoBusReq of /data/api-v/saas/getLock", masterId, params, err)
|
return false, ""
|
}
|
busRes := &LockRes{}
|
err = json.Unmarshal(data, busRes)
|
if err != nil {
|
logger.Error("fail to Unmarshal DoBusReq of /data/api-v/saas/getLock", masterId, params, err)
|
return false, ""
|
}
|
return busRes.Success, busRes.Data
|
}
|
Cache.lock.RLock()
|
rec, ok := Cache.records[cacheKey]
|
Cache.lock.RUnlock()
|
if ok {
|
return true, rec.Data
|
}
|
return false, ""
|
}
|
|
func (c *cache) GetAllLock() map[string]record {
|
return c.records
|
}
|
|
func (c *cache) DelLock(nodeId string, key string) bool {
|
cacheKey := c.GetKeyName(nodeId, key)
|
if !CliSrv.IsMaster {
|
masterId := ""
|
for _, n := range nodeService.Node.NodeLists {
|
if n.DriftState == "master" {
|
masterId = n.NodeId
|
break
|
}
|
}
|
if masterId == "" {
|
logger.Error("主节点获取失败,无法完成上锁动作")
|
return true
|
}
|
params := make(map[string]interface{})
|
params["key"] = key
|
params["nodeId"] = nodeId
|
data, err := DoBusReq("/data/api-v/saas/delLock", masterId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
|
if err != nil {
|
logger.Error("fail to DoBusReq of /data/api-v/saas/delLock", masterId, params, err)
|
return false
|
}
|
busRes := &LockRes{}
|
err = json.Unmarshal(data, busRes)
|
if err != nil {
|
logger.Error("fail to Unmarshal DoBusReq of /data/api-v/saas/delLock", masterId, params, err)
|
return false
|
}
|
return busRes.Success
|
}
|
Cache.lock.Lock()
|
defer Cache.lock.Unlock()
|
if _, ok := Cache.records[cacheKey]; ok {
|
delete(Cache.records, cacheKey)
|
return true
|
}
|
return false
|
}
|
|
func (c *cache) GetKeyName(nodeId string, key string) string {
|
return nodeId + ":" + key
|
}
|
|
func SetGlobalLock(key string) (bool, error) {
|
params := make(map[string]interface{})
|
params["key"] = key
|
params["nodeId"] = config.Server.AnalyServerId
|
data, err := DoBusReq("/data/api-v/saas/setLock", config.Server.AnalyServerId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
|
if err != nil {
|
logger.Error("fail to DoBusReq of /data/api-v/saas/setLock", config.Server.AnalyServerId, params, err)
|
return false, err
|
}
|
busRes := &LockRes{}
|
err = json.Unmarshal(data, busRes)
|
if err != nil {
|
logger.Error("fail to Unmarshal DoBusReq of /data/api-v/saas/setLock", config.Server.AnalyServerId, params, err)
|
return false, err
|
}
|
if !busRes.Success {
|
return false, errors.New("锁设置失败")
|
}
|
return true, nil
|
}
|
|
func DelGlobalLock(key string) (bool, error) {
|
params := make(map[string]interface{})
|
params["key"] = key
|
params["nodeId"] = config.Server.AnalyServerId
|
data, err := DoBusReq("/data/api-v/saas/delLock", config.Server.AnalyServerId, aiot.RequestMethod_Post, aiot.RequestContentType_ApplicationJson, params)
|
if err != nil {
|
logger.Error("fail to DoBusReq of /data/api-v/saas/delLock", config.Server.AnalyServerId, params, err)
|
return false, err
|
}
|
busRes := &LockRes{}
|
err = json.Unmarshal(data, busRes)
|
if err != nil {
|
logger.Error("fail to Unmarshal DoBusReq of /data/api-v/saas/delLock", config.Server.AnalyServerId, params, err)
|
return false, err
|
}
|
if !busRes.Success {
|
return false, errors.New("锁删除失败")
|
}
|
return true, nil
|
}
|