package cache
|
|
import (
|
"fmt"
|
"strconv"
|
|
"basic.com/dbapi.git"
|
"basic.com/pubsub/cache.git/shardmap"
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/gopherdiscovery.git"
|
"github.com/gogo/protobuf/proto"
|
)
|
|
// PREFIX_TASKSDKRULE is prepended to a task id to form the cache key under
// which that task's SDK rules are stored.
const PREFIX_TASKSDKRULE = "TASKSDKRULE_"
|
|
// cMap is the package-level cache store. Init assigns it; initTaskSdkRule
// writes entries into it and GetTaskSdkRules reads them back.
var cMap *shardmap.ShardMap
|
|
func Init(initChan chan bool, dbIp string, surveyPort int, pubSubPort int) {
|
cMap = shardmap.New(uint8(32))
|
urlSurvey := "tcp://" + dbIp + ":" + strconv.Itoa(surveyPort)
|
urlPubSub := "tcp://" + dbIp + ":" + strconv.Itoa(pubSubPort)
|
client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
|
recvMsg := client.HeartBeatMsg()
|
fmt.Println(<-recvMsg)
|
|
initCacheData(initChan)
|
|
peers, _ := client.Peers()
|
for b := range peers {
|
fmt.Println("peerMsg:", b)
|
updateData(b)
|
}
|
}
|
|
func initCacheData(initChan chan bool) {
|
initTaskSdkRule()
|
initChan <- true
|
}
|
|
func initTaskSdkRule() {
|
var api dbapi.TaskSdkRuleApi
|
|
b, rules := api.FindAllTaskSdkRules()
|
if b {
|
if rules != nil {
|
for _, tRule := range rules {
|
cMap.Set(PREFIX_TASKSDKRULE+tRule.TaskId, tRule.SdkRules)
|
}
|
}
|
}
|
}
|
|
func updateData(b []byte) {
|
newUpdateMsg := &protomsg.DbChangeMessage{}
|
if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
|
fmt.Println("dbChangeMsg unmarshal err:", err)
|
return
|
}
|
switch newUpdateMsg.Table {
|
case protomsg.TableChanged_T_TaskSdkRule:
|
initTaskSdkRule()
|
default:
|
|
}
|
}
|
|
func GetTaskSdkRules(taskId string) []*protomsg.SdkRuleSet {
|
r, b := cMap.Get(PREFIX_TASKSDKRULE + taskId)
|
if b {
|
return r.([]*protomsg.SdkRuleSet)
|
} else {
|
return nil
|
}
|
}
|