package main
|
|
import (
|
"context"
|
"flag"
|
"os"
|
"path"
|
"path/filepath"
|
"sdkCompare/proto/facecompare"
|
"strconv"
|
"time"
|
|
"sdkCompare/cache"
|
"sdkCompare/config"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"basic.com/valib/version.git"
|
"github.com/golang/protobuf/proto"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
// Process-wide settings populated in init.
var (
	// envirment holds the value of the -e flag (default "config") and is
	// handed to config.Init.
	envirment string

	// procName is the base name of the running executable (os.Args[0]); it is
	// used for the log file name and in log messages.
	procName string

	// targetType1 holds the value of the -targetType1 flag (default "FaceDetect").
	targetType1 string
)
|
|
func init() {
|
flag.StringVar(&envirment, "e", "config", "")
|
|
flag.StringVar(&targetType1, "targetType1", "FaceDetect", "cache feature")
|
|
flag.Parse()
|
vaversion.Usage()
|
|
config.Init(envirment)
|
|
procName = filepath.Base(os.Args[0])
|
var logFile = path.Join(config.LogConf.Path, procName+".log")
|
|
// 日志初始化
|
logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
|
logger.Info("logger init success !")
|
}
|
|
func main() {
|
//esutil.InitLog(logger.Debug)
|
logger.Debug("This is a new server about sdk compare, proc name ", procName)
|
|
serveUrl := "tcp://0.0.0.0:"
|
if err := cache.ConnectDB(); err != nil {
|
logger.Error(err.Error())
|
return
|
}
|
|
cache.InitDbTablePersons()
|
if !cache.InitCompare() {
|
logger.Debug("init SDKFace return false,panic")
|
return
|
}
|
serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
|
|
//if procName == "dbCompare" {
|
// if err := cache.ConnectDB(); err != nil {
|
// logger.Error(err.Error())
|
// return
|
// }
|
//
|
// cache.InitDbTablePersons()
|
// if !cache.InitCompare() {
|
// logger.Debug("init SDKFace return false,panic")
|
// return
|
// }
|
// serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
|
//} else {
|
// if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
|
// logger.Info("init capture cache err:", err)
|
// return
|
// }
|
//
|
// if !cache.InitCompare() {
|
// logger.Debug("init SDKFace return false,panic")
|
// return
|
// }
|
//
|
// go cache.IncreVideoPersonsCache(time.Now(), targetType1)
|
// serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
|
//}
|
|
logger.Debugf("%s serve url:%s", procName, serveUrl)
|
|
Recv(serveUrl)
|
}
|
|
func Recv(url string) {
|
var sock mangos.Socket
|
var err error
|
var msg []byte
|
var ctx, _ = context.WithCancel(context.Background())
|
if sock, err = rep.NewSocket(); err != nil {
|
logger.Error("new rep socket err:", err)
|
}
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
if err = sock.Listen(url); err != nil {
|
logger.Error("listen on rep socket err:", err)
|
}
|
|
for {
|
select {
|
case <-ctx.Done():
|
logger.Debug("ctx done")
|
return
|
default:
|
msg, err = sock.Recv()
|
if err != nil {
|
continue
|
}
|
|
if len(msg) > 0 {
|
var compareType facecompare.CompareRequest
|
err = proto.Unmarshal(msg, &compareType)
|
if err != nil {
|
logger.Error("compareType json unmarshal error")
|
continue
|
}
|
var result []byte
|
if compareType.CompareType == facecompare.CompareType_Compare {
|
var compareArgInfo protomsg.CompareArgs
|
var cacheChangeInfo protomsg.EsPersonCacheChange
|
if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil {
|
timeStart := time.Now()
|
result = cache.GetComparePersonBaseInfo(compareArgInfo)
|
logger.Debug("用时:", time.Since(timeStart))
|
} else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil {
|
cache.UpdateCache(&cacheChangeInfo)
|
} else {
|
logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error")
|
continue
|
}
|
} else if compareType.CompareType == facecompare.CompareType_UpdateCache {
|
var compareEvent protomsg.CompareEvent
|
if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil {
|
if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
|
cache.ReInitDbTablePersonsCache()
|
} else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
|
id := string(compareEvent.Payload)
|
cache.UpdateDbPersonsCacheById(id)
|
logger.Debug("--------------更新人员缓存, id: ", id)
|
}
|
} else {
|
logger.Error("CompareEvent json unmarshal error")
|
continue
|
}
|
}
|
err = sock.Send(result)
|
if err != nil {
|
logger.Error("send reply err:", err.Error())
|
}
|
}
|
}
|
}
|
}
|