// Package cache keeps an in-memory copy of the task SDK rules obtained through
// dbapi and reloads that copy whenever a database change message naming the
// task SDK rule table is received from the discovery client.
package cache

import (
	"fmt"
	"strconv"

	"basic.com/dbapi.git"
	"basic.com/pubsub/cache.git/shardmap"
	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/gopherdiscovery.git"
	"github.com/gogo/protobuf/proto"
)

const (
	// PREFIX_TASKSDKRULE is prepended to a task ID to build the key under
	// which that task's SDK rules are stored in the cache.
	PREFIX_TASKSDKRULE = "TASKSDKRULE_"
)

// cMap holds the cached data. It is assigned by Init, so Init must be called
// before any other function in this package is used.
var cMap *shardmap.ShardMap

// Init creates the cache, connects to the discovery service at dbIp (survey
// endpoint on surveyPort, pub/sub endpoint on pubSubPort), waits for the first
// heartbeat message, loads the initial data and then processes incoming peer
// messages.
//
// After the initial load, true is sent on initChan. The call then keeps
// consuming peer messages and only returns once the peers channel is closed,
// so callers that need to continue should run Init in its own goroutine.
//
// If the discovery client cannot be created or the peers channel cannot be
// obtained, the error is printed and Init returns. When the client cannot be
// created, nothing is sent on initChan.
func Init(initChan chan bool, dbIp string, surveyPort int, pubSubPort int) {
	// NOTE(review): the argument is presumably the shard count — confirm
	// against the shardmap package.
	cMap = shardmap.New(uint8(32))

	urlSurvey := "tcp://" + dbIp + ":" + strconv.Itoa(surveyPort)
	urlPubSub := "tcp://" + dbIp + ":" + strconv.Itoa(pubSubPort)

	client, err := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
	if err != nil {
		// Previously the error was discarded and the next call dereferenced
		// the (possibly nil) client.
		fmt.Println("gopherdiscovery client err:", err)
		return
	}

	// Block until the first heartbeat message arrives before loading data.
	recvMsg := client.HeartBeatMsg()
	fmt.Println(<-recvMsg)

	initCacheData(initChan)

	peers, err := client.Peers()
	if err != nil {
		// Without a peers channel no change notifications can be received;
		// the data loaded above stays in the cache.
		fmt.Println("gopherdiscovery peers err:", err)
		return
	}
	for msg := range peers {
		fmt.Println("peerMsg:", msg)
		updateData(msg)
	}
}

// initCacheData performs the initial cache load and then reports completion
// by sending true on initChan. The send blocks until a receiver is ready
// unless the channel is buffered.
func initCacheData(initChan chan bool) {
	initTaskSdkRule()
	initChan <- true
}

// initTaskSdkRule fetches all task SDK rules through dbapi and stores each
// task's rule list in the cache under PREFIX_TASKSDKRULE + TaskId. Nothing is
// stored when the lookup reports failure.
//
// NOTE(review): entries are only added or overwritten; a key whose task no
// longer appears in the result is not removed here — confirm whether stale
// entries matter to callers.
func initTaskSdkRule() {
	var api dbapi.TaskSdkRuleApi

	ok, rules := api.FindAllTaskSdkRules()
	if !ok {
		return
	}
	// Ranging over a nil slice is a no-op, so no separate nil check is needed.
	for _, tRule := range rules {
		cMap.Set(PREFIX_TASKSDKRULE+tRule.TaskId, tRule.SdkRules)
	}
}

// updateData decodes b as a protomsg.DbChangeMessage and reloads the task SDK
// rules when the message names the task SDK rule table. Messages that fail to
// decode are reported and dropped; messages for other tables are ignored.
func updateData(b []byte) {
	newUpdateMsg := &protomsg.DbChangeMessage{}
	if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
		fmt.Println("dbChangeMsg unmarshal err:", err)
		return
	}

	switch newUpdateMsg.Table {
	case protomsg.TableChanged_T_TaskSdkRule:
		initTaskSdkRule()
	}
}

// GetTaskSdkRules returns the cached SDK rule sets for taskId. It returns nil
// when there is no entry for the task or when the cached value does not have
// the expected type.
func GetTaskSdkRules(taskId string) []*protomsg.SdkRuleSet {
	v, found := cMap.Get(PREFIX_TASKSDKRULE + taskId)
	if !found {
		return nil
	}

	// Comma-ok assertion: an unexpected value type yields nil instead of a
	// runtime panic.
	rules, ok := v.([]*protomsg.SdkRuleSet)
	if !ok {
		return nil
	}
	return rules
}