From b1eccb805a42d24df12832eb4f62a12897310c50 Mon Sep 17 00:00:00 2001 From: panlei <2799247126@qq.com> Date: 星期五, 19 七月 2019 18:19:30 +0800 Subject: [PATCH] 加个消息广播 --- insertdata/EsClient.go | 15 +++++-- go.sum | 10 ++--- insertdata/insertDataToEs.go | 24 +++++++++--- ruleserver/server.go | 60 ++++++++++++++++++++++++++++++ go.mod | 2 main.go | 1 6 files changed, 94 insertions(+), 18 deletions(-) diff --git a/go.mod b/go.mod index dde93ce..b0cebed 100644 --- a/go.mod +++ b/go.mod @@ -5,7 +5,7 @@ require ( basic.com/dbapi.git v0.0.0-20190701055817-73bca225181f basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 - basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07 + basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00 basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051 basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28 diff --git a/go.sum b/go.sum index de97b34..57e5177 100644 --- a/go.sum +++ b/go.sum @@ -6,12 +6,10 @@ basic.com/pubsub/cache.git v0.0.0-20190718024458-be52360c4814/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY= basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 h1:BBA30Rgljn6MRieC4gUncETJDyna3ObyubTo9HEQ2M0= basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY= -basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY= -basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= -basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227 h1:1jprxyxmeQ8X4/S7cFnhRf4ByVqD0xLZNvx8/0xTk2k= -basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= -basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07 h1:ZeiaQTNSB6K5ASpcwYtk0bWabMQX0r2JX/5agzTznRs= -basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= +basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4 h1:J07AaCWdBtET+3QDxjRC3IIsOcEAnpIzzVA67iN2qdI= +basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= +basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f h1:UqgUBY7xucnVlfBIknzarAMikWsyuLd8t51ICyv90T0= +basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU= basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00 h1:sK+Tx7rvM9J2WnNIwrzMDjZSylWiKNfQO0prUBfKsDk= basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00/go.mod h1:8by33F9E1w17Pw/rDgJGJXAo122w0wDENG14hiMS+RE= basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051 h1:9flC2o3kasaM2Y6I+mY+mxmve/pyAY/UzGQZLT3lFHM= diff --git a/insertdata/EsClient.go b/insertdata/EsClient.go index 7a72deb..c83eb87 100644 --- a/insertdata/EsClient.go +++ b/insertdata/EsClient.go @@ -7,6 +7,7 @@ "io" "io/ioutil" "net/http" + "ruleprocess/logger" "strconv" "strings" "time" @@ -128,7 +129,7 @@ } **/ -func EsReq(method string, url string, parama []byte) (err error) { +func EsReq(method string, url string, parama []byte) (maps map[string]interface{},err error) { timeout := time.Duration(10 * time.Second) client := http.Client{ Timeout: timeout, @@ -138,13 +139,13 @@ if err != nil { fmt.Println("build request fail !") - return err + return nil, err } resp, err := client.Do(request) if err != nil{ fmt.Println("request error: ", err) - return err + return nil,err } defer resp.Body.Close() @@ -153,9 +154,13 @@ if err != nil { fmt.Println(err) } - + decoder := make(map[string]interface{}) + if err := json.Unmarshal([]byte(string(body)), &decoder); err != nil { + return nil, err + } fmt.Println(string(body)) - return nil + logger.Info(string(body)) + return decoder ,nil } diff --git a/insertdata/insertDataToEs.go b/insertdata/insertDataToEs.go index 9a9bec8..5f23d6d 100644 --- a/insertdata/insertDataToEs.go +++ b/insertdata/insertDataToEs.go @@ -209,9 +209,12 @@ logger.Info("json parse error ", err) return } - err1 := EsReq("POST", videoPersonUrl, requstbody) + resp1, err1 := EsReq("POST", videoPersonUrl, requstbody) if err1 != nil { logger.Error("涓婁紶ES鍑洪敊锛�---", err1) + } else { + logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1) + ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}}) } } } @@ -278,9 +281,13 @@ logger.Info("json parse error ", err) return } - err1 := EsReq("POST", videoPersonUrl, requstbody) + resp1, err1 := EsReq("POST", videoPersonUrl, requstbody) + if err1 != nil { logger.Error("涓婁紶ES鍑洪敊锛�---", err1) + } else { + logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1) + ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}}) } //if msg.RuleResult["cacheData"] != nil { // InsertFace(msg.RuleResult["cacheData"].(ruleserver.ResultMsg)) @@ -362,10 +369,13 @@ return } - err = EsReq("POST", personAction, requstbody) - if err != nil { + resp1, err2 := EsReq("POST", personAction, requstbody) + if err2 != nil { logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err) } else { + logger.Debug("鎻掑叆es杩斿洖鐨勬暟鎹俊鎭槸锛�", resp1) + // 鍙戝嚭褰曞儚淇″彿 + ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}}) logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛") //os.Exit(1) } @@ -432,10 +442,12 @@ return } - err = EsReq("POST", personAction, requstbody) - if err != nil { + resp1, err1 := EsReq("POST", personAction, requstbody) + if err1 != nil { logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err) } else { + logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1) + ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}}) logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛") //os.Exit(1) } diff --git a/main.go b/main.go index d6a9bdf..629a45e 100644 --- a/main.go +++ b/main.go @@ -42,6 +42,7 @@ //fmt.Println("缂撳瓨鍒濆鍖栧畬鎴�",<- initchan)//dbserver鍒濆鍖栧畬姣� ruleserver.Init() go ruleserver.TimeTicker() + go ruleserver.StartServer() nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1) wg.Wait() } diff --git a/ruleserver/server.go b/ruleserver/server.go new file mode 100644 index 0000000..398c79d --- /dev/null +++ b/ruleserver/server.go @@ -0,0 +1,60 @@ +package ruleserver + +import ( + "basic.com/pubsub/protomsg.git" + "basic.com/valib/gopherdiscovery.git" + "github.com/gogo/protobuf/proto" + "ruleprocess/logger" + "time" +) + +type PubSubServer struct { + +} + +const ( + Discovery_Server = "tcp://192.168.1.123:40008" + Discovery_UrlPubSub = "tcp://192.168.1.123:50008" +) + +//鍚姩discovery鐨剆erver +var discoveryServer *gopherdiscovery.DiscoveryServer +var videotapChan chan *protomsg.VideotapeInfo +func StartServer() { + var clients []string + var err error + var ( + defaultOpts = gopherdiscovery.Options{ + SurveyTime: 3 * time.Second, + //RecvDeadline: 3 * time.Second, + PollTime: 5 * time.Second, + } + ) + + discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts) + + logger.Debug("server: ", discoveryServer) + logger.Debug("err:",err) + logger.Debug("clients: ",clients) + + videotapChan = make(chan *protomsg.VideotapeInfo) + + for { + select { + case dbMsg := <-videotapChan: + publishMessage(dbMsg) + } + } +} + +//骞挎挱鏁版嵁搴撴敼鍙樼殑msg +func publishMessage(msg *protomsg.VideotapeInfo) { + sendBytes,err := proto.Marshal(msg) + if err ==nil{ + discoveryServer.PublishMsg(string(sendBytes)) + } +} + +func AddLxMessage(msg *protomsg.VideotapeInfo) { + videotapChan <- msg +} \ No newline at end of file -- Gitblit v1.8.0