From accc3295d6daacd70494ecd3a08033f505c29f66 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 一月 2020 11:15:27 +0800
Subject: [PATCH] replace sdkhelper with local common

---
 libgowrapper/reid       |    2 
 util/data.go            |    7 -
 app/master/master.go    |   49 ++++++++++--
 libgowrapper/humantrack |    2 
 libcomm/db.go           |  110 ++-------------------------
 app/master/dbfetcher.go |   22 +----
 libgowrapper/vehicle    |    2 
 go.mod                  |    4 +
 libgowrapper/face       |    2 
 libgowrapper/yolo       |    2 
 main.go                 |    8 -
 libcomm/go.mod          |   15 ---
 12 files changed, 72 insertions(+), 153 deletions(-)

diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go
index 56c5633..186fe9d 100644
--- a/app/master/dbfetcher.go
+++ b/app/master/dbfetcher.go
@@ -5,13 +5,12 @@
 	"analysis/logo"
 	"plugin"
 
-	"basic.com/libgowrapper/sdkstruct.git"
+	"basic.com/valib/pubsub.git"
 )
 
 // Fetcher db
 type Fetcher struct {
-	fnInitDBAPI func(string, int, int, int, func(...interface{}))
-	fnSDKInfo   func() []sdkstruct.SDKInfo
+	fnInit func(string, string, int, []string, string) (chan pubsub.Message, error)
 }
 
 // NewFetcher new
@@ -23,23 +22,14 @@
 		return nil
 	}
 
-	fn, err := app.LoadFunc(plug, soFile, "InitDBAPI")
+	fn, err := app.LoadFunc(plug, soFile, "Init")
 	if err != nil {
-		logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error")
+		logo.Infoln("Lookup Func Init From File: ", soFile, " Error")
 		return nil
 	}
-	fnInit := fn.(func(string, int, int, int, func(...interface{})))
-
-	fn, err = app.LoadFunc(plug, soFile, "SDKInfo")
-	if err != nil {
-		logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error")
-		return nil
-	}
-
-	fnSDKInfo := fn.(func() []sdkstruct.SDKInfo)
+	fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error))
 
 	return &Fetcher{
-		fnInitDBAPI: fnInit,
-		fnSDKInfo:   fnSDKInfo,
+		fnInit: fnInit,
 	}
 }
diff --git a/app/master/master.go b/app/master/master.go
index 1561d77..1199713 100644
--- a/app/master/master.go
+++ b/app/master/master.go
@@ -5,9 +5,14 @@
 	"analysis/logo"
 	"analysis/util"
 	"context"
+	"encoding/json"
+	"os"
+	"strconv"
 	"strings"
+	"time"
 
 	"basic.com/libgowrapper/sdkstruct.git"
+	"basic.com/valib/pubsub.git"
 )
 
 func reaper(ctxt context.Context) {
@@ -28,17 +33,45 @@
 
 	logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
 
-	fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
-	// fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
-	sdks := fetcher.fnSDKInfo()
+	ip := "tcp://" + util.FSI.IP
+	url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
+	hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
+
+	chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
 	for {
-		if len(sdks) == 0 {
-			logo.Errorln("!!!!!!Fetcher Can't Get SDK Infos From Remote DB")
-			continue
+		if err == nil {
+			break
 		}
-		break
+		logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
+		time.Sleep(time.Second)
+		chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
 	}
-	return manualStart(ctx, sdks, configPath)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return true
+		case msg := <-chMsg:
+			//      		sdktype	   process_name   topic		null
+			//				yolo/face  yolo_0/yolo_1  channel
+			var sdk map[string](map[string](map[string]interface{}))
+
+			if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
+				logo.Infoln("Fetcher SDK unmarshal err:", err)
+				continue
+			}
+
+			logo.Infoln("~~~~~~Recv New SDKInfos")
+			chCameras <- CameraInfo{
+				Cameras: cameras,
+			}
+			logo.Infoln("~~~~~~Recv New SDKInfos Over")
+
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
 }
 
 func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
diff --git a/go.mod b/go.mod
index b1f9691..7483532 100644
--- a/go.mod
+++ b/go.mod
@@ -5,8 +5,12 @@
 require (
 	basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
 	basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
+	basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
+	basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
 	github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717
 	github.com/natefinch/lumberjack v2.0.0+incompatible
 	github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4
+	golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
 	gopkg.in/yaml.v2 v2.2.7 // indirect
+	nanomsg.org/go-mangos v1.4.0 // indirect
 )
diff --git a/libcomm/db.go b/libcomm/db.go
index 3b3e278..d9104e9 100644
--- a/libcomm/db.go
+++ b/libcomm/db.go
@@ -1,109 +1,15 @@
 package main
 
 import (
-	"fmt"
-	"strconv"
-
-	"basic.com/dbapi.git"
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/pubsub/cache.git/shardmap"
-	"basic.com/pubsub/protomsg.git"
-	"basic.com/valib/gopherdiscovery.git"
-	"github.com/gogo/protobuf/proto"
+	"basic.com/valib/pubsub.git"
 )
 
-const (
-	prefixTASKSDKRULE = "TASKSDKRULE_"
-)
-
-var cMap *shardmap.ShardMap
-
-// InitDBAPI init dbapi
-func InitDBAPI(ip string, httpPort, heartBeatPort, dataPort int, log func(...interface{})) {
-	dbapi.Init(ip, httpPort)
-	var initchan = make(chan bool)
-	go InitCache(initchan, ip, heartBeatPort, dataPort)
-	log("db init done!", <-initchan)
-}
-
-// TaskInfos get camera infos from sqlite db
-func TaskInfos() []protomsg.TaskSdkInfo {
-	tAPI := dbapi.TaskApi{}
-	tasks := tAPI.FindAll()
-
-	return tasks
-}
-
-// SDKInfo get sdk
-func SDKInfo() []sdkstruct.SDKInfo {
-	sAPI := dbapi.SdkApi{}
-
-	s := sAPI.FindAllSdkRun()
-	var sdks []sdkstruct.SDKInfo
-	for _, v := range s {
-		sdks = append(sdks, sdkstruct.SDKInfo{
-			IpcID:   v.IpcId,
-			SdkType: v.SdkType,
-		})
+// Init init tcp://192.168.5.22:4005
+func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) {
+	p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
+	if err != nil {
+		return nil, err
 	}
-	return sdks
-}
-
-// InitCache cache
-func InitCache(initChan chan bool, dbIP string, surveyPort int, pubSubPort int) {
-	cMap = shardmap.New(uint8(32))
-	urlSurvey := "tcp://" + dbIP + ":" + strconv.Itoa(surveyPort)
-	urlPubSub := "tcp://" + dbIP + ":" + strconv.Itoa(pubSubPort)
-	client, _ := gopherdiscovery.ClientWithSub(urlSurvey, urlPubSub, "analysisProc")
-	recvMsg := client.HeartBeatMsg()
-	fmt.Println(<-recvMsg)
-
-	initCacheData(initChan)
-
-	peers, _ := client.Peers()
-	for b := range peers {
-		fmt.Println("peerMsg:", b)
-		updateData(b)
-	}
-}
-
-func initCacheData(initChan chan bool) {
-	initTaskSdkRule()
-	initChan <- true
-}
-
-func initTaskSdkRule() {
-	var api dbapi.TaskSdkRuleApi
-
-	b, rules := api.FindAllTaskSdkRules()
-	if b {
-		if rules != nil {
-			for _, tRule := range rules {
-				cMap.Set(prefixTASKSDKRULE+tRule.TaskId, tRule.SdkRules)
-			}
-		}
-	}
-}
-
-func updateData(b []byte) {
-	newUpdateMsg := &protomsg.DbChangeMessage{}
-	if err := proto.Unmarshal(b, newUpdateMsg); err != nil {
-		fmt.Println("dbChangeMsg unmarshal err:", err)
-		return
-	}
-	switch newUpdateMsg.Table {
-	case protomsg.TableChanged_T_TaskSdkRule:
-		initTaskSdkRule()
-	default:
-
-	}
-}
-
-// GetTaskSdkRules rules
-func GetTaskSdkRules(taskID string) []*protomsg.SdkRuleSet {
-	r, b := cMap.Get(prefixTASKSDKRULE + taskID)
-	if b {
-		return r.([]*protomsg.SdkRuleSet)
-	}
-	return nil
+	c := p.Recv()
+	return c, err
 }
diff --git a/libcomm/go.mod b/libcomm/go.mod
index 7f7f9a4..324f971 100644
--- a/libcomm/go.mod
+++ b/libcomm/go.mod
@@ -3,17 +3,8 @@
 go 1.12
 
 require (
-	basic.com/dbapi.git v0.0.0-20191216030028-03153c1f1f30
-	basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
-	basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48
-	basic.com/pubsub/protomsg.git v0.0.0-20191219033725-b95da65535d0
-	basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
-	github.com/ajg/form v1.5.1 // indirect
-	github.com/gogo/protobuf v1.3.1
-	golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3 // indirect
-	golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553 // indirect
-	golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135 // indirect
-	google.golang.org/grpc v1.26.0 // indirect
-	honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc // indirect
+	basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
+	basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
+	golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
 	nanomsg.org/go-mangos v1.4.0 // indirect
 )
diff --git a/libgowrapper/face b/libgowrapper/face
index 45b4454..611a497 160000
--- a/libgowrapper/face
+++ b/libgowrapper/face
@@ -1 +1 @@
-Subproject commit 45b445473158f7e38106a973ec6d57157279eee1
+Subproject commit 611a497108ec9d66646e98daaec5cf94da8ad02d
diff --git a/libgowrapper/humantrack b/libgowrapper/humantrack
index f417152..d107634 160000
--- a/libgowrapper/humantrack
+++ b/libgowrapper/humantrack
@@ -1 +1 @@
-Subproject commit f4171524668399dfbd487f244d0efa8efaea7d9f
+Subproject commit d107634e8ddcca05b773794df7ed4e9412a11d55
diff --git a/libgowrapper/reid b/libgowrapper/reid
index 54521e1..4324306 160000
--- a/libgowrapper/reid
+++ b/libgowrapper/reid
@@ -1 +1 @@
-Subproject commit 54521e1c57ec88f3dc1e31d2fcc435ed76afa848
+Subproject commit 4324306f529b9bc62d7e818c0b12ff822687bb47
diff --git a/libgowrapper/vehicle b/libgowrapper/vehicle
index 589ea34..1a51c8c 160000
--- a/libgowrapper/vehicle
+++ b/libgowrapper/vehicle
@@ -1 +1 @@
-Subproject commit 589ea342e4aa810cb58c5b1af42f537e2732d4ff
+Subproject commit 1a51c8cdd99ad79b113dfb98f0f3fb44235037f7
diff --git a/libgowrapper/yolo b/libgowrapper/yolo
index 414a997..51ba06b 160000
--- a/libgowrapper/yolo
+++ b/libgowrapper/yolo
@@ -1 +1 @@
-Subproject commit 414a9974b34c5c1cdb9219a2da7daff01f335d25
+Subproject commit 51ba06b0aa6278da92703f38c6c26003e4bdae88
diff --git a/main.go b/main.go
index 611cc20..b495dbe 100644
--- a/main.go
+++ b/main.go
@@ -53,7 +53,6 @@
 
 	// 鎸囧畾鑾峰彇閰嶇疆淇℃伅浠巗qlite,鏈夋渶楂樹紭鍏堢骇, master浣跨敤
 	flag.StringVar(&util.FSI.IP, util.FetchSrvIP, util.FSI.IP, "浠嶪P鑾峰彇闇�瑕佽繍琛岀殑SDK,master浣跨敤")
-	flag.IntVar(&util.FSI.HTTPort, util.FetchSrvPort, util.FSI.HTTPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑HTTP Port,master浣跨敤")
 	flag.IntVar(&util.FSI.HBPort, util.FetchSrvHeartbeatPort, util.FSI.HBPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑蹇冭烦 Port,master浣跨敤")
 	flag.IntVar(&util.FSI.DataPort, util.FetchSrvDataPort, util.FSI.DataPort, "鑾峰彇闇�瑕佽繍琛岀殑SDK鏈嶅姟鍣ㄧ殑鏁版嵁 Port,master浣跨敤")
 
@@ -74,10 +73,9 @@
 }
 
 func setParamters() {
-	util.FillParams(util.FetchSrvIP, util.FSI.IP)
-	util.FillParams(util.FetchSrvPort, strconv.Itoa(util.FSI.HTTPort))
-	util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort))
-	util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort))
+	// util.FillParams(util.FetchSrvIP, util.FSI.IP)
+	// util.FillParams(util.FetchSrvHeartbeatPort, strconv.Itoa(util.FSI.HBPort))
+	// util.FillParams(util.FetchSrvDataPort, strconv.Itoa(util.FSI.DataPort))
 
 	util.FillParams(util.RuleIPC, util.ToRuleIPC)
 
diff --git a/util/data.go b/util/data.go
index 8034742..23844a0 100644
--- a/util/data.go
+++ b/util/data.go
@@ -3,7 +3,6 @@
 // FetchServerInfo 浠巗qlite鐨勬湇鍔″櫒鑾峰彇SDK淇℃伅鐨勬湇鍔″櫒淇℃伅
 type FetchServerInfo struct {
 	IP       string
-	HTTPort  int
 	HBPort   int //蹇冭烦绔彛
 	DataPort int //鏁版嵁绔彛
 }
@@ -12,9 +11,8 @@
 	// FSI SDK淇℃伅鏈嶅姟鍣↖P/Port
 	FSI = &FetchServerInfo{
 		IP:       "127.0.0.1",
-		HTTPort:  8001,
-		HBPort:   40007,
-		DataPort: 50007,
+		HBPort:   5005,
+		DataPort: 4005,
 	}
 
 	// ToRuleIPC to ruleprocess
@@ -42,7 +40,6 @@
 	ConfigPath = "config-path"
 
 	FetchSrvIP            = "fetch-server-ip"
-	FetchSrvPort          = "fetch-server-port"
 	FetchSrvHeartbeatPort = "fetch-server-heartbeat-port"
 	FetchSrvDataPort      = "fetch-server-data-port"
 

--
Gitblit v1.8.0