panlei
2019-07-19 b1eccb805a42d24df12832eb4f62a12897310c50
加个消息广播
1个文件已添加
5个文件已修改
112 ■■■■ 已修改文件
go.mod 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
insertdata/EsClient.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
insertdata/insertDataToEs.go 24 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
ruleserver/server.go 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod
@@ -5,7 +5,7 @@
require (
    basic.com/dbapi.git v0.0.0-20190701055817-73bca225181f
    basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48
    basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07
    basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f
    basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00
    basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051
    basic.com/valib/gopherdiscovery.git v0.0.0-20190605034340-15d89d8b4e28
go.sum
@@ -6,12 +6,10 @@
basic.com/pubsub/cache.git v0.0.0-20190718024458-be52360c4814/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48 h1:BBA30Rgljn6MRieC4gUncETJDyna3ObyubTo9HEQ2M0=
basic.com/pubsub/cache.git v0.0.0-20190718093725-6a413e1d7d48/go.mod h1:gHLJZz2ee1cGL0X0ae69fs56bAxkDgEQwDhhXZJNUcY=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2 h1:ygh9CQPS48KmXv+PNUrOcrMqIiDZOs11apnQdu9oGEY=
basic.com/pubsub/protomsg.git v0.0.0-20190709070734-b34c868adcc2/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227 h1:1jprxyxmeQ8X4/S7cFnhRf4ByVqD0xLZNvx8/0xTk2k=
basic.com/pubsub/protomsg.git v0.0.0-20190712081201-5a482419c227/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07 h1:ZeiaQTNSB6K5ASpcwYtk0bWabMQX0r2JX/5agzTznRs=
basic.com/pubsub/protomsg.git v0.0.0-20190717072554-576620e5ba07/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4 h1:J07AaCWdBtET+3QDxjRC3IIsOcEAnpIzzVA67iN2qdI=
basic.com/pubsub/protomsg.git v0.0.0-20190719070700-1043c79a70d4/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f h1:UqgUBY7xucnVlfBIknzarAMikWsyuLd8t51ICyv90T0=
basic.com/pubsub/protomsg.git v0.0.0-20190719101124-f94ca7d72b4f/go.mod h1:un5NV5VWQoblVLZfx1Rt5vyLgwR0jI92d3VJhfrJhWU=
basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00 h1:sK+Tx7rvM9J2WnNIwrzMDjZSylWiKNfQO0prUBfKsDk=
basic.com/pubsub/sdkcompare.git v0.0.0-20190715013640-f536a4647d00/go.mod h1:8by33F9E1w17Pw/rDgJGJXAo122w0wDENG14hiMS+RE=
basic.com/valib/deliver.git v0.0.0-20190531095353-25d8c3b20051 h1:9flC2o3kasaM2Y6I+mY+mxmve/pyAY/UzGQZLT3lFHM=
insertdata/EsClient.go
@@ -7,6 +7,7 @@
    "io"
    "io/ioutil"
    "net/http"
    "ruleprocess/logger"
    "strconv"
    "strings"
    "time"
@@ -128,7 +129,7 @@
    }
**/
func EsReq(method string, url string, parama []byte) (err error) {
func EsReq(method string, url string, parama []byte) (maps map[string]interface{},err error) {
    timeout := time.Duration(10 * time.Second) 
    client := http.Client{
        Timeout: timeout,
@@ -138,13 +139,13 @@
    if err != nil {
        fmt.Println("build request fail !")
        return err
        return nil, err
    }
    resp, err := client.Do(request)
    if err != nil{
        fmt.Println("request error: ", err)
        return err
        return nil,err
    }
    defer resp.Body.Close()
@@ -153,9 +154,13 @@
    if err != nil {
        fmt.Println(err) 
    }
    decoder := make(map[string]interface{})
    if err := json.Unmarshal([]byte(string(body)), &decoder); err != nil {
        return nil, err
    }
    fmt.Println(string(body))
    return nil
    logger.Info(string(body))
    return decoder ,nil
}
insertdata/insertDataToEs.go
@@ -209,9 +209,12 @@
                logger.Info("json parse error ", err)
                return
            }
            err1 := EsReq("POST", videoPersonUrl, requstbody)
            resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
            if err1 != nil {
                logger.Error("上传ES出错!---", err1)
            } else {
                logger.Info("插入es返回的信息:", resp1)
                ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
            }
        }
    }
@@ -278,9 +281,13 @@
        logger.Info("json parse error ", err)
        return
    }
    err1 := EsReq("POST", videoPersonUrl, requstbody)
    resp1, err1 := EsReq("POST", videoPersonUrl, requstbody)
    if err1 != nil {
        logger.Error("上传ES出错!---", err1)
    } else {
        logger.Info("插入es返回的信息:", resp1)
        ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
    }
    //if msg.RuleResult["cacheData"] != nil {
    //    InsertFace(msg.RuleResult["cacheData"].(ruleserver.ResultMsg))
@@ -362,10 +369,13 @@
            return
        }
        err = EsReq("POST", personAction, requstbody)
        if err != nil {
        resp1, err2 := EsReq("POST", personAction, requstbody)
        if err2 != nil {
            logger.Error("往ES插入数据失败", err)
        } else {
            logger.Debug("插入es返回的数据信息是:", resp1)
            // 发出录像信号
            ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
            logger.Warn("__________________________________________往ES插入yolo数据成功")
            //os.Exit(1)
        }
@@ -432,10 +442,12 @@
        return
    }
    err = EsReq("POST", personAction, requstbody)
    if err != nil {
    resp1, err1 := EsReq("POST", personAction, requstbody)
    if err1 != nil {
        logger.Error("往ES插入数据失败", err)
    } else {
        logger.Info("插入es返回的信息:", resp1)
        ruleserver.AddLxMessage(&protomsg.VideotapeInfo{EsDataId: resp1["_id"].(string), CameraId: msg.Cid, TaskId: msg.Tasklab.Taskid, ImgId: i.Id, SdkIds: []string{}})
        logger.Warn("__________________________________________往ES插入yolo数据成功")
        //os.Exit(1)
    }
main.go
@@ -42,6 +42,7 @@
    //fmt.Println("缓存初始化完成",<- initchan)//dbserver初始化完毕
    ruleserver.Init()
    go ruleserver.TimeTicker()
    go ruleserver.StartServer()
    nReciever("ipc:///tmp/sdk-2-rules-process.ipc", deliver.PushPull, 1)
    wg.Wait()
}
ruleserver/server.go
New file
@@ -0,0 +1,60 @@
package ruleserver
import (
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/gopherdiscovery.git"
    "github.com/gogo/protobuf/proto"
    "ruleprocess/logger"
    "time"
)
type PubSubServer struct {
}
const (
    Discovery_Server = "tcp://192.168.1.123:40008"
    Discovery_UrlPubSub = "tcp://192.168.1.123:50008"
)
//启动discovery的server
var discoveryServer *gopherdiscovery.DiscoveryServer
var videotapChan chan *protomsg.VideotapeInfo
func StartServer() {
    var clients []string
    var err error
    var (
        defaultOpts = gopherdiscovery.Options{
            SurveyTime:   3 * time.Second,
            //RecvDeadline: 3 * time.Second,
            PollTime:     5 * time.Second,
        }
    )
    discoveryServer, err = gopherdiscovery.Server(Discovery_Server, Discovery_UrlPubSub, defaultOpts)
    logger.Debug("server: ", discoveryServer)
    logger.Debug("err:",err)
    logger.Debug("clients: ",clients)
    videotapChan = make(chan *protomsg.VideotapeInfo)
    for {
        select {
        case dbMsg := <-videotapChan:
            publishMessage(dbMsg)
        }
    }
}
//广播数据库改变的msg
func publishMessage(msg *protomsg.VideotapeInfo) {
    sendBytes,err := proto.Marshal(msg)
    if err ==nil{
        discoveryServer.PublishMsg(string(sendBytes))
    }
}
func AddLxMessage(msg *protomsg.VideotapeInfo) {
    videotapChan <- msg
}