package serve import ( "context" "time" "sdkCompare/cache" "sdkCompare/compare" "sdkCompare/proto/facecompare" "basic.com/pubsub/protomsg.git" "basic.com/valib/logger.git" "github.com/golang/protobuf/proto" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/rep" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) func serverWorker(ctx context.Context, sock mangos.Socket, id int) { for { select { case <-ctx.Done(): logger.Infof("worker %d done", id) return default: var msg *mangos.Message var err error if msg, err = sock.RecvMsg(); err != nil { return } var request facecompare.CompareRequest err = proto.Unmarshal(msg.Body, &request) if err != nil { logger.Warn("CompareRequest json unmarshal error") if err = sock.SendMsg(msg); err != nil { logger.Warn("send reply err:", err.Error()) continue } } var result []byte if request.CompareType == facecompare.CompareType_Compare { var compareArgInfo protomsg.CompareArgs if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil { timeStart := time.Now() result = compare.Walk(compareArgInfo) logger.Debug("用时:", time.Since(timeStart)) } else { logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error") } } else if request.CompareType == facecompare.CompareType_UpdateCache { var compareEvent protomsg.CompareEvent if err = proto.Unmarshal(request.Payload, &compareEvent); err == nil { if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 cache.ReInitDbTablePersonsCache() } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.UpdateDbPersonsCacheById(id) logger.Info("--------------更新人员缓存, id: ", id) } else if compareEvent.EventType == protomsg.CompareEventType_DeleteCache { //库中新增更新缓存 id := string(compareEvent.Payload) cache.DeleteDbPersonsCacheById(id) logger.Info("--------------删除人员缓存, id: ", id) } } else { logger.Warn("CompareEvent json unmarshal error") } } msg.Body = result if err = sock.SendMsg(msg); err != nil { logger.Warn("send reply err:", err.Error()) } } } } func Start(ctx context.Context, url string, nWorkers int) { var sock mangos.Socket var err error if sock, err = rep.NewSocket(); err != nil { logger.Error("can't get new rep socket: %s", err) return } if err = sock.SetOption(mangos.OptionRaw, true); err != nil { logger.Error("can't set raw mode: %s", err) return } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { logger.Error("can't listen on rep socket: %s", err.Error()) return } logger.Debugf("Starting %d workers", nWorkers) for id := 0; id < nWorkers; id++ { go func(id int) { serverWorker(ctx, sock, id) }(id) } }