From b1eccb805a42d24df12832eb4f62a12897310c50 Mon Sep 17 00:00:00 2001
From: panlei <2799247126@qq.com>
Date: 星期五, 19 七月 2019 18:19:30 +0800
Subject: [PATCH] 加个消息广播

---
 insertdata/EsClient.go       |   15 +++++--
 go.sum                       |   10 ++---
 insertdata/insertDataToEs.go |   24 +++++++++---
 ruleserver/server.go         |   60 ++++++++++++++++++++++++++++++
 go.mod                       |    2 
 main.go                      |    1 
 6 files changed, 94 insertions(+), 18 deletions(-)

diff --git a/go.mod b/go.mod
index dde93ce..b0cebed 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
 require (
 	basic.com/dbapi.git v0.0.0-20190701055817-73bca225181f
 	basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48
-	basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07
+	basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f
 	basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00
 	basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051
 	basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
diff --git a/go.sum b/go.sum
index de97b34..57e5177 100644
--- a/go.sum
+++ b/go.sum
@@ -6,12 +6,10 @@
 basic.com/pubsub/cache.git v0.0.0-20190718024458-be52360c4814/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
 basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 h1:BBA30Rgljn6MRieC4gUncETJDyna3ObyubTo9HEQ2M0=
 basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
-basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
-basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227 h1:1jprxyxmeQ8X4/S7cFnhRf4ByVqD0xLZNvx8/0xTk2k=
-basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07 h1:ZeiaQTNSB6K5ASpcwYtk0bWabMQX0r2JX/5agzTznRs=
-basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4 h1:J07AaCWdBtET+3QDxjRC3IIsOcEAnpIzzVA67iN2qdI=
+basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f h1:UqgUBY7xucnVlfBIknzarAMikWsyuLd8t51ICyv90T0=
+basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
 basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00 h1:sK+Tx7rvM9J2WnNIwrzMDjZSylWiKNfQO0prUBfKsDk=
 basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00/go.mod h1:8by33F9E1w17Pw/rDgJGJXAo122w0wDENG14hiMS+RE=
 basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051 h1:9flC2o3kasaM2Y6I+mY+mxmve/pyAY/UzGQZLT3lFHM=
diff --git a/insertdata/EsClient.go b/insertdata/EsClient.go
index 7a72deb..c83eb87 100644
--- a/insertdata/EsClient.go
+++ b/insertdata/EsClient.go
@@ -7,6 +7,7 @@
 	"io"
 	"io/ioutil"
 	"net/http"
+	"ruleprocess/logger"
 	"strconv"
 	"strings"
 	"time"
@@ -128,7 +129,7 @@
     }
 **/
 
-func EsReq(method string, url string, parama []byte) (err error) {
+func EsReq(method string, url string, parama []byte) (maps map[string]interface{},err error) {
     timeout := time.Duration(10 * time.Second) 
     client := http.Client{
         Timeout: timeout,
@@ -138,13 +139,13 @@
 
     if err != nil {
         fmt.Println("build request fail !")
-        return err 
+        return nil, err
     }
 
     resp, err := client.Do(request)
     if err != nil{
         fmt.Println("request error: ", err)
-        return err 
+        return nil,err
     }
 
     defer resp.Body.Close()
@@ -153,9 +154,13 @@
     if err != nil {
         fmt.Println(err) 
     }
-
+	decoder := make(map[string]interface{})
+	if err := json.Unmarshal([]byte(string(body)), &decoder); err != nil {
+		return nil, err
+	}
     fmt.Println(string(body))
-    return nil 
+    logger.Info(string(body))
+    return decoder ,nil
 }
 
 
diff --git a/insertdata/insertDataToEs.go b/insertdata/insertDataToEs.go
index 9a9bec8..5f23d6d 100644
--- a/insertdata/insertDataToEs.go
+++ b/insertdata/insertDataToEs.go
@@ -209,9 +209,12 @@
 				logger.Info("json parse error ", err)
 				return
 			}
-			err1 := EsReq("POST", videoPersonUrl, requstbody)
+			resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
 			if err1 != nil {
 				logger.Error("涓婁紶ES鍑洪敊锛�---", err1)
+			} else {
+				logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+				ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
 			}
 		}
 	}
@@ -278,9 +281,13 @@
 		logger.Info("json parse error ", err)
 		return
 	}
-	err1 := EsReq("POST", videoPersonUrl, requstbody)
+	resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
+
 	if err1 != nil {
 		logger.Error("涓婁紶ES鍑洪敊锛�---", err1)
+	} else {
+		logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+		ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
 	}
 	//if msg.RuleResult["cacheData"] != nil {
 	//	InsertFace(msg.RuleResult["cacheData"].(ruleserver.ResultMsg))
@@ -362,10 +369,13 @@
 			return
 
 		}
-		err = EsReq("POST", personAction, requstbody)
-		if err != nil {
+		resp1, err2 := EsReq("POST", personAction, requstbody)
+		if err2 != nil {
 			logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err)
 		} else {
+			logger.Debug("鎻掑叆es杩斿洖鐨勬暟鎹俊鎭槸锛�", resp1)
+			// 鍙戝嚭褰曞儚淇″彿
+			ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
 			logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛")
 			//os.Exit(1)
 		}
@@ -432,10 +442,12 @@
 		return
 
 	}
-	err = EsReq("POST", personAction, requstbody)
-	if err != nil {
+	resp1, err1 := EsReq("POST", personAction, requstbody)
+	if err1 != nil {
 		logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err)
 	} else {
+		logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+		ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
 		logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛")
 		//os.Exit(1)
 	}
diff --git a/main.go b/main.go
index d6a9bdf..629a45e 100644
--- a/main.go
+++ b/main.go
@@ -42,6 +42,7 @@
 	//fmt.Println("缂撳瓨鍒濆鍖栧畬鎴�",<- initchan)//dbserver鍒濆鍖栧畬姣�
 	ruleserver.Init()
 	go ruleserver.TimeTicker()
+	go ruleserver.StartServer()
 	nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
 	wg.Wait()
 }
diff --git a/ruleserver/server.go b/ruleserver/server.go
new file mode 100644
index 0000000..398c79d
--- /dev/null
+++ b/ruleserver/server.go
@@ -0,0 +1,60 @@
+package ruleserver
+
+import (
+	"basic.com/pubsub/protomsg.git"
+	"basic.com/valib/gopherdiscovery.git"
+	"github.com/gogo/protobuf/proto"
+	"ruleprocess/logger"
+	"time"
+)
+
+type PubSubServer struct {
+
+}
+
+const (
+	Discovery_Server = "tcp://192.168.1.123:40008"
+    Discovery_UrlPubSub = "tcp://192.168.1.123:50008"
+)
+
+//鍚姩discovery鐨剆erver
+var discoveryServer *gopherdiscovery.DiscoveryServer
+var videotapChan chan *protomsg.VideotapeInfo
+func StartServer() {
+	var clients []string
+	var err error
+	var (
+		defaultOpts = gopherdiscovery.Options{
+			SurveyTime:   3 * time.Second,
+			//RecvDeadline: 3 * time.Second,
+			PollTime:     5 * time.Second,
+		}
+	)
+
+	discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts)
+
+	logger.Debug("server: ", discoveryServer)
+	logger.Debug("err:",err)
+	logger.Debug("clients: ",clients)
+
+	videotapChan = make(chan *protomsg.VideotapeInfo)
+
+	for {
+		select {
+		case dbMsg := <-videotapChan:
+			publishMessage(dbMsg)
+		}
+	}
+}
+
+//骞挎挱鏁版嵁搴撴敼鍙樼殑msg
+func publishMessage(msg *protomsg.VideotapeInfo) {
+	sendBytes,err := proto.Marshal(msg)
+	if err ==nil{
+		discoveryServer.PublishMsg(string(sendBytes))
+	}
+}
+
+func AddLxMessage(msg *protomsg.VideotapeInfo) {
+	videotapChan <- msg
+}
\ No newline at end of file

--
Gitblit v1.8.0