From 0ba3d2c144dfad395ce3b95c6dd094ee8c7f5da4 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期二, 27 二月 2024 15:38:26 +0800
Subject: [PATCH] Merge branch 'master' of http://192.168.5.5:10010/r/CloudAI/faceCompare
---
proto/face_compare.proto | 13 +
cache/compare.go | 30 ++--
main.go | 131 +++++++++++-------
proto/facecompare/face_compare.pb.go | 206 +++++++++++++++++++++++++++++
4 files changed, 311 insertions(+), 69 deletions(-)
diff --git a/cache/compare.go b/cache/compare.go
index d2e5b23..88fd8d7 100644
--- a/cache/compare.go
+++ b/cache/compare.go
@@ -82,7 +82,7 @@
// 璋冪敤澧為噺鐨勬帴鍙�
captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
if err != nil {
- logger.Debug(err)
+ logger.Error(err)
return
}
for _, ei := range captures {
@@ -102,7 +102,7 @@
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
if e != nil || infos == nil {
- logger.Debug("get es primary ips err")
+ logger.Error("get es primary ips err")
return "", errors.New("get es primary ips err")
}
@@ -148,7 +148,7 @@
captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
if err != nil {
- logger.Debug("libEs.GetOceanFeatures err:", err)
+ logger.Error("libEs.GetOceanFeatures err:", err)
return err
}
logger.Debug("len(captures):", len(captures))
@@ -238,7 +238,7 @@
defer wg.Done()
dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
if err != nil {
- logger.Debug(err)
+ logger.Error(err)
return
}
logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons))
@@ -271,7 +271,7 @@
var dbpApi DbPersons
info, err := dbpApi.GetPersonsCompareCacheById(id)
if err != nil {
- logger.Debug(err)
+ logger.Error(err)
return
}
if info.Tableid != "" {
@@ -310,7 +310,7 @@
Cmap.Cam[tableId].Del(id)
logger.Debug("DelPerson ok success")
} else {
- logger.Debug("tableId:", tableId, " not exist")
+ logger.Error("tableId:", tableId, " not exist")
}
}
@@ -425,7 +425,7 @@
for _, tid := range compareArgs.TableIds { //ruleProcess姣斿鎸囧畾搴曞簱
shardins, ok := Cmap.Cam[tid]
if !ok {
- logger.Debug("ruleProcess compare get shard error by tableId:", tid)
+ logger.Error("ruleProcess compare get shard error by tableId:", tid)
continue
}
if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
@@ -438,10 +438,10 @@
}
}
} else {
- logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid)
+ logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
}
} else {
- logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
+ logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
}
}
} else { //web璇锋眰锛屾瘮瀵规寚瀹氱殑鎶撴媿搴撴垨鑰呭簳搴�
@@ -450,7 +450,7 @@
tStart := time.Now()
serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
if e != nil {
- logger.Debug("util.GetLocalIP err:", e)
+ logger.Error("util.GetLocalIP err:", e)
continue
}
alarmPort := config.EsCompServerInfo.ESPort
@@ -519,7 +519,7 @@
} else {
shardins, ok := Cmap.Cam[tid]
if !ok {
- logger.Debug("get shard error by tableId:", tid)
+ logger.Error("get shard error by tableId:", tid)
continue
}
@@ -536,7 +536,7 @@
buf, err := proto.Marshal(&scResult)
if err != nil {
- logger.Debug("scResult Marshal error!", err)
+ logger.Error("scResult Marshal error!", err)
return nil
}
@@ -547,16 +547,16 @@
co_d, err := base64.StdEncoding.DecodeString(co)
if err != nil {
- logger.Debug("DoSdkCompare err:", err)
+ logger.Error("DoSdkCompare err:", err)
return -1
}
if len(co_d) != 2560 {
- logger.Debug("target fea.len !=2560")
+ logger.Error("target fea.len !=2560")
return -1
}
if len(ci) != 2560 {
- logger.Debug("source fea.len !=2560")
+ logger.Error("source fea.len !=2560")
return -1
}
sec := DecCompare(ci, co_d)
diff --git a/main.go b/main.go
index f8800b5..2839a57 100644
--- a/main.go
+++ b/main.go
@@ -6,6 +6,7 @@
"os"
"path"
"path/filepath"
+ "sdkCompare/proto/facecompare"
"strconv"
"time"
@@ -51,32 +52,44 @@
logger.Debug("This is a new server about sdk compare, proc name ", procName)
serveUrl := "tcp://0.0.0.0:"
- if procName == "dbCompare" {
- if err := cache.ConnectDB(); err != nil {
- logger.Error(err.Error())
- return
- }
-
- cache.InitDbTablePersons()
- if !cache.InitCompare() {
- logger.Debug("init SDKFace return false,panic")
- return
- }
- serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
- } else {
- if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
- logger.Info("init capture cache err:", err)
- return
- }
-
- if !cache.InitCompare() {
- logger.Debug("init SDKFace return false,panic")
- return
- }
-
- go cache.IncreVideoPersonsCache(time.Now(), targetType1)
- serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
+ if err := cache.ConnectDB(); err != nil {
+ logger.Error(err.Error())
+ return
}
+
+ cache.InitDbTablePersons()
+ if !cache.InitCompare() {
+ logger.Debug("init SDKFace return false,panic")
+ return
+ }
+ serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
+
+ //if procName == "dbCompare" {
+ // if err := cache.ConnectDB(); err != nil {
+ // logger.Error(err.Error())
+ // return
+ // }
+ //
+ // cache.InitDbTablePersons()
+ // if !cache.InitCompare() {
+ // logger.Debug("init SDKFace return false,panic")
+ // return
+ // }
+ // serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
+ //} else {
+ // if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
+ // logger.Info("init capture cache err:", err)
+ // return
+ // }
+ //
+ // if !cache.InitCompare() {
+ // logger.Debug("init SDKFace return false,panic")
+ // return
+ // }
+ //
+ // go cache.IncreVideoPersonsCache(time.Now(), targetType1)
+ // serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
+ //}
logger.Debugf("%s serve url:%s", procName, serveUrl)
@@ -89,12 +102,12 @@
var msg []byte
var ctx, _ = context.WithCancel(context.Background())
if sock, err = rep.NewSocket(); err != nil {
- logger.Debug("new rep socket err:", err)
+ logger.Error("new rep socket err:", err)
}
sock.AddTransport(ipc.NewTransport())
sock.AddTransport(tcp.NewTransport())
if err = sock.Listen(url); err != nil {
- logger.Debug("listen on rep socket err:", err)
+ logger.Error("listen on rep socket err:", err)
}
for {
@@ -109,35 +122,45 @@
}
if len(msg) > 0 {
- var compareArgInfo protomsg.CompareArgs
- var cacheChangeInfo protomsg.EsPersonCacheChange
- var compareEvent protomsg.CompareEvent
- if err = proto.Unmarshal(msg, &compareArgInfo); err == nil {
- timeStart := time.Now()
- result := cache.GetComparePersonBaseInfo(compareArgInfo)
- logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
- err = sock.Send(result)
- if err != nil {
- logger.Debug("send reply err:", err.Error())
- }
-
- } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil {
- cache.UpdateCache(&cacheChangeInfo)
- err = sock.Send([]byte(""))
- if err != nil {
- logger.Debug("send reply err:", err.Error())
- }
- } else if err = proto.Unmarshal(msg, &compareEvent); err == nil {
- if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
- cache.ReInitDbTablePersonsCache()
- } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
- cache.UpdateDbPersonsCacheById(string(compareEvent.Payload))
- }
- } else {
- logger.Debug("json unmarshal error")
+ var compareType facecompare.CompareRequest
+ err = proto.Unmarshal(msg, &compareType)
+ if err != nil {
+ logger.Error("compareType json unmarshal error")
continue
}
-
+ var result []byte
+ if compareType.CompareType == facecompare.CompareType_Compare {
+ var compareArgInfo protomsg.CompareArgs
+ var cacheChangeInfo protomsg.EsPersonCacheChange
+ if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil {
+ timeStart := time.Now()
+ result = cache.GetComparePersonBaseInfo(compareArgInfo)
+ logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart))
+ } else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil {
+ cache.UpdateCache(&cacheChangeInfo)
+ } else {
+ logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error")
+ continue
+ }
+ } else if compareType.CompareType == facecompare.CompareType_UpdateCache {
+ var compareEvent protomsg.CompareEvent
+ if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil {
+ if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨
+ cache.ReInitDbTablePersonsCache()
+ } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨
+ id := string(compareEvent.Payload)
+ cache.UpdateDbPersonsCacheById(id)
+ logger.Debug("--------------鏇存柊浜哄憳缂撳瓨, id: ", id)
+ }
+ } else {
+ logger.Error("CompareEvent json unmarshal error")
+ continue
+ }
+ }
+ err = sock.Send(result)
+ if err != nil {
+ logger.Error("send reply err:", err.Error())
+ }
}
}
}
diff --git a/proto/face_compare.proto b/proto/face_compare.proto
new file mode 100644
index 0000000..3cb2be3
--- /dev/null
+++ b/proto/face_compare.proto
@@ -0,0 +1,13 @@
+syntax = "proto3";
+
+option go_package = "./facecompare";
+
+enum CompareType {
+ Compare = 0; // 鍋氭瘮瀵硅姹�
+ UpdateCache = 1; // 鏇存柊搴曞簱鍜屼汉鍛樼紦瀛�
+}
+
+message CompareRequest {
+ CompareType compareType =1;
+ bytes payload = 2;
+}
diff --git a/proto/facecompare/face_compare.pb.go b/proto/facecompare/face_compare.pb.go
new file mode 100644
index 0000000..e9b40be
--- /dev/null
+++ b/proto/facecompare/face_compare.pb.go
@@ -0,0 +1,206 @@
+// Code generated by protoc-gen-go. DO NOT EDIT.
+// versions:
+// protoc-gen-go v1.26.0
+// protoc v4.24.0
+// source: face_compare.proto
+
+package facecompare
+
+import (
+ protoreflect "google.golang.org/protobuf/reflect/protoreflect"
+ protoimpl "google.golang.org/protobuf/runtime/protoimpl"
+ reflect "reflect"
+ sync "sync"
+)
+
+const (
+ // Verify that this generated code is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
+ // Verify that runtime/protoimpl is sufficiently up-to-date.
+ _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
+)
+
+type CompareType int32
+
+const (
+ CompareType_Compare CompareType = 0 // 鍋氭瘮瀵硅姹�
+ CompareType_UpdateCache CompareType = 1 // 鏇存柊搴曞簱鍜屼汉鍛樼紦瀛�
+)
+
+// Enum value maps for CompareType.
+var (
+ CompareType_name = map[int32]string{
+ 0: "Compare",
+ 1: "UpdateCache",
+ }
+ CompareType_value = map[string]int32{
+ "Compare": 0,
+ "UpdateCache": 1,
+ }
+)
+
+func (x CompareType) Enum() *CompareType {
+ p := new(CompareType)
+ *p = x
+ return p
+}
+
+func (x CompareType) String() string {
+ return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
+}
+
+func (CompareType) Descriptor() protoreflect.EnumDescriptor {
+ return file_face_compare_proto_enumTypes[0].Descriptor()
+}
+
+func (CompareType) Type() protoreflect.EnumType {
+ return &file_face_compare_proto_enumTypes[0]
+}
+
+func (x CompareType) Number() protoreflect.EnumNumber {
+ return protoreflect.EnumNumber(x)
+}
+
+// Deprecated: Use CompareType.Descriptor instead.
+func (CompareType) EnumDescriptor() ([]byte, []int) {
+ return file_face_compare_proto_rawDescGZIP(), []int{0}
+}
+
+type CompareRequest struct {
+ state protoimpl.MessageState
+ sizeCache protoimpl.SizeCache
+ unknownFields protoimpl.UnknownFields
+
+ CompareType CompareType `protobuf:"varint,1,opt,name=compareType,proto3,enum=CompareType" json:"compareType,omitempty"`
+ Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
+}
+
+func (x *CompareRequest) Reset() {
+ *x = CompareRequest{}
+ if protoimpl.UnsafeEnabled {
+ mi := &file_face_compare_proto_msgTypes[0]
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ ms.StoreMessageInfo(mi)
+ }
+}
+
+func (x *CompareRequest) String() string {
+ return protoimpl.X.MessageStringOf(x)
+}
+
+func (*CompareRequest) ProtoMessage() {}
+
+func (x *CompareRequest) ProtoReflect() protoreflect.Message {
+ mi := &file_face_compare_proto_msgTypes[0]
+ if protoimpl.UnsafeEnabled && x != nil {
+ ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
+ if ms.LoadMessageInfo() == nil {
+ ms.StoreMessageInfo(mi)
+ }
+ return ms
+ }
+ return mi.MessageOf(x)
+}
+
+// Deprecated: Use CompareRequest.ProtoReflect.Descriptor instead.
+func (*CompareRequest) Descriptor() ([]byte, []int) {
+ return file_face_compare_proto_rawDescGZIP(), []int{0}
+}
+
+func (x *CompareRequest) GetCompareType() CompareType {
+ if x != nil {
+ return x.CompareType
+ }
+ return CompareType_Compare
+}
+
+func (x *CompareRequest) GetPayload() []byte {
+ if x != nil {
+ return x.Payload
+ }
+ return nil
+}
+
+var File_face_compare_proto protoreflect.FileDescriptor
+
+var file_face_compare_proto_rawDesc = []byte{
+ 0x0a, 0x12, 0x66, 0x61, 0x63, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x2e, 0x70,
+ 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5a, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x52,
+ 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72,
+ 0x65, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f,
+ 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61,
+ 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
+ 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
+ 0x2a, 0x2b, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12,
+ 0x0b, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b,
+ 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x61, 0x63, 0x68, 0x65, 0x10, 0x01, 0x42, 0x0f, 0x5a,
+ 0x0d, 0x2e, 0x2f, 0x66, 0x61, 0x63, 0x65, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x62, 0x06,
+ 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+}
+
+var (
+ file_face_compare_proto_rawDescOnce sync.Once
+ file_face_compare_proto_rawDescData = file_face_compare_proto_rawDesc
+)
+
+func file_face_compare_proto_rawDescGZIP() []byte {
+ file_face_compare_proto_rawDescOnce.Do(func() {
+ file_face_compare_proto_rawDescData = protoimpl.X.CompressGZIP(file_face_compare_proto_rawDescData)
+ })
+ return file_face_compare_proto_rawDescData
+}
+
+var file_face_compare_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
+var file_face_compare_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
+var file_face_compare_proto_goTypes = []interface{}{
+ (CompareType)(0), // 0: CompareType
+ (*CompareRequest)(nil), // 1: CompareRequest
+}
+var file_face_compare_proto_depIdxs = []int32{
+ 0, // 0: CompareRequest.compareType:type_name -> CompareType
+ 1, // [1:1] is the sub-list for method output_type
+ 1, // [1:1] is the sub-list for method input_type
+ 1, // [1:1] is the sub-list for extension type_name
+ 1, // [1:1] is the sub-list for extension extendee
+ 0, // [0:1] is the sub-list for field type_name
+}
+
+func init() { file_face_compare_proto_init() }
+func file_face_compare_proto_init() {
+ if File_face_compare_proto != nil {
+ return
+ }
+ if !protoimpl.UnsafeEnabled {
+ file_face_compare_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
+ switch v := v.(*CompareRequest); i {
+ case 0:
+ return &v.state
+ case 1:
+ return &v.sizeCache
+ case 2:
+ return &v.unknownFields
+ default:
+ return nil
+ }
+ }
+ }
+ type x struct{}
+ out := protoimpl.TypeBuilder{
+ File: protoimpl.DescBuilder{
+ GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
+ RawDescriptor: file_face_compare_proto_rawDesc,
+ NumEnums: 1,
+ NumMessages: 1,
+ NumExtensions: 0,
+ NumServices: 0,
+ },
+ GoTypes: file_face_compare_proto_goTypes,
+ DependencyIndexes: file_face_compare_proto_depIdxs,
+ EnumInfos: file_face_compare_proto_enumTypes,
+ MessageInfos: file_face_compare_proto_msgTypes,
+ }.Build()
+ File_face_compare_proto = out.File
+ file_face_compare_proto_rawDesc = nil
+ file_face_compare_proto_goTypes = nil
+ file_face_compare_proto_depIdxs = nil
+}
--
Gitblit v1.8.0