package service
|
|
import (
|
"basic.com/pubsub/esutil.git"
|
"basic.com/pubsub/protomsg.git"
|
"errors"
|
"github.com/gogo/protobuf/proto"
|
"github.com/satori/go.uuid"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
"strconv"
|
"sync"
|
"time"
|
"basic.com/valib/logger.git"
|
"vamicro/config"
|
cc "vamicro/gb28181-service/cache"
|
)
|
|
type FaceCompareService struct {
|
CompareNum string
|
CompareArgs protomsg.CompareArgs
|
}
|
|
func NewFaceCompareService(args protomsg.CompareArgs) *FaceCompareService {
|
return &FaceCompareService{
|
CompareNum: uuid.NewV4().String(),
|
CompareArgs:args,
|
}
|
}
|
|
type CompareOnce struct {
|
CompareNum string `json:"compareOnce"`
|
CompareData *CompareList `json:"compareData"`
|
}
|
|
// CompareList is a list of per-candidate compare results. Its Len, Swap and
// Less methods let it be sorted by CompareScore, highest score first.
type CompareList []*protomsg.SdkCompareEach
|
|
func (s CompareList) Len()int {
|
return len(s)
|
}
|
func (s CompareList) Swap(i,j int) {
|
s[i],s[j] = s[j],s[i]
|
}
|
func (s CompareList) Less(i,j int) bool{//降序排序
|
return s[i].CompareScore > s[j].CompareScore
|
}
|
|
var compResultHisM = make(map[string]*CompareOnce,0)
|
var lock sync.Mutex
|
|
func SetCompResultByNum(co *CompareOnce) {
|
lock.Lock()
|
defer lock.Unlock()
|
compResultHisM[co.CompareNum] = co
|
}
|
|
func GetCompResultByNum(num string) *CompareOnce {
|
lock.Lock()
|
defer lock.Unlock()
|
if co,ok := compResultHisM[num];ok {
|
return co
|
} else {
|
return nil
|
}
|
}
|
|
// CaptureTable is the table id of the capture (snapshot) library.
var CaptureTable = "capturetable"
|
|
func (sv *FaceCompareService) CompareVideoPersons() *CompareList{
|
sv.CompareArgs.TableIds = []string { CaptureTable }
|
b, _ := proto.Marshal(&sv.CompareArgs)
|
|
localConf, err2 := cc.GetServerInfo()
|
if err2 != nil || localConf.AlarmIp == "" || localConf.ServerId == "" {
|
logger.Debug("localConfig is wrong!!!")
|
return nil
|
}
|
targetNodeMap := make(map[string]string)
|
infos, e := esutil.GetShardsByIndex(localConf.AlarmIp, strconv.Itoa(int(localConf.AlarmPort)), config.EsInfo.EsIndex.AiOcean.IndexName)
|
if e !=nil || infos == nil {
|
logger.Debug("get es primary ips err")
|
return nil
|
}
|
for _,shard := range infos {
|
if shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
|
if _,ok := targetNodeMap[shard.ShardIp];!ok {
|
targetNodeMap[shard.ShardIp] = shard.ShardIp
|
}
|
}
|
}
|
if len(targetNodeMap) == 0 {
|
logger.Debug("target compare server is nil")
|
return nil
|
}
|
logger.Debug("compServerList:", targetNodeMap)
|
//1.向各个Es compare进程发起请求拿到比对结果
|
var resultList CompareList
|
|
for str,_ :=range targetNodeMap{
|
reqUrl := "tcp://"+str + ":"+strconv.Itoa(config.EsCompServerInfo.Port)
|
resultB,_ := doCompareRequest(reqUrl,b)
|
if resultB == nil || len(*resultB) ==0 {
|
continue
|
}
|
var sdkCompResult protomsg.SdkCompareResult
|
|
err := proto.Unmarshal(*resultB, &sdkCompResult)
|
if err !=nil {
|
logger.Debug("comp sdkCompareResult unmarshal err:", err)
|
continue
|
}
|
|
logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult))
|
if len(sdkCompResult.CompareResult) >0 {
|
resultList = append(resultList, sdkCompResult.CompareResult...)
|
}
|
}
|
logger.Debug("comp totalList.len:", len(resultList))
|
|
return &resultList
|
}
|
|
//比对底库
|
func (sv *FaceCompareService) CompareDbPersons() (*CompareList,error){
|
b, err := proto.Marshal(&sv.CompareArgs)
|
dbPersonCompServerUrl := config.DbPersonCompare.Ip
|
logger.Debug("comp Server url:", dbPersonCompServerUrl)
|
|
var resultList CompareList
|
|
reqUrl := "tcp://"+dbPersonCompServerUrl+":"+strconv.Itoa(config.DbPersonCompare.Port)
|
resultB,reqErr := doCompareRequest(reqUrl,b)
|
if resultB == nil || reqErr != nil {
|
return nil, errors.New("comp call db compare service err")
|
}
|
var sdkCompResult protomsg.SdkCompareResult
|
err = proto.Unmarshal(*resultB, &sdkCompResult)
|
if err !=nil {
|
logger.Debug("comp sdkCompareResult unmarshal err:", err)
|
return nil, errors.New("comp call db compare service err")
|
}
|
|
logger.Debug("comp len(rList):", len(sdkCompResult.CompareResult))
|
if len(sdkCompResult.CompareResult) >0 {
|
resultList = append(resultList, sdkCompResult.CompareResult...)
|
}
|
|
logger.Debug("comp totalList.len:", len(resultList))
|
|
return &resultList, nil
|
}
|
|
func doCompareRequest(url string,args []byte) (*[]byte,error){
|
logger.Debug("comp reqUrl:",url)
|
var sock mangos.Socket
|
var err error
|
var msg []byte
|
|
if sock,err = req.NewSocket();err !=nil {
|
logger.Debug("comp can't new req socket:%s",err.Error())
|
return nil, err
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
if err = sock.Dial(url);err !=nil {
|
logger.Debug("comp can't dial on req socket:%s",err.Error())
|
return nil, err
|
}
|
sock.SetOption(mangos.OptionMaxRecvSize, 1024*1024*100)
|
sock.SetOption(mangos.OptionRecvDeadline, time.Second*5)
|
if err = sock.Send(args);err !=nil {
|
logger.Debug("comp can't send message on push socket:%s",err.Error())
|
return nil, err
|
}
|
if msg,err = sock.Recv();err !=nil {
|
logger.Debug("comp sock.Recv receive err:%s",err.Error())
|
return nil, err
|
}
|
sock.Close()
|
return &msg, nil
|
}
|