zhangmeng
2020-01-20 accc3295d6daacd70494ecd3a08033f505c29f66
libcomm/db.go
@@ -1,109 +1,15 @@
package main
import (
   "fmt"
   "strconv"
   "basic.com/dbapi.git"
   "basic.com/libgowrapper/sdkstruct.git"
   "basic.com/pubsub/cache.git/shardmap"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/gopherdiscovery.git"
   "github.com/gogo/protobuf/proto"
   "basic.com/valib/pubsub.git"
)
const (
   prefixTASKSDKRULE = "TASKSDKRULE_"
)
var cMap *shardmap.ShardMap
// InitDBAPI init dbapi
func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) {
   dbapi.Init(ip, httpPort)
   var initchan = make(chan bool)
   go InitCache(initchan, ip, heartBeatPort, dataPort)
   log("db init done!", <-initchan)
}
// TaskInfos get camera infos from sqlite db
func TaskInfos() []protomsg.TaskSdkInfo {
   tAPI := dbapi.TaskApi{}
   tasks := tAPI.FindAll()
   return tasks
}
// SDKInfo get sdk
func SDKInfo() []sdkstruct.SDKInfo {
   sAPI := dbapi.SdkApi{}
   s := sAPI.FindAllSdkRun()
   var sdks []sdkstruct.SDKInfo
   for _, v := range s {
      sdks = append(sdks, sdkstruct.SDKInfo{
         IpcID:   v.IpcId,
         SdkType: v.SdkType,
      })
// Init init tcp://192.168.5.22:4005
func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) {
   p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
   if err != nil {
      return nil, err
   }
   return sdks
}
// InitCache cache
func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) {
   cMap = shardmap.New(uint8(32))
   urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort)
   urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort)
   client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
   recvMsg := client.HeartBeatMsg()
   fmt.Println(<-recvMsg)
   initCacheData(initChan)
   peers, _ := client.Peers()
   for b := range peers {
      fmt.Println("peerMsg:", b)
      updateData(b)
   }
}
func initCacheData(initChan chan bool) {
   initTaskSdkRule()
   initChan <- true
}
func initTaskSdkRule() {
   var api dbapi.TaskSdkRuleApi
   b, rules := api.FindAllTaskSdkRules()
   if b {
      if rules != nil {
         for _, tRule := range rules {
            cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules)
         }
      }
   }
}
func updateData(b []byte) {
   newUpdateMsg := &protomsg.DbChangeMessage{}
   if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
      fmt.Println("dbChangeMsg unmarshal err:", err)
      return
   }
   switch newUpdateMsg.Table {
   case protomsg.TableChanged_T_TaskSdkRule:
      initTaskSdkRule()
   default:
   }
}
// GetTaskSdkRules rules
func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet {
   r, b := cMap.Get(prefixTASKSDKRULE + taskID)
   if b {
      return r.([]*protomsg.SdkRuleSet)
   }
   return nil
   c := p.Recv()
   return c, err
}