From b1eccb805a42d24df12832eb4f62a12897310c50 Mon Sep 17 00:00:00 2001
From: panlei <2799247126@qq.com>
Date: 星期五, 19 七月 2019 18:19:30 +0800
Subject: [PATCH] 加个消息广播
---
insertdata/EsClient.go | 15 +++++--
go.sum | 10 ++---
insertdata/insertDataToEs.go | 24 +++++++++---
ruleserver/server.go | 60 ++++++++++++++++++++++++++++++
go.mod | 2
main.go | 1
6 files changed, 94 insertions(+), 18 deletions(-)
diff --git a/go.mod b/go.mod
index dde93ce..b0cebed 100644
--- a/go.mod
+++ b/go.mod
@@ -5,7 +5,7 @@
require (
basic.com/dbapi.git v0.0.0-20190701055817-73bca225181f
basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48
- basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07
+ basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f
basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00
basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051
basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
diff --git a/go.sum b/go.sum
index de97b34..57e5177 100644
--- a/go.sum
+++ b/go.sum
@@ -6,12 +6,10 @@
basic.com/pubsub/cache.git v0.0.0-20190718024458-be52360c4814/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 h1:BBA30Rgljn6MRieC4gUncETJDyna3ObyubTo9HEQ2M0=
basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
-basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
-basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227 h1:1jprxyxmeQ8X4/S7cFnhRf4ByVqD0xLZNvx8/0xTk2k=
-basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
-basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07 h1:ZeiaQTNSB6K5ASpcwYtk0bWabMQX0r2JX/5agzTznRs=
-basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4 h1:J07AaCWdBtET+3QDxjRC3IIsOcEAnpIzzVA67iN2qdI=
+basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
+basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f h1:UqgUBY7xucnVlfBIknzarAMikWsyuLd8t51ICyv90T0=
+basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00 h1:sK+Tx7rvM9J2WnNIwrzMDjZSylWiKNfQO0prUBfKsDk=
basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00/go.mod h1:8by33F9E1w17Pw/rDgJGJXAo122w0wDENG14hiMS+RE=
basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051 h1:9flC2o3kasaM2Y6I+mY+mxmve/pyAY/UzGQZLT3lFHM=
diff --git a/insertdata/EsClient.go b/insertdata/EsClient.go
index 7a72deb..c83eb87 100644
--- a/insertdata/EsClient.go
+++ b/insertdata/EsClient.go
@@ -7,6 +7,7 @@
"io"
"io/ioutil"
"net/http"
+ "ruleprocess/logger"
"strconv"
"strings"
"time"
@@ -128,7 +129,7 @@
}
**/
-func EsReq(method string, url string, parama []byte) (err error) {
+func EsReq(method string, url string, parama []byte) (maps map[string]interface{},err error) {
timeout := time.Duration(10 * time.Second)
client := http.Client{
Timeout: timeout,
@@ -138,13 +139,13 @@
if err != nil {
fmt.Println("build request fail !")
- return err
+ return nil, err
}
resp, err := client.Do(request)
if err != nil{
fmt.Println("request error: ", err)
- return err
+ return nil,err
}
defer resp.Body.Close()
@@ -153,9 +154,13 @@
if err != nil {
fmt.Println(err)
}
-
+ decoder := make(map[string]interface{})
+ if err := json.Unmarshal([]byte(string(body)), &decoder); err != nil {
+ return nil, err
+ }
fmt.Println(string(body))
- return nil
+ logger.Info(string(body))
+ return decoder ,nil
}
diff --git a/insertdata/insertDataToEs.go b/insertdata/insertDataToEs.go
index 9a9bec8..5f23d6d 100644
--- a/insertdata/insertDataToEs.go
+++ b/insertdata/insertDataToEs.go
@@ -209,9 +209,12 @@
logger.Info("json parse error ", err)
return
}
- err1 := EsReq("POST", videoPersonUrl, requstbody)
+ resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
if err1 != nil {
logger.Error("涓婁紶ES鍑洪敊锛�---", err1)
+ } else {
+ logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+ ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
}
}
}
@@ -278,9 +281,13 @@
logger.Info("json parse error ", err)
return
}
- err1 := EsReq("POST", videoPersonUrl, requstbody)
+ resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
+
if err1 != nil {
logger.Error("涓婁紶ES鍑洪敊锛�---", err1)
+ } else {
+ logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+ ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
}
//if msg.RuleResult["cacheData"] != nil {
// InsertFace(msg.RuleResult["cacheData"].(ruleserver.ResultMsg))
@@ -362,10 +369,13 @@
return
}
- err = EsReq("POST", personAction, requstbody)
- if err != nil {
+ resp1, err2 := EsReq("POST", personAction, requstbody)
+ if err2 != nil {
logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err)
} else {
+ logger.Debug("鎻掑叆es杩斿洖鐨勬暟鎹俊鎭槸锛�", resp1)
+ // 鍙戝嚭褰曞儚淇″彿
+ ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛")
//os.Exit(1)
}
@@ -432,10 +442,12 @@
return
}
- err = EsReq("POST", personAction, requstbody)
- if err != nil {
+ resp1, err1 := EsReq("POST", personAction, requstbody)
+ if err1 != nil {
logger.Error("寰�ES鎻掑叆鏁版嵁澶辫触", err)
} else {
+ logger.Info("鎻掑叆es杩斿洖鐨勪俊鎭細", resp1)
+ ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
logger.Warn("__________________________________________寰�ES鎻掑叆yolo鏁版嵁鎴愬姛")
//os.Exit(1)
}
diff --git a/main.go b/main.go
index d6a9bdf..629a45e 100644
--- a/main.go
+++ b/main.go
@@ -42,6 +42,7 @@
//fmt.Println("缂撳瓨鍒濆鍖栧畬鎴�",<- initchan)//dbserver鍒濆鍖栧畬姣�
ruleserver.Init()
go ruleserver.TimeTicker()
+ go ruleserver.StartServer()
nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
wg.Wait()
}
diff --git a/ruleserver/server.go b/ruleserver/server.go
new file mode 100644
index 0000000..398c79d
--- /dev/null
+++ b/ruleserver/server.go
@@ -0,0 +1,60 @@
+package ruleserver
+
+import (
+ "basic.com/pubsub/protomsg.git"
+ "basic.com/valib/gopherdiscovery.git"
+ "github.com/gogo/protobuf/proto"
+ "ruleprocess/logger"
+ "time"
+)
+
+type PubSubServer struct {
+
+}
+
+const (
+ Discovery_Server = "tcp://192.168.1.123:40008"
+ Discovery_UrlPubSub = "tcp://192.168.1.123:50008"
+)
+
+//鍚姩discovery鐨剆erver
+var discoveryServer *gopherdiscovery.DiscoveryServer
+var videotapChan chan *protomsg.VideotapeInfo
+func StartServer() {
+ var clients []string
+ var err error
+ var (
+ defaultOpts = gopherdiscovery.Options{
+ SurveyTime: 3 * time.Second,
+ //RecvDeadline: 3 * time.Second,
+ PollTime: 5 * time.Second,
+ }
+ )
+
+ discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts)
+
+ logger.Debug("server: ", discoveryServer)
+ logger.Debug("err:",err)
+ logger.Debug("clients: ",clients)
+
+ videotapChan = make(chan *protomsg.VideotapeInfo)
+
+ for {
+ select {
+ case dbMsg := <-videotapChan:
+ publishMessage(dbMsg)
+ }
+ }
+}
+
+//骞挎挱鏁版嵁搴撴敼鍙樼殑msg
+func publishMessage(msg *protomsg.VideotapeInfo) {
+ sendBytes,err := proto.Marshal(msg)
+ if err ==nil{
+ discoveryServer.PublishMsg(string(sendBytes))
+ }
+}
+
+func AddLxMessage(msg *protomsg.VideotapeInfo) {
+ videotapChan <- msg
+}
\ No newline at end of file
--
Gitblit v1.8.0