package service

import (
	"strconv"
	"sync"
	"time"

	"basic.com/pubsub/esutil.git"
	"basic.com/pubsub/protomsg.git"
	"basic.com/valib/bhomedbapi.git"
	"basic.com/valib/logger.git"
	"github.com/gogo/protobuf/proto"
	"github.com/satori/go.uuid"
	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"

	"vamicro/config"
)

// FaceCompareService bundles the arguments of one comparison request with
// the identifier generated for that request.
type FaceCompareService struct {
	// CompareNum is the identifier assigned by NewFaceCompareService.
	CompareNum string
	// CompareArgs is the request that is marshalled and sent to the compare servers.
	CompareArgs protomsg.CompareArgs
}

// NewFaceCompareService returns a service for args with a freshly generated
// UUID (v4) string as its CompareNum.
func NewFaceCompareService(args protomsg.CompareArgs) *FaceCompareService {
	return &FaceCompareService{
		CompareNum:  uuid.NewV4().String(),
		CompareArgs: args,
	}
}

// CompareOnce pairs a comparison identifier with the result list of that
// comparison.
type CompareOnce struct {
	CompareNum  string       `json:"compareOnce"`
	CompareData *CompareList `json:"compareData"`
}

// CompareList is a list of comparison hits. Its Len, Swap and Less methods
// order the list by CompareScore, highest first.
type CompareList []*protomsg.SdkCompareEach

// Len returns the number of entries in the list.
func (s CompareList) Len() int {
	return len(s)
}

// Swap exchanges the entries at positions i and j.
func (s CompareList) Swap(i, j int) {
	s[i], s[j] = s[j], s[i]
}

// Less reports whether entry i has a higher score than entry j, which
// yields a descending order when the list is sorted.
func (s CompareList) Less(i, j int) bool {
	return s[i].CompareScore > s[j].CompareScore
}

var (
	// compResultHisM stores comparison results keyed by CompareNum; guarded by lock.
	// NOTE(review): nothing in this file ever removes entries - confirm that
	// eviction happens elsewhere or that unbounded growth is acceptable.
	compResultHisM = make(map[string]*CompareOnce)
	lock           sync.Mutex
)

// SetCompResultByNum stores co under its CompareNum, replacing any previous
// entry with the same number.
func SetCompResultByNum(co *CompareOnce) {
	lock.Lock()
	defer lock.Unlock()
	compResultHisM[co.CompareNum] = co
}

// GetCompResultByNum returns the result stored under num, or nil when there
// is none.
func GetCompResultByNum(num string) *CompareOnce {
	lock.Lock()
	defer lock.Unlock()
	// A missing key yields the zero value, which is a nil *CompareOnce.
	return compResultHisM[num]
}

// CaptureTable is the table id used for comparisons against the capture library.
var CaptureTable = "capturetable"

// CompareVideoPersons compares against the capture library. It forces
// CompareArgs.TableIds to CaptureTable, looks up the hosts that hold a
// started primary shard of the AiOcean index, sends the request to the
// compare server on each of those hosts and concatenates their results.
//
// It returns nil when the request cannot be marshalled, the local server
// configuration is incomplete, the shard lookup fails or no target host is
// found. Hosts that do not answer or answer with undecodable data are skipped.
func (sv *FaceCompareService) CompareVideoPersons() *CompareList {
	sv.CompareArgs.TableIds = []string{CaptureTable}
	b, err := proto.Marshal(&sv.CompareArgs)
	if err != nil {
		logger.Debug("comp compareArgs marshal err:", err)
		return nil
	}

	var sysApi bhomedbapi.SysSetApi
	sysb, localConf := sysApi.GetServerInfo()
	if !sysb || localConf.AlarmIp == "" || localConf.ServerId == "" {
		logger.Debug("localConfig is wrong!!!")
		return nil
	}

	infos, e := esutil.GetShardsByIndex(localConf.AlarmIp, strconv.Itoa(int(localConf.AlarmPort)), config.EsInfo.EsIndex.AiOcean.IndexName)
	if e != nil || infos == nil {
		logger.Debug("get es primary ips err")
		return nil
	}

	// Collect each host once, even if it holds several primary shards.
	targetNodeMap := make(map[string]string)
	for _, shard := range infos {
		if shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
			targetNodeMap[shard.ShardIp] = shard.ShardIp
		}
	}
	if len(targetNodeMap) == 0 {
		logger.Debug("target compare server is nil")
		return nil
	}
	logger.Debug("compServerList:", targetNodeMap)

	// Ask the compare process on every target host and merge the answers.
	var resultList CompareList
	port := strconv.Itoa(config.EsCompServerInfo.Port)
	for ip := range targetNodeMap {
		results, ok := requestSdkCompareResult("tcp://"+ip+":"+port, b)
		if !ok {
			continue
		}
		resultList = append(resultList, results...)
	}
	logger.Debug("comp totalList.len:", len(resultList))
	return &resultList
}

// CompareDbPersons compares against the base (registered persons) library by
// sending the request to the server configured in config.DbPersonCompare.
//
// It returns nil when the request cannot be marshalled, the server does not
// answer or the answer cannot be decoded.
func (sv *FaceCompareService) CompareDbPersons() *CompareList {
	b, err := proto.Marshal(&sv.CompareArgs)
	if err != nil {
		logger.Debug("comp compareArgs marshal err:", err)
		return nil
	}

	dbPersonCompServerUrl := config.DbPersonCompare.Ip
	logger.Debug("comp Server url:", dbPersonCompServerUrl)

	reqURL := "tcp://" + dbPersonCompServerUrl + ":" + strconv.Itoa(config.DbPersonCompare.Port)
	results, ok := requestSdkCompareResult(reqURL, b)
	if !ok {
		return nil
	}

	var resultList CompareList
	resultList = append(resultList, results...)
	logger.Debug("comp totalList.len:", len(resultList))
	return &resultList
}

// requestSdkCompareResult sends payload to the compare server at reqURL and
// decodes the reply as a protomsg.SdkCompareResult. ok is false when no
// reply (or an empty one) was received or the reply could not be decoded.
func requestSdkCompareResult(reqURL string, payload []byte) (results []*protomsg.SdkCompareEach, ok bool) {
	resultB := doCompareRequest(reqURL, payload)
	if resultB == nil || len(*resultB) == 0 {
		return nil, false
	}

	var sdkCompResult protomsg.SdkCompareResult
	if err := proto.Unmarshal(*resultB, &sdkCompResult); err != nil {
		logger.Debug("comp sdkCompareResult unmarshal err:", err)
		return nil, false
	}
	logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult))
	return sdkCompResult.CompareResult, true
}

// doCompareRequest performs one request/reply exchange with the server at
// url over a mangos REQ socket and returns the raw reply. It returns nil
// when the socket cannot be created or dialed, or when sending or receiving
// fails; receiving is bounded by a 5 second deadline.
func doCompareRequest(url string, args []byte) *[]byte {
	logger.Debug("comp reqUrl:", url)

	sock, err := req.NewSocket()
	if err != nil {
		logger.Debug("comp can't new req socket:%s", err.Error())
		return nil
	}
	// Release the socket on every return path, not only on success.
	defer sock.Close()

	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	// Configure the socket before dialing so the limits are already in place
	// when the connection is set up. A failure here is logged but does not
	// abort the request.
	if err = sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100); err != nil {
		logger.Debug("comp can't set max recv size:", err)
	}
	if err = sock.SetOption(mangos.OptionRecvDeadline, time.Second*5); err != nil {
		logger.Debug("comp can't set recv deadline:", err)
	}

	if err = sock.Dial(url); err != nil {
		logger.Debug("comp can't dial on req socket:%s", err.Error())
		return nil
	}
	if err = sock.Send(args); err != nil {
		logger.Debug("comp can't send message on push socket:%s", err.Error())
		return nil
	}

	msg, err := sock.Recv()
	if err != nil {
		logger.Debug("comp sock.Recv receive err:%s", err.Error())
		return nil
	}
	return &msg
}