replace sdkhelper with local common
| | |
| | | "analysis/logo" |
| | | "plugin" |
| | | |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | // Fetcher db |
| | | type Fetcher struct { |
| | | fnInitDBAPI func(string, int, int, int, func(...interface{})) |
| | | fnSDKInfo func() []sdkstruct.SDKInfo |
| | | fnInit func(string, string, int, []string, string) (chan pubsub.Message, error) |
| | | } |
| | | |
| | | // NewFetcher new |
| | |
| | | return nil |
| | | } |
| | | |
| | | fn, err := app.LoadFunc(plug, soFile, "InitDBAPI") |
| | | fn, err := app.LoadFunc(plug, soFile, "Init") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error") |
| | | logo.Infoln("Lookup Func Init From File: ", soFile, " Error") |
| | | return nil |
| | | } |
| | | fnInit := fn.(func(string, int, int, int, func(...interface{}))) |
| | | |
| | | fn, err = app.LoadFunc(plug, soFile, "SDKInfo") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error") |
| | | return nil |
| | | } |
| | | |
| | | fnSDKInfo := fn.(func() []sdkstruct.SDKInfo) |
| | | fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error)) |
| | | |
| | | return &Fetcher{ |
| | | fnInitDBAPI: fnInit, |
| | | fnSDKInfo: fnSDKInfo, |
| | | fnInit: fnInit, |
| | | } |
| | | } |
| | |
| | | "analysis/logo" |
| | | "analysis/util" |
| | | "context" |
| | | "encoding/json" |
| | | "os" |
| | | "strconv" |
| | | "strings" |
| | | "time" |
| | | |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | func reaper(ctxt context.Context) { |
| | |
| | | |
| | | logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") |
| | | |
| | | fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) |
| | | // fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) |
| | | sdks := fetcher.fnSDKInfo() |
| | | ip := "tcp://" + util.FSI.IP |
| | | url := ip + ":" + strconv.Itoa(util.FSI.DataPort) |
| | | hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) |
| | | |
| | | chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | for { |
| | | if len(sdks) == 0 { |
| | | logo.Errorln("!!!!!!Fetcher Can't Get SDK Infos From Remote DB") |
| | | continue |
| | | if err == nil { |
| | | break |
| | | } |
| | | break |
| | | logo.Infoln("Analysis Fetcher INIT Error! URL:", url) |
| | | time.Sleep(time.Second) |
| | | chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | } |
| | | return manualStart(ctx, sdks, configPath) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return true |
| | | case msg := <-chMsg: |
| | | // sdktype process_name topic null |
| | | // yolo/face yolo_0/yolo_1 channel |
| | | var sdk map[string](map[string](map[string]interface{})) |
| | | |
| | | if err := json.Unmarshal(msg.Msg, &sdk); err != nil { |
| | | logo.Infoln("Fetcher SDK unmarshal err:", err) |
| | | continue |
| | | } |
| | | |
| | | logo.Infoln("~~~~~~Recv New SDKInfos") |
| | | chCameras <- CameraInfo{ |
| | | Cameras: cameras, |
| | | } |
| | | logo.Infoln("~~~~~~Recv New SDKInfos Over") |
| | | |
| | | default: |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { |
| | |
| | | require ( |
| | | basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c |
| | | basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865 |
| | | basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect |
| | | basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e |
| | | github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717 |
| | | github.com/natefinch/lumberjack v2.0.0+incompatible |
| | | github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4 |
| | | golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect |
| | | gopkg.in/yaml.v2 v2.2.7 // indirect |
| | | nanomsg.org/go-mangos v1.4.0 // indirect |
| | | ) |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "fmt" |
| | | "strconv" |
| | | |
| | | "basic.com/dbapi.git" |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/pubsub/cache.git/shardmap" |
| | | "basic.com/pubsub/protomsg.git" |
| | | "basic.com/valib/gopherdiscovery.git" |
| | | "github.com/gogo/protobuf/proto" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | const ( |
| | | prefixTASKSDKRULE = "TASKSDKRULE_" |
| | | ) |
| | | |
| | | var cMap *shardmap.ShardMap |
| | | |
| | | // InitDBAPI init dbapi |
| | | func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) { |
| | | dbapi.Init(ip, httpPort) |
| | | var initchan = make(chan bool) |
| | | go InitCache(initchan, ip, heartBeatPort, dataPort) |
| | | log("db init done!", <-initchan) |
| | | } |
| | | |
| | | // TaskInfos get camera infos from sqlite db |
| | | func TaskInfos() []protomsg.TaskSdkInfo { |
| | | tAPI := dbapi.TaskApi{} |
| | | tasks := tAPI.FindAll() |
| | | |
| | | return tasks |
| | | } |
| | | |
| | | // SDKInfo get sdk |
| | | func SDKInfo() []sdkstruct.SDKInfo { |
| | | sAPI := dbapi.SdkApi{} |
| | | |
| | | s := sAPI.FindAllSdkRun() |
| | | var sdks []sdkstruct.SDKInfo |
| | | for _, v := range s { |
| | | sdks = append(sdks, sdkstruct.SDKInfo{ |
| | | IpcID: v.IpcId, |
| | | SdkType: v.SdkType, |
| | | }) |
| | | // Init init tcp://192.168.5.22:4005 |
| | | func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) { |
| | | p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | return sdks |
| | | } |
| | | |
| | | // InitCache cache |
| | | func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) { |
| | | cMap = shardmap.New(uint8(32)) |
| | | urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort) |
| | | urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort) |
| | | client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc") |
| | | recvMsg := client.HeartBeatMsg() |
| | | fmt.Println(<-recvMsg) |
| | | |
| | | initCacheData(initChan) |
| | | |
| | | peers, _ := client.Peers() |
| | | for b := range peers { |
| | | fmt.Println("peerMsg:", b) |
| | | updateData(b) |
| | | } |
| | | } |
| | | |
| | | func initCacheData(initChan chan bool) { |
| | | initTaskSdkRule() |
| | | initChan <- true |
| | | } |
| | | |
| | | func initTaskSdkRule() { |
| | | var api dbapi.TaskSdkRuleApi |
| | | |
| | | b, rules := api.FindAllTaskSdkRules() |
| | | if b { |
| | | if rules != nil { |
| | | for _, tRule := range rules { |
| | | cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func updateData(b []byte) { |
| | | newUpdateMsg := &protomsg.DbChangeMessage{} |
| | | if err := proto.Unmarshal(b, newUpdateMsg); err != nil { |
| | | fmt.Println("dbChangeMsg unmarshal err:", err) |
| | | return |
| | | } |
| | | switch newUpdateMsg.Table { |
| | | case protomsg.TableChanged_T_TaskSdkRule: |
| | | initTaskSdkRule() |
| | | default: |
| | | |
| | | } |
| | | } |
| | | |
| | | // GetTaskSdkRules rules |
| | | func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet { |
| | | r, b := cMap.Get(prefixTASKSDKRULE + taskID) |
| | | if b { |
| | | return r.([]*protomsg.SdkRuleSet) |
| | | } |
| | | return nil |
| | | c := p.Recv() |
| | | return c, err |
| | | } |
| | |
| | | go 1.12 |
| | | |
| | | require ( |
| | | basic.com/dbapi.git v0.0.0-20191216030028-03153c1f1f30 |
| | | basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c |
| | | basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 |
| | | basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0 |
| | | basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 |
| | | github.com/ajg/form v1.5.1 // indirect |
| | | github.com/gogo/protobuf v1.3.1 |
| | | golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect |
| | | golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect |
| | | golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect |
| | | google.golang.org/grpc v1.26.0 // indirect |
| | | honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect |
| | | basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect |
| | | basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e |
| | | golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect |
| | | nanomsg.org/go-mangos v1.4.0 // indirect |
| | | ) |
| | |
| | | Subproject commit 45b445473158f7e38106a973ec6d57157279eee1 |
| | | Subproject commit 611a497108ec9d66646e98daaec5cf94da8ad02d |
| | |
| | | Subproject commit f4171524668399dfbd487f244d0efa8efaea7d9f |
| | | Subproject commit d107634e8ddcca05b773794df7ed4e9412a11d55 |
| | |
| | | Subproject commit 54521e1c57ec88f3dc1e31d2fcc435ed76afa848 |
| | | Subproject commit 4324306f529b9bc62d7e818c0b12ff822687bb47 |
| | |
| | | Subproject commit 589ea342e4aa810cb58c5b1af42f537e2732d4ff |
| | | Subproject commit 1a51c8cdd99ad79b113dfb98f0f3fb44235037f7 |
| | |
| | | Subproject commit 414a9974b34c5c1cdb9219a2da7daff01f335d25 |
| | | Subproject commit 51ba06b0aa6278da92703f38c6c26003e4bdae88 |
| | |
| | | |
| | | // 指定获取配置信息从sqlite,有最高优先级, master使用 |
| | | flag.StringVar(&util.FSI.IP, util.FetchSrvIP, util.FSI.IP, "从IP获取需要运行的SDK,master使用") |
| | | flag.IntVar(&util.FSI.HTTPort, util.FetchSrvPort, util.FSI.HTTPort, "获取需要运行的SDK服务器的HTTP Port,master使用") |
| | | flag.IntVar(&util.FSI.HBPort, util.FetchSrvHeartbeatPort, util.FSI.HBPort, "获取需要运行的SDK服务器的心跳 Port,master使用") |
| | | flag.IntVar(&util.FSI.DataPort, util.FetchSrvDataPort, util.FSI.DataPort, "获取需要运行的SDK服务器的数据 Port,master使用") |
| | | |
| | |
| | | } |
| | | |
| | | func setParamters() { |
| | | util.FillParams(util.FetchSrvIP, util.FSI.IP) |
| | | util.FillParams(util.FetchSrvPort, strconv.Itoa(util.FSI.HTTPort)) |
| | | util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort)) |
| | | util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort)) |
| | | // util.FillParams(util.FetchSrvIP, util.FSI.IP) |
| | | // util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort)) |
| | | // util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort)) |
| | | |
| | | util.FillParams(util.RuleIPC, util.ToRuleIPC) |
| | | |
| | |
| | | // FetchServerInfo 从sqlite的服务器获取SDK信息的服务器信息 |
| | | type FetchServerInfo struct { |
| | | IP string |
| | | HTTPort int |
| | | HBPort int //心跳端口 |
| | | DataPort int //数据端口 |
| | | } |
| | |
| | | // FSI SDK信息服务器IP/Port |
| | | FSI = &FetchServerInfo{ |
| | | IP: "127.0.0.1", |
| | | HTTPort: 8001, |
| | | HBPort: 40007, |
| | | DataPort: 50007, |
| | | HBPort: 5005, |
| | | DataPort: 4005, |
| | | } |
| | | |
| | | // ToRuleIPC to ruleprocess |
| | |
| | | ConfigPath = "config-path" |
| | | |
| | | FetchSrvIP = "fetch-server-ip" |
| | | FetchSrvPort = "fetch-server-port" |
| | | FetchSrvHeartbeatPort = "fetch-server-heartbeat-port" |
| | | FetchSrvDataPort = "fetch-server-data-port" |
| | | |