package main
|
|
import (
|
"fmt"
|
"strconv"
|
|
"basic.com/dbapi.git"
|
"basic.com/libgowrapper/sdkstruct.git"
|
"basic.com/pubsub/cache.git/shardmap"
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/gopherdiscovery.git"
|
"github.com/gogo/protobuf/proto"
|
)
|
|
// prefixTASKSDKRULE is prepended to a task ID to build the cMap key under
// which that task's SDK rules are stored and looked up.
const prefixTASKSDKRULE = "TASKSDKRULE_"
|
|
// cMap is the package-level cache. It is assigned in InitCache, filled by
// initTaskSdkRule and read by GetTaskSdkRules.
var cMap *shardmap.ShardMap
|
|
// InitDBAPI init dbapi
|
func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) {
|
dbapi.Init(ip, httpPort)
|
var initchan = make(chan bool)
|
go InitCache(initchan, ip, heartBeatPort, dataPort)
|
log("db init done!", <-initchan)
|
}
|
|
// TaskInfos get camera infos from sqlite db
|
func TaskInfos() []protomsg.TaskSdkInfo {
|
tAPI := dbapi.TaskApi{}
|
tasks := tAPI.FindAll()
|
|
return tasks
|
}
|
|
// SDKInfo get sdk
|
func SDKInfo() []sdkstruct.SDKInfo {
|
sAPI := dbapi.SdkApi{}
|
|
s := sAPI.FindAllSdkRun()
|
var sdks []sdkstruct.SDKInfo
|
for _, v := range s {
|
sdks = append(sdks, sdkstruct.SDKInfo{
|
IpcID: v.IpcId,
|
SdkType: v.SdkType,
|
})
|
}
|
return sdks
|
}
|
|
// InitCache cache
|
func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) {
|
cMap = shardmap.New(uint8(32))
|
urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort)
|
urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort)
|
client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
|
recvMsg := client.HeartBeatMsg()
|
fmt.Println(<-recvMsg)
|
|
initCacheData(initChan)
|
|
peers, _ := client.Peers()
|
for b := range peers {
|
fmt.Println("peerMsg:", b)
|
updateData(b)
|
}
|
}
|
|
func initCacheData(initChan chan bool) {
|
initTaskSdkRule()
|
initChan <- true
|
}
|
|
func initTaskSdkRule() {
|
var api dbapi.TaskSdkRuleApi
|
|
b, rules := api.FindAllTaskSdkRules()
|
if b {
|
if rules != nil {
|
for _, tRule := range rules {
|
cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules)
|
}
|
}
|
}
|
}
|
|
func updateData(b []byte) {
|
newUpdateMsg := &protomsg.DbChangeMessage{}
|
if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
|
fmt.Println("dbChangeMsg unmarshal err:", err)
|
return
|
}
|
switch newUpdateMsg.Table {
|
case protomsg.TableChanged_T_TaskSdkRule:
|
initTaskSdkRule()
|
default:
|
|
}
|
}
|
|
// GetTaskSdkRules rules
|
func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet {
|
r, b := cMap.Get(prefixTASKSDKRULE + taskID)
|
if b {
|
return r.([]*protomsg.SdkRuleSet)
|
}
|
return nil
|
}
|