package service import ( "basic.com/pubsub/protomsg.git" "github.com/gogo/protobuf/proto" "github.com/satori/go.uuid" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/req" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" "strconv" "sync" "webserver/extend/config" "basic.com/valib/logger.git" ) type FaceCompareService struct { CompareNum string CompareArgs protomsg.CompareArgs } func NewFaceCompareService(args protomsg.CompareArgs) *FaceCompareService { return &FaceCompareService{ CompareNum: uuid.NewV4().String(), CompareArgs:args, } } type CompareOnce struct { CompareNum string `json:"compareOnce"` CompareData *CompareList `json:"compareData"` } type CompareList []*protomsg.SdkCompareEach func (s CompareList) Len()int { return len(s) } func (s CompareList) Swap(i,j int) { s[i],s[j] = s[j],s[i] } func (s CompareList) Less(i,j int) bool{//降序排序 return s[i].CompareScore > s[j].CompareScore } var compResultHisM = make(map[string]*CompareOnce,0) var lock sync.Mutex func SetCompResultByNum(co *CompareOnce) { lock.Lock() defer lock.Unlock() compResultHisM[co.CompareNum] = co } func GetCompResultByNum(num string) *CompareOnce { lock.Lock() defer lock.Unlock() if co,ok := compResultHisM[num];ok { return co } else { return nil } } var CaptureTable = "capturetable" // 抓拍库 func (sv *FaceCompareService) CompareVideoPersons() *CompareList{ sv.CompareArgs.TableIds = []string { CaptureTable } b, err := proto.Marshal(&sv.CompareArgs) esCompServerList := config.EsCompServerInfo.Ips logger.Debug("compServerList:", esCompServerList) //1.向各个Es compare进程发起请求拿到比对结果 var resultList CompareList for _,str :=range esCompServerList{ reqUrl := "tcp://"+str + ":"+strconv.Itoa(config.EsCompServerInfo.Port) resultB := doCompareRequest(reqUrl,b) if resultB == nil || len(*resultB) ==0 { continue } var sdkCompResult protomsg.SdkCompareResult err = proto.Unmarshal(*resultB, &sdkCompResult) if err !=nil { logger.Debug("comp sdkCompareResult unmarshal err:", err) continue } logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult)) if len(sdkCompResult.CompareResult) >0 { resultList = append(resultList, sdkCompResult.CompareResult...) } } logger.Debug("comp totalList.len:", len(resultList)) return &resultList } //比对底库 func (sv *FaceCompareService) CompareDbPersons() *CompareList{ b, err := proto.Marshal(&sv.CompareArgs) dbPersonCompServerUrl := config.DbPersonCompInfo.Ip logger.Debug("comp Server url:", dbPersonCompServerUrl) var resultList CompareList reqUrl := "tcp://"+dbPersonCompServerUrl+":"+strconv.Itoa(config.DbPersonCompInfo.Port) resultB := doCompareRequest(reqUrl,b) if resultB == nil || len(*resultB) ==0 { return nil } var sdkCompResult protomsg.SdkCompareResult err = proto.Unmarshal(*resultB, &sdkCompResult) if err !=nil { logger.Debug("comp sdkCompareResult unmarshal err:", err) return nil } logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult)) if len(sdkCompResult.CompareResult) >0 { resultList = append(resultList, sdkCompResult.CompareResult...) } logger.Debug("comp totalList.len:", len(resultList)) return &resultList } func doCompareRequest(url string,args []byte) *[]byte{ logger.Debug("comp reqUrl:",url) var sock mangos.Socket var err error var msg []byte if sock,err = req.NewSocket();err !=nil { logger.Debug("comp can't new req socket:%s",err.Error()) return nil } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Dial(url);err !=nil { logger.Debug("comp can't dial on req socket:%s",err.Error()) return nil } sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100) //sock.SetOption(mangos.OptionRecvDeadline, time.Second*10) if err = sock.Send(args);err !=nil { logger.Debug("comp can't send message on push socket:%s",err.Error()) return nil } if msg,err = sock.Recv();err !=nil { logger.Debug("comp sock.Recv receive err:%s",err.Error()) return nil } sock.Close() return &msg }