From b5687ecf98b999ad535e242c5d25cc5be328af77 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 13 九月 2024 11:17:00 +0800
Subject: [PATCH] add rlock in shardmap walk
---
serve/serve.go | 26 +++++++++++++++-----------
1 files changed, 15 insertions(+), 11 deletions(-)
diff --git a/serve/serve.go b/serve/serve.go
index 933ec58..0e4ccb1 100644
--- a/serve/serve.go
+++ b/serve/serve.go
@@ -2,7 +2,6 @@
import (
"context"
- "sync"
"time"
"sdkCompare/cache"
@@ -25,16 +24,20 @@
logger.Infof("worker %d done", id)
return
default:
- msg, err := sock.Recv()
- if err != nil || len(msg) <= 0 {
- continue
+ var msg *mangos.Message
+ var err error
+ if msg, err = sock.RecvMsg(); err != nil {
+ return
}
var request facecompare.CompareRequest
- err = proto.Unmarshal(msg, &request)
+ err = proto.Unmarshal(msg.Body, &request)
if err != nil {
logger.Warn("CompareRequest json unmarshal error")
- continue
+ if err = sock.SendMsg(msg); err != nil {
+ logger.Warn("send reply err:", err.Error())
+ continue
+ }
}
var result []byte
@@ -42,11 +45,10 @@
var compareArgInfo protomsg.CompareArgs
if err = proto.Unmarshal(request.Payload, &compareArgInfo); err == nil {
timeStart := time.Now()
- result = compare.GetComparePersonBaseInfo(compareArgInfo)
+ result = compare.Walk(compareArgInfo)
logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
} else {
logger.Warn("CompareArgs or EsPersonCacheChange json unmarshal error")
- continue
}
} else if request.CompareType == facecompare.CompareType_UpdateCache {
var compareEvent protomsg.CompareEvent
@@ -64,12 +66,12 @@
}
} else {
logger.Warn("CompareEvent json unmarshal error")
- continue
}
}
- err = sock.Send(result)
- if err != nil {
+ msg.Body = result
+
+ if err = sock.SendMsg(msg); err != nil {
logger.Warn("send reply err:", err.Error())
}
}
@@ -84,10 +86,12 @@
logger.Error("can't get new rep socket: %s", err)
return
}
+
if err = sock.SetOption(mangos.OptionRaw, true); err != nil {
logger.Error("can't set raw mode: %s", err)
return
}
+
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
--
Gitblit v1.8.0