From f4e8f206a6760bdc31734dfcb1c65916b5b76311 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期四, 22 八月 2019 12:05:58 +0800 Subject: [PATCH] add cluster --- service/FaceCompareService.go | 254 ++++++++++++-------------------------------------- 1 files changed, 62 insertions(+), 192 deletions(-) diff --git a/service/FaceCompareService.go b/service/FaceCompareService.go index 96fc1ac..04d5040 100644 --- a/service/FaceCompareService.go +++ b/service/FaceCompareService.go @@ -1,22 +1,16 @@ package service import ( - esApi "basic.com/pubsub/esutil.git" "basic.com/pubsub/protomsg.git" - "encoding/json" "github.com/gogo/protobuf/proto" "github.com/satori/go.uuid" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/req" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" - "sort" - "strconv" "sync" - "time" "webserver/extend/config" "webserver/extend/logger" - "webserver/extend/util" ) type FaceCompareService struct { @@ -33,28 +27,19 @@ type CompareOnce struct { CompareNum string `json:"compareOnce"` - CompareData []CompareResult `json:"compareData"` + CompareData *CompareList `json:"compareData"` } -type CompareResultWrapper struct { - CompareData []CompareResult - By func(p,q *CompareResult) bool -} +type CompareList []*protomsg.SdkCompareEach -func (crw CompareResultWrapper) Len()int { - return len(crw.CompareData) +func (s CompareList) Len()int { + return len(s) } -func (crw CompareResultWrapper) Swap(i,j int) { - crw.CompareData[i],crw.CompareData[j] = crw.CompareData[j],crw.CompareData[i] +func (s CompareList) Swap(i,j int) { + s[i],s[j] = s[j],s[i] } -func (crw CompareResultWrapper) Less(i,j int) bool{ - return crw.By(&crw.CompareData[i],&crw.CompareData[j]) -} - -func SortByScore(list []CompareResult) { - sort.Sort(CompareResultWrapper{list, func(p, q *CompareResult) bool { - return q.CompareScore < p.CompareScore //閫掑噺鎺掑簭 - }}) +func (s CompareList) Less(i,j int) bool{//闄嶅簭鎺掑簭 + return s[i].CompareScore > s[j].CompareScore } var compResultHisM = make(map[string]*CompareOnce,0) @@ -78,209 +63,94 @@ var CaptureTable = "capturetable" // 鎶撴媿搴� -type CompareResult struct { - Id string `json:"id"` - CompareScore float32 `json:"compareScore"` - CameraId string `json:"cameraId"` - CameraAddr string `json:"cameraAddr"` - PicDate string `json:"picDate"` - Content string `json:"content"` - IsAlarm int `json:"isAlarm"` - PicMaxUrl string `json:"picMaxUrl"` - PicSmUrl []string `json:"picSmUrl"` - Sex string `json:"sex"` - AgeDescription string `json:"ageDescription"` - Race string `json:"race"` - TaskId string `json:"taskId"` - TaskName string `json:"taskName"` - BaseInfo []DbPersonVo `json:"baseInfo"` - VideoUrl string `json:"videoUrl"` - SdkName string `json:"sdkName"` -} -type DbPersonVo struct { - BwType string `json:"bwType"` - CompareScore float32 `json:"compareScore"` - IdCard string `json:"idCard"` - MonitorLevel string `json:"monitorLevel"` - PersonId string `json:"personId"` - PersonName string `json:"personName"` - PersonPicUrl string `json:"personPicUrl"` - PhoneNum string `json:"phoneNum"` - Sex string `json:"sex"` - TableId string `json:"tableId"` - TableName string `json:"tableName"` -} - -func (sv *FaceCompareService) Compare() *CompareOnce{ +func (sv *FaceCompareService) CompareVideoPersons() *CompareList{ + sv.CompareArgs.TableIds = []string { CaptureTable } b, err := proto.Marshal(&sv.CompareArgs) - compServerList := config.CompServerInfo.Url - logger.Debug("compServerList:", compServerList) - //1.鍚戝悇涓猚ompare杩涚▼鍙戣捣璇锋眰鎷垮埌姣斿缁撴灉 - resultList :=make([]CompareResult,0) - for _,str :=range compServerList{ + esCompServerList := config.EsCompServerInfo.Url + logger.Debug("compServerList:", esCompServerList) + //1.鍚戝悇涓狤s compare杩涚▼鍙戣捣璇锋眰鎷垮埌姣斿缁撴灉 + var resultList CompareList + + for _,str :=range esCompServerList{ reqUrl := "tcp://"+str resultB := doCompareRequest(reqUrl,b) if resultB == nil || len(*resultB) ==0 { continue } - rList :=make([]protomsg.Esinfo,0) - err = json.Unmarshal(*resultB, &rList) - if err !=nil{ - logger.Debug("recv result Unmarshal err:", err) + var sdkCompResult protomsg.SdkCompareResult + + err = proto.Unmarshal(*resultB, &sdkCompResult) + if err !=nil { + logger.Debug("comp sdkCompareResult unmarshal err:", err) continue } - logger.Debug("len(rList):", len(rList)) - if len(rList) >0 { - resultList = append(resultList, FillDataToCompareResult(&rList)...) + + logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult)) + if len(sdkCompResult.CompareResult) >0 { + resultList = append(resultList, sdkCompResult.CompareResult...) } } + logger.Debug("comp totalList.len:", len(resultList)) - //2.缂撳瓨姣斿缁撴灉 - co := &CompareOnce{ - CompareNum: sv.CompareNum, - CompareData: resultList, + return &resultList +} + +//姣斿搴曞簱 +func (sv *FaceCompareService) CompareDbPersons() *CompareList{ + b, err := proto.Marshal(&sv.CompareArgs) + dbPersonCompServerUrl := config.DbPersonCompInfo.Url + logger.Debug("comp Server url:", dbPersonCompServerUrl) + + var resultList CompareList + + reqUrl := "tcp://"+dbPersonCompServerUrl + resultB := doCompareRequest(reqUrl,b) + if resultB == nil || len(*resultB) ==0 { + return nil } - SetCompResultByNum(co) + var sdkCompResult protomsg.SdkCompareResult + err = proto.Unmarshal(*resultB, &sdkCompResult) + if err !=nil { + logger.Debug("comp sdkCompareResult unmarshal err:", err) + return nil + } - return co + logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult)) + if len(sdkCompResult.CompareResult) >0 { + resultList = append(resultList, sdkCompResult.CompareResult...) + } + + logger.Debug("comp totalList.len:", len(resultList)) + + return &resultList } func doCompareRequest(url string,args []byte) *[]byte{ - reqUrl := "tcp://"+url - logger.Debug("reqUrl:",reqUrl) + logger.Debug("comp reqUrl:",url) var sock mangos.Socket var err error var msg []byte if sock,err = req.NewSocket();err !=nil { - logger.Debug("can't new req socket:%s",err.Error()) + logger.Debug("comp can't new req socket:%s",err.Error()) return nil } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) - sock.SetOption(mangos.OptionRecvDeadline, time.Second*10) if err = sock.Dial(url);err !=nil { - logger.Debug("can't dial on req socket:%s",err.Error()) + logger.Debug("comp can't dial on req socket:%s",err.Error()) return nil } + sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100) + //sock.SetOption(mangos.OptionRecvDeadline, time.Second*10) if err = sock.Send(args);err !=nil { - logger.Debug("can't send message on push socket:%s",err.Error()) + logger.Debug("comp can't send message on push socket:%s",err.Error()) return nil } if msg,err = sock.Recv();err !=nil { - logger.Debug("sock.Recv receive err:%s",err.Error()) + logger.Debug("comp sock.Recv receive err:%s",err.Error()) return nil } sock.Close() return &msg -} - -//濉厖鍚戝墠绔繑鍥炵殑鏁版嵁 -func FillDataToCompareResult(rList *[]protomsg.Esinfo) []CompareResult { - resultList :=make([]CompareResult,0) - - dbPersonM := make(map[string]*protomsg.Esinfo,0) - captureM := make(map[string]*protomsg.Esinfo,0) - personIds :=make([]string,0) - captureIds := make([]string,0) - for _,v :=range *rList{ - if v.Tableid == CaptureTable { - captureM[v.Id] = &v - captureIds = append(captureIds,v.Id) - } else { - dbPersonM[v.Id] = &v - personIds = append(personIds,v.Id) - } - } - logger.Debug("personIds:", personIds) - logger.Debug("captureIds:",captureIds) - esServerIp := config.EsInfo.Masterip - esServerPort := config.EsInfo.Httpport - index := config.EsInfo.EsIndex.Dbtablepersons.IndexName - var dbpersons []protomsg.Dbperson - if len(personIds) >0 { - dbpersons, _ = esApi.Dbpersoninfosbyid(personIds, index, esServerIp, esServerPort) - } - - logger.Debug("dbpersons:", dbpersons) - if dbpersons !=nil { - for _,p :=range dbpersons { - var dbP = DbPersonVo { - PersonId: p.Id, - IdCard: p.IdCard, - CompareScore: util.ParseScore(dbPersonM[p.Id].CompareScore), - MonitorLevel: p.MonitorLevel, - PersonName: p.PersonName, - PersonPicUrl: p.PersonPicUrl, - PhoneNum: p.PhoneNum, - Sex: p.Sex, - TableId: p.TableId, - } - dbTableInfos, _ := esApi.Dbtablefosbyid([]string{p.TableId}, config.EsInfo.EsIndex.DbTables.IndexName, esServerIp, esServerPort) - if dbTableInfos !=nil{ - dbP.BwType = dbTableInfos[0].BwType - dbP.TableName = dbTableInfos[0].TableName - } - var cr = CompareResult{ - BaseInfo:[]DbPersonVo{ dbP }, - } - resultList = append(resultList,cr) - } - } - var capturePersons []protomsg.Videopersons - if len(captureIds) >0 { - logger.Debug("capturePersons:", capturePersons) - videopersons, _ := esApi.Videopersonsinfosbyid(captureIds, config.EsInfo.EsIndex.VideoPersons.IndexName, config.EsInfo.Masterip, config.EsInfo.Httpport) - logger.Debug("videoPersons.len:",len(videopersons)) - for _,vp :=range videopersons { - isAlarmInt, _ := strconv.Atoi(vp.IsAlarm) - bi := make([]DbPersonVo,0) - for _,p :=range vp.BaseInfo { - bi = append(bi, DbPersonVo{ - PersonId: p.PersonId, - IdCard: p.IdCard, - CompareScore: util.ParseScore(p.CompareScore), - MonitorLevel: parseMonitorLevel(p.MonitorLevel), - PersonName: p.PersonName, - PersonPicUrl: p.PersonPicUrl, - PhoneNum: p.PhoneNum, - Sex: p.Sex, - TableId: p.TableId, - }) - } - vpE := CompareResult{ - Id: vp.Id, - CompareScore: util.ParseScore(captureM[vp.Id].CompareScore), - CameraId: vp.CameraId, - CameraAddr: vp.CameraAddr, - PicDate: vp.PicDate, - PicMaxUrl: vp.PicMaxUrl, - PicSmUrl: []string{ vp.PicSmUrl }, - IsAlarm: isAlarmInt, - Sex: vp.Sex, - AgeDescription: vp.AgeDescription, - Race: vp.Race, - TaskName: vp.TaskName, - TaskId: vp.TaskId, - VideoUrl: vp.VideoUrl, - BaseInfo: bi, - SdkName: "浜鸿劯", - } - resultList = append(resultList,vpE) - } - } - return resultList -} - -func parseMonitorLevel(level string) string { - if level == "1" { - return "涓�绾�" - } - if level == "2" { - return "浜岀骇" - } - if level == "3" { - return "涓夌骇" - } - return level } \ No newline at end of file -- Gitblit v1.8.0