From cc962c1b6ffabd9d41e6db0571efbcc1a4568ce7 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期一, 29 七月 2019 11:39:56 +0800
Subject: [PATCH] use mangos req instead of deliver,add recv timeout
---
service/FaceCompareService.go | 51 +++++++++++++++++++++++++++++++++++++++------------
1 files changed, 39 insertions(+), 12 deletions(-)
diff --git a/service/FaceCompareService.go b/service/FaceCompareService.go
index c4714c5..96fc1ac 100644
--- a/service/FaceCompareService.go
+++ b/service/FaceCompareService.go
@@ -3,13 +3,17 @@
import (
esApi "basic.com/pubsub/esutil.git"
"basic.com/pubsub/protomsg.git"
- "basic.com/valib/deliver.git"
"encoding/json"
"github.com/gogo/protobuf/proto"
"github.com/satori/go.uuid"
+ "nanomsg.org/go-mangos"
+ "nanomsg.org/go-mangos/protocol/req"
+ "nanomsg.org/go-mangos/transport/ipc"
+ "nanomsg.org/go-mangos/transport/tcp"
"sort"
"strconv"
"sync"
+ "time"
"webserver/extend/config"
"webserver/extend/logger"
"webserver/extend/util"
@@ -115,19 +119,12 @@
resultList :=make([]CompareResult,0)
for _,str :=range compServerList{
reqUrl := "tcp://"+str
- reqClient := deliver.NewClient(deliver.ReqRep, reqUrl)
- err = reqClient.Send(b)
- if err !=nil{
- logger.Debug("reqClient.Send err:",err)
- continue
- }
- resultB, err := reqClient.Recv()
- if err !=nil{
- logger.Debug("reqClient.Recv err:",err)
+ resultB := doCompareRequest(reqUrl,b)
+ if resultB == nil || len(*resultB) ==0 {
continue
}
rList :=make([]protomsg.Esinfo,0)
- err = json.Unmarshal(resultB, &rList)
+ err = json.Unmarshal(*resultB, &rList)
if err !=nil{
logger.Debug("recv result Unmarshal err:", err)
continue
@@ -146,6 +143,36 @@
SetCompResultByNum(co)
return co
+}
+
+func doCompareRequest(url string,args []byte) *[]byte{
+ reqUrl := "tcp://"+url
+ logger.Debug("reqUrl:",reqUrl)
+ var sock mangos.Socket
+ var err error
+ var msg []byte
+
+ if sock,err = req.NewSocket();err !=nil {
+ logger.Debug("can't new req socket:%s",err.Error())
+ return nil
+ }
+ sock.AddTransport(ipc.NewTransport())
+ sock.AddTransport(tcp.NewTransport())
+ sock.SetOption(mangos.OptionRecvDeadline, time.Second*10)
+ if err = sock.Dial(url);err !=nil {
+ logger.Debug("can't dial on req socket:%s",err.Error())
+ return nil
+ }
+ if err = sock.Send(args);err !=nil {
+ logger.Debug("can't send message on push socket:%s",err.Error())
+ return nil
+ }
+ if msg,err = sock.Recv();err !=nil {
+ logger.Debug("sock.Recv receive err:%s",err.Error())
+ return nil
+ }
+ sock.Close()
+ return &msg
}
//濉厖鍚戝墠绔繑鍥炵殑鏁版嵁
@@ -207,7 +234,7 @@
logger.Debug("videoPersons.len:",len(videopersons))
for _,vp :=range videopersons {
isAlarmInt, _ := strconv.Atoi(vp.IsAlarm)
- var bi []DbPersonVo
+ bi := make([]DbPersonVo,0)
for _,p :=range vp.BaseInfo {
bi = append(bi, DbPersonVo{
PersonId: p.PersonId,
--
Gitblit v1.8.0