| | |
| | | package main |
| | | |
| | | import ( |
| | | "fmt" |
| | | "strconv" |
| | | |
| | | "basic.com/dbapi.git" |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/pubsub/cache.git/shardmap" |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/gopherdiscovery.git" |
| | | "github.com/gogo/protobuf/proto" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | const ( |
| | | prefixTASKSDKRULE = "TASKSDKRULE_" |
| | | ) |
| | | |
| | | var cMap *shardmap.ShardMap |
| | | |
| | | // InitDBAPI init dbapi |
| | | func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) { |
| | | dbapi.Init(ip, httpPort) |
| | | var initchan = make(chan bool) |
| | | go InitCache(initchan, ip, heartBeatPort, dataPort) |
| | | log("db init done!", <-initchan) |
| | | } |
| | | |
| | | // TaskInfos get camera infos from sqlite db |
| | | func TaskInfos() []protomsg.TaskSdkInfo { |
| | | tAPI := dbapi.TaskApi{} |
| | | tasks := tAPI.FindAll() |
| | | |
| | | return tasks |
| | | } |
| | | |
| | | // SDKInfo get sdk |
| | | func SDKInfo() []sdkstruct.SDKInfo { |
| | | sAPI := dbapi.SdkApi{} |
| | | |
| | | s := sAPI.FindAllSdkRun() |
| | | var sdks []sdkstruct.SDKInfo |
| | | for _, v := range s { |
| | | sdks = append(sdks, sdkstruct.SDKInfo{ |
| | | IpcID: v.IpcId, |
| | | SdkType: v.SdkType, |
| | | }) |
| | | // Init init tcp://192.168.5.22:4005 |
| | | func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) { |
| | | p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return sdks |
| | | } |
| | | |
| | | // InitCache cache |
| | | func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) { |
| | | cMap = shardmap.New(uint8(32)) |
| | | urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort) |
| | | urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort) |
| | | client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc") |
| | | recvMsg := client.HeartBeatMsg() |
| | | fmt.Println(<-recvMsg) |
| | | |
| | | initCacheData(initChan) |
| | | |
| | | peers, _ := client.Peers() |
| | | for b := range peers { |
| | | fmt.Println("peerMsg:", b) |
| | | updateData(b) |
| | | } |
| | | } |
| | | |
| | | func initCacheData(initChan chan bool) { |
| | | initTaskSdkRule() |
| | | initChan <- true |
| | | } |
| | | |
| | | func initTaskSdkRule() { |
| | | var api dbapi.TaskSdkRuleApi |
| | | |
| | | b, rules := api.FindAllTaskSdkRules() |
| | | if b { |
| | | if rules != nil { |
| | | for _, tRule := range rules { |
| | | cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func updateData(b []byte) { |
| | | newUpdateMsg := &protomsg.DbChangeMessage{} |
| | | if err := proto.Unmarshal(b, newUpdateMsg); err != nil { |
| | | fmt.Println("dbChangeMsg unmarshal err:", err) |
| | | return |
| | | } |
| | | switch newUpdateMsg.Table { |
| | | case protomsg.TableChanged_T_TaskSdkRule: |
| | | initTaskSdkRule() |
| | | default: |
| | | |
| | | } |
| | | } |
| | | |
| | | // GetTaskSdkRules rules |
| | | func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet { |
| | | r, b := cMap.Get(prefixTASKSDKRULE + taskID) |
| | | if b { |
| | | return r.([]*protomsg.SdkRuleSet) |
| | | } |
| | | return nil |
| | | c := p.Recv() |
| | | return c, err |
| | | } |