package serve
|
|
import (
|
"context"
|
"time"
|
|
"sdkCompare/cache"
|
"sdkCompare/compare"
|
"sdkCompare/proto/facecompare"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/logger.git"
|
"github.com/golang/protobuf/proto"
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
func serverWorker(ctx context.Context, sock mangos.Socket, id int) {
|
for {
|
select {
|
case <-ctx.Done():
|
logger.Infof("worker %d done", id)
|
return
|
default:
|
var msg *mangos.Message
|
var err error
|
if msg, err = sock.RecvMsg(); err != nil {
|
return
|
}
|
|
var request facecompare.CompareRequest
|
err = proto.Unmarshal(msg.Body, &request)
|
if err != nil {
|
logger.Warn("CompareRequest json unmarshal error")
|
if err = sock.SendMsg(msg); err != nil {
|
logger.Warn("send reply err:", err.Error())
|
continue
|
}
|
}
|
|
var result []byte
|
if request.CompareType == facecompare.CompareType_Compare {
|
var compareArgInfo protomsg.CompareArgs
|
if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
|
timeStart := time.Now()
|
result = compare.Walk(compareArgInfo)
|
logger.Debug("用时:", time.Since(timeStart))
|
} else {
|
logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
|
}
|
} else if request.CompareType == facecompare.CompareType_UpdateCache {
|
var compareEvent protomsg.CompareEvent
|
if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil {
|
if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
|
cache.ReInitDbTablePersonsCache()
|
} else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
|
id := string(compareEvent.Payload)
|
cache.UpdateDbPersonsCacheById(id)
|
logger.Info("--------------更新人员缓存, id: ", id)
|
} else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存
|
id := string(compareEvent.Payload)
|
cache.DeleteDbPersonsCacheById(id)
|
logger.Info("--------------删除人员缓存, id: ", id)
|
}
|
} else {
|
logger.Warn("CompareEvent json unmarshal error")
|
}
|
}
|
|
msg.Body = result
|
|
if err = sock.SendMsg(msg); err != nil {
|
logger.Warn("send reply err:", err.Error())
|
}
|
}
|
}
|
}
|
|
func Start(ctx context.Context, url string, nWorkers int) {
|
var sock mangos.Socket
|
var err error
|
|
if sock, err = rep.NewSocket(); err != nil {
|
logger.Error("can't get new rep socket: %s", err)
|
return
|
}
|
|
if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
|
logger.Error("can't set raw mode: %s", err)
|
return
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
if err = sock.Listen(url); err != nil {
|
logger.Error("can't listen on rep socket: %s", err.Error())
|
return
|
}
|
|
logger.Debugf("Starting %d workers", nWorkers)
|
|
for id := 0; id < nWorkers; id++ {
|
go func(id int) {
|
serverWorker(ctx, sock, id)
|
}(id)
|
}
|
}
|