package main
|
|
import (
|
"context"
|
"flag"
|
"path"
|
"strconv"
|
"time"
|
|
"sdkCompare/cache"
|
"sdkCompare/compare"
|
"sdkCompare/config"
|
"sdkCompare/db"
|
"sdkCompare/proto/facecompare"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"basic.com/valib/version.git"
|
"github.com/golang/protobuf/proto"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
const procName = "faceCompare"
|
|
func init() {
|
flag.Parse()
|
vaversion.Usage()
|
}
|
|
func main() {
|
err := config.Init()
|
if err != nil {
|
return
|
}
|
|
var logFile = path.Join(config.LogConf.Path, "faceCompare.log")
|
|
// 日志初始化
|
logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge)
|
logger.Info("logger init success !")
|
|
serveUrl := "tcp://0.0.0.0:"
|
if err := db.ConnectDB(); err != nil {
|
logger.Error(err.Error())
|
return
|
}
|
|
cache.InitDbTablePersons()
|
serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
|
|
logger.Infof("%s serve url:%s", procName, serveUrl)
|
|
Recv(serveUrl)
|
}
|
|
func Recv(url string) {
|
var sock mangos.Socket
|
var err error
|
var msg []byte
|
var ctx, _ = context.WithCancel(context.Background())
|
if sock, err = rep.NewSocket(); err != nil {
|
logger.Error("new rep socket err:", err)
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
if err = sock.Listen(url); err != nil {
|
logger.Error("listen on rep socket err:", err)
|
}
|
|
for {
|
select {
|
case <-ctx.Done():
|
logger.Info("ctx done")
|
return
|
default:
|
msg, err = sock.Recv()
|
if err != nil || len(msg) <= 0 {
|
continue
|
}
|
|
var request facecompare.CompareRequest
|
err = proto.Unmarshal(msg, &request)
|
if err != nil {
|
logger.Warn("CompareRequest json unmarshal error")
|
continue
|
}
|
|
var result []byte
|
if request.CompareType == facecompare.CompareType_Compare {
|
var compareArgInfo protomsg.CompareArgs
|
var cacheChangeInfo protomsg.EsPersonCacheChange
|
if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
|
timeStart := time.Now()
|
result = compare.GetComparePersonBaseInfo(compareArgInfo)
|
logger.Debug("用时:", time.Since(timeStart))
|
} else if err = proto.Unmarshal(request.Payload, &cacheChangeInfo); err == nil {
|
cache.UpdateCache(&cacheChangeInfo)
|
} else {
|
logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
|
continue
|
}
|
} else if request.CompareType == facecompare.CompareType_UpdateCache {
|
var compareEvent protomsg.CompareEvent
|
if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
|
if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
|
cache.ReInitDbTablePersonsCache()
|
} else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
|
id := string(compareEvent.Payload)
|
cache.UpdateDbPersonsCacheById(id)
|
logger.Info("--------------更新人员缓存, id: ", id)
|
}
|
} else {
|
logger.Warn("CompareEvent json unmarshal error")
|
continue
|
}
|
}
|
|
err = sock.Send(result)
|
if err != nil {
|
logger.Warn("send reply err:", err.Error())
|
}
|
}
|
}
|
}
|