From accc3295d6daacd70494ecd3a08033f505c29f66 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 20 一月 2020 11:15:27 +0800 Subject: [PATCH] replace sdkhelper with local common --- libgowrapper/reid | 2 util/data.go | 7 - app/master/master.go | 49 ++++++++++-- libgowrapper/humantrack | 2 libcomm/db.go | 110 ++------------------------- app/master/dbfetcher.go | 22 +---- libgowrapper/vehicle | 2 go.mod | 4 + libgowrapper/face | 2 libgowrapper/yolo | 2 main.go | 8 - libcomm/go.mod | 15 --- 12 files changed, 72 insertions(+), 153 deletions(-) diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go index 56c5633..186fe9d 100644 --- a/app/master/dbfetcher.go +++ b/app/master/dbfetcher.go @@ -5,13 +5,12 @@ "analysis/logo" "plugin" - "basic.com/libgowrapper/sdkstruct.git" + "basic.com/valib/pubsub.git" ) // Fetcher db type Fetcher struct { - fnInitDBAPI func(string, int, int, int, func(...interface{})) - fnSDKInfo func() []sdkstruct.SDKInfo + fnInit func(string, string, int, []string, string) (chan pubsub.Message, error) } // NewFetcher new @@ -23,23 +22,14 @@ return nil } - fn, err := app.LoadFunc(plug, soFile, "InitDBAPI") + fn, err := app.LoadFunc(plug, soFile, "Init") if err != nil { - logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error") + logo.Infoln("Lookup Func Init From File: ", soFile, " Error") return nil } - fnInit := fn.(func(string, int, int, int, func(...interface{}))) - - fn, err = app.LoadFunc(plug, soFile, "SDKInfo") - if err != nil { - logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error") - return nil - } - - fnSDKInfo := fn.(func() []sdkstruct.SDKInfo) + fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error)) return &Fetcher{ - fnInitDBAPI: fnInit, - fnSDKInfo: fnSDKInfo, + fnInit: fnInit, } } diff --git a/app/master/master.go b/app/master/master.go index 1561d77..1199713 100644 --- a/app/master/master.go +++ b/app/master/master.go @@ -5,9 +5,14 @@ "analysis/logo" "analysis/util" "context" + "encoding/json" + "os" + "strconv" "strings" + "time" "basic.com/libgowrapper/sdkstruct.git" + "basic.com/valib/pubsub.git" ) func reaper(ctxt context.Context) { @@ -28,17 +33,45 @@ logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") - fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) - // fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) - sdks := fetcher.fnSDKInfo() + ip := "tcp://" + util.FSI.IP + url := ip + ":" + strconv.Itoa(util.FSI.DataPort) + hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) + + chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) for { - if len(sdks) == 0 { - logo.Errorln("!!!!!!Fetcher Can't Get SDK Infos From Remote DB") - continue + if err == nil { + break } - break + logo.Infoln("Analysis Fetcher INIT Error! URL:", url) + time.Sleep(time.Second) + chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) } - return manualStart(ctx, sdks, configPath) + + for { + select { + case <-ctx.Done(): + return true + case msg := <-chMsg: + // sdktype process_name topic null + // yolo/face yolo_0/yolo_1 channel + var sdk map[string](map[string](map[string]interface{})) + + if err := json.Unmarshal(msg.Msg, &sdk); err != nil { + logo.Infoln("Fetcher SDK unmarshal err:", err) + continue + } + + logo.Infoln("~~~~~~Recv New SDKInfos") + chCameras <- CameraInfo{ + Cameras: cameras, + } + logo.Infoln("~~~~~~Recv New SDKInfos Over") + + default: + time.Sleep(10 * time.Millisecond) + } + } + } func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { diff --git a/go.mod b/go.mod index b1f9691..7483532 100644 --- a/go.mod +++ b/go.mod @@ -5,8 +5,12 @@ require ( basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865 + basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect + basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4 + golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect gopkg.in/yaml.v2 v2.2.7 // indirect + nanomsg.org/go-mangos v1.4.0 // indirect ) diff --git a/libcomm/db.go b/libcomm/db.go index 3b3e278..d9104e9 100644 --- a/libcomm/db.go +++ b/libcomm/db.go @@ -1,109 +1,15 @@ package main import ( - "fmt" - "strconv" - - "basic.com/dbapi.git" - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/cache.git/shardmap" - "basic.com/pubsub/protomsg.git" - "basic.com/valib/gopherdiscovery.git" - "github.com/gogo/protobuf/proto" + "basic.com/valib/pubsub.git" ) -const ( - prefixTASKSDKRULE = "TASKSDKRULE_" -) - -var cMap *shardmap.ShardMap - -// InitDBAPI init dbapi -func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) { - dbapi.Init(ip, httpPort) - var initchan = make(chan bool) - go InitCache(initchan, ip, heartBeatPort, dataPort) - log("db init done!", <-initchan) -} - -// TaskInfos get camera infos from sqlite db -func TaskInfos() []protomsg.TaskSdkInfo { - tAPI := dbapi.TaskApi{} - tasks := tAPI.FindAll() - - return tasks -} - -// SDKInfo get sdk -func SDKInfo() []sdkstruct.SDKInfo { - sAPI := dbapi.SdkApi{} - - s := sAPI.FindAllSdkRun() - var sdks []sdkstruct.SDKInfo - for _, v := range s { - sdks = append(sdks, sdkstruct.SDKInfo{ - IpcID: v.IpcId, - SdkType: v.SdkType, - }) +// Init init tcp://192.168.5.22:4005 +func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) { + p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) + if err != nil { + return nil, err } - return sdks -} - -// InitCache cache -func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) { - cMap = shardmap.New(uint8(32)) - urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort) - urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort) - client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc") - recvMsg := client.HeartBeatMsg() - fmt.Println(<-recvMsg) - - initCacheData(initChan) - - peers, _ := client.Peers() - for b := range peers { - fmt.Println("peerMsg:", b) - updateData(b) - } -} - -func initCacheData(initChan chan bool) { - initTaskSdkRule() - initChan <- true -} - -func initTaskSdkRule() { - var api dbapi.TaskSdkRuleApi - - b, rules := api.FindAllTaskSdkRules() - if b { - if rules != nil { - for _, tRule := range rules { - cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules) - } - } - } -} - -func updateData(b []byte) { - newUpdateMsg := &protomsg.DbChangeMessage{} - if err := proto.Unmarshal(b, newUpdateMsg); err != nil { - fmt.Println("dbChangeMsg unmarshal err:", err) - return - } - switch newUpdateMsg.Table { - case protomsg.TableChanged_T_TaskSdkRule: - initTaskSdkRule() - default: - - } -} - -// GetTaskSdkRules rules -func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet { - r, b := cMap.Get(prefixTASKSDKRULE + taskID) - if b { - return r.([]*protomsg.SdkRuleSet) - } - return nil + c := p.Recv() + return c, err } diff --git a/libcomm/go.mod b/libcomm/go.mod index 7f7f9a4..324f971 100644 --- a/libcomm/go.mod +++ b/libcomm/go.mod @@ -3,17 +3,8 @@ go 1.12 require ( - basic.com/dbapi.git v0.0.0-20191216030028-03153c1f1f30 - basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c - basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 - basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0 - basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 - github.com/ajg/form v1.5.1 // indirect - github.com/gogo/protobuf v1.3.1 - golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect - golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect - golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect - google.golang.org/grpc v1.26.0 // indirect - honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect + basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect + basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e + golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect nanomsg.org/go-mangos v1.4.0 // indirect ) diff --git a/libgowrapper/face b/libgowrapper/face index 45b4454..611a497 160000 --- a/libgowrapper/face +++ b/libgowrapper/face @@ -1 +1 @@ -Subproject commit 45b445473158f7e38106a973ec6d57157279eee1 +Subproject commit 611a497108ec9d66646e98daaec5cf94da8ad02d diff --git a/libgowrapper/humantrack b/libgowrapper/humantrack index f417152..d107634 160000 --- a/libgowrapper/humantrack +++ b/libgowrapper/humantrack @@ -1 +1 @@ -Subproject commit f4171524668399dfbd487f244d0efa8efaea7d9f +Subproject commit d107634e8ddcca05b773794df7ed4e9412a11d55 diff --git a/libgowrapper/reid b/libgowrapper/reid index 54521e1..4324306 160000 --- a/libgowrapper/reid +++ b/libgowrapper/reid @@ -1 +1 @@ -Subproject commit 54521e1c57ec88f3dc1e31d2fcc435ed76afa848 +Subproject commit 4324306f529b9bc62d7e818c0b12ff822687bb47 diff --git a/libgowrapper/vehicle b/libgowrapper/vehicle index 589ea34..1a51c8c 160000 --- a/libgowrapper/vehicle +++ b/libgowrapper/vehicle @@ -1 +1 @@ -Subproject commit 589ea342e4aa810cb58c5b1af42f537e2732d4ff +Subproject commit 1a51c8cdd99ad79b113dfb98f0f3fb44235037f7 diff --git a/libgowrapper/yolo b/libgowrapper/yolo index 414a997..51ba06b 160000 --- a/libgowrapper/yolo +++ b/libgowrapper/yolo @@ -1 +1 @@ -Subproject commit 414a9974b34c5c1cdb9219a2da7daff01f335d25 +Subproject commit 51ba06b0aa6278da92703f38c6c26003e4bdae88 diff --git a/main.go b/main.go index 611cc20..b495dbe 100644 --- a/main.go +++ b/main.go @@ -53,7 +53,6 @@ // 鎸囧畾鑾峰彇閰嶇疆淇℃伅浠巗qlite,鏈夋渶楂樹紭鍏堢骇, master浣跨敤 flag.StringVar(&util.FSI.IP, util.FetchSrvIP, util.FSI.IP, "浠嶪P鑾峰彇闇�瑕佽繍琛岀殑SDK,master浣跨敤") - flag.IntVar(&util.FSI.HTTPort, util.FetchSrvPort, util.FSI.HTTPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑HTTP Port,master浣跨敤") flag.IntVar(&util.FSI.HBPort, util.FetchSrvHeartbeatPort, util.FSI.HBPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑蹇冭烦 Port,master浣跨敤") flag.IntVar(&util.FSI.DataPort, util.FetchSrvDataPort, util.FSI.DataPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑鏁版嵁 Port,master浣跨敤") @@ -74,10 +73,9 @@ } func setParamters() { - util.FillParams(util.FetchSrvIP, util.FSI.IP) - util.FillParams(util.FetchSrvPort, strconv.Itoa(util.FSI.HTTPort)) - util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort)) - util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort)) + // util.FillParams(util.FetchSrvIP, util.FSI.IP) + // util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort)) + // util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort)) util.FillParams(util.RuleIPC, util.ToRuleIPC) diff --git a/util/data.go b/util/data.go index 8034742..23844a0 100644 --- a/util/data.go +++ b/util/data.go @@ -3,7 +3,6 @@ // FetchServerInfo 浠巗qlite鐨勬湇鍔″櫒鑾峰彇SDK淇℃伅鐨勬湇鍔″櫒淇℃伅 type FetchServerInfo struct { IP string - HTTPort int HBPort int //蹇冭烦绔彛 DataPort int //鏁版嵁绔彛 } @@ -12,9 +11,8 @@ // FSI SDK淇℃伅鏈嶅姟鍣↖P/Port FSI = &FetchServerInfo{ IP: "127.0.0.1", - HTTPort: 8001, - HBPort: 40007, - DataPort: 50007, + HBPort: 5005, + DataPort: 4005, } // ToRuleIPC to ruleprocess @@ -42,7 +40,6 @@ ConfigPath = "config-path" FetchSrvIP = "fetch-server-ip" - FetchSrvPort = "fetch-server-port" FetchSrvHeartbeatPort = "fetch-server-heartbeat-port" FetchSrvDataPort = "fetch-server-data-port" -- Gitblit v1.8.0