From 4860c7b312cdce2d948ee417b0bb1fed60fd9dc7 Mon Sep 17 00:00:00 2001 From: liujiandao <274878379@qq.com> Date: 星期二, 20 二月 2024 17:09:37 +0800 Subject: [PATCH] 更新与对比修改 --- proto/face_compare.proto | 13 + cache/compare.go | 30 ++-- main.go | 131 +++++++++++------- proto/facecompare/face_compare.pb.go | 206 +++++++++++++++++++++++++++++ 4 files changed, 311 insertions(+), 69 deletions(-) diff --git a/cache/compare.go b/cache/compare.go index d2e5b23..88fd8d7 100644 --- a/cache/compare.go +++ b/cache/compare.go @@ -82,7 +82,7 @@ // 璋冪敤澧為噺鐨勬帴鍙� captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType) if err != nil { - logger.Debug(err) + logger.Error(err) return } for _, ei := range captures { @@ -102,7 +102,7 @@ func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) { infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName) if e != nil || infos == nil { - logger.Debug("get es primary ips err") + logger.Error("get es primary ips err") return "", errors.New("get es primary ips err") } @@ -148,7 +148,7 @@ captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType) if err != nil { - logger.Debug("libEs.GetOceanFeatures err:", err) + logger.Error("libEs.GetOceanFeatures err:", err) return err } logger.Debug("len(captures):", len(captures)) @@ -238,7 +238,7 @@ defer wg.Done() dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) if err != nil { - logger.Debug(err) + logger.Error(err) return } logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons)) @@ -271,7 +271,7 @@ var dbpApi DbPersons info, err := dbpApi.GetPersonsCompareCacheById(id) if err != nil { - logger.Debug(err) + logger.Error(err) return } if info.Tableid != "" { @@ -310,7 +310,7 @@ Cmap.Cam[tableId].Del(id) logger.Debug("DelPerson ok success") } else { - logger.Debug("tableId:", tableId, " not exist") + logger.Error("tableId:", tableId, " not exist") } } @@ -425,7 +425,7 @@ for _, tid := range compareArgs.TableIds { //ruleProcess姣斿鎸囧畾搴曞簱 shardins, ok := Cmap.Cam[tid] if !ok { - logger.Debug("ruleProcess compare get shard error by tableId:", tid) + logger.Error("ruleProcess compare get shard error by tableId:", tid) continue } if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT { @@ -438,10 +438,10 @@ } } } else { - logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid) + logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid) } } else { - logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") + logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") } } } else { //web璇锋眰锛屾瘮瀵规寚瀹氱殑鎶撴媿搴撴垨鑰呭簳搴� @@ -450,7 +450,7 @@ tStart := time.Now() serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) if e != nil { - logger.Debug("util.GetLocalIP err:", e) + logger.Error("util.GetLocalIP err:", e) continue } alarmPort := config.EsCompServerInfo.ESPort @@ -519,7 +519,7 @@ } else { shardins, ok := Cmap.Cam[tid] if !ok { - logger.Debug("get shard error by tableId:", tid) + logger.Error("get shard error by tableId:", tid) continue } @@ -536,7 +536,7 @@ buf, err := proto.Marshal(&scResult) if err != nil { - logger.Debug("scResult Marshal error!", err) + logger.Error("scResult Marshal error!", err) return nil } @@ -547,16 +547,16 @@ co_d, err := base64.StdEncoding.DecodeString(co) if err != nil { - logger.Debug("DoSdkCompare err:", err) + logger.Error("DoSdkCompare err:", err) return -1 } if len(co_d) != 2560 { - logger.Debug("target fea.len !=2560") + logger.Error("target fea.len !=2560") return -1 } if len(ci) != 2560 { - logger.Debug("source fea.len !=2560") + logger.Error("source fea.len !=2560") return -1 } sec := DecCompare(ci, co_d) diff --git a/main.go b/main.go index f8800b5..2839a57 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ "os" "path" "path/filepath" + "sdkCompare/proto/facecompare" "strconv" "time" @@ -51,32 +52,44 @@ logger.Debug("This is a new server about sdk compare, proc name ", procName) serveUrl := "tcp://0.0.0.0:" - if procName == "dbCompare" { - if err := cache.ConnectDB(); err != nil { - logger.Error(err.Error()) - return - } - - cache.InitDbTablePersons() - if !cache.InitCompare() { - logger.Debug("init SDKFace return false,panic") - return - } - serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) - } else { - if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil { - logger.Info("init capture cache err:", err) - return - } - - if !cache.InitCompare() { - logger.Debug("init SDKFace return false,panic") - return - } - - go cache.IncreVideoPersonsCache(time.Now(), targetType1) - serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort) + if err := cache.ConnectDB(); err != nil { + logger.Error(err.Error()) + return } + + cache.InitDbTablePersons() + if !cache.InitCompare() { + logger.Debug("init SDKFace return false,panic") + return + } + serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) + + //if procName == "dbCompare" { + // if err := cache.ConnectDB(); err != nil { + // logger.Error(err.Error()) + // return + // } + // + // cache.InitDbTablePersons() + // if !cache.InitCompare() { + // logger.Debug("init SDKFace return false,panic") + // return + // } + // serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) + //} else { + // if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil { + // logger.Info("init capture cache err:", err) + // return + // } + // + // if !cache.InitCompare() { + // logger.Debug("init SDKFace return false,panic") + // return + // } + // + // go cache.IncreVideoPersonsCache(time.Now(), targetType1) + // serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort) + //} logger.Debugf("%s serve url:%s", procName, serveUrl) @@ -89,12 +102,12 @@ var msg []byte var ctx, _ = context.WithCancel(context.Background()) if sock, err = rep.NewSocket(); err != nil { - logger.Debug("new rep socket err:", err) + logger.Error("new rep socket err:", err) } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) if err = sock.Listen(url); err != nil { - logger.Debug("listen on rep socket err:", err) + logger.Error("listen on rep socket err:", err) } for { @@ -109,35 +122,45 @@ } if len(msg) > 0 { - var compareArgInfo protomsg.CompareArgs - var cacheChangeInfo protomsg.EsPersonCacheChange - var compareEvent protomsg.CompareEvent - if err = proto.Unmarshal(msg, &compareArgInfo); err == nil { - timeStart := time.Now() - result := cache.GetComparePersonBaseInfo(compareArgInfo) - logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart)) - err = sock.Send(result) - if err != nil { - logger.Debug("send reply err:", err.Error()) - } - - } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil { - cache.UpdateCache(&cacheChangeInfo) - err = sock.Send([]byte("")) - if err != nil { - logger.Debug("send reply err:", err.Error()) - } - } else if err = proto.Unmarshal(msg, &compareEvent); err == nil { - if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨 - cache.ReInitDbTablePersonsCache() - } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨 - cache.UpdateDbPersonsCacheById(string(compareEvent.Payload)) - } - } else { - logger.Debug("json unmarshal error") + var compareType facecompare.CompareRequest + err = proto.Unmarshal(msg, &compareType) + if err != nil { + logger.Error("compareType json unmarshal error") continue } - + var result []byte + if compareType.CompareType == facecompare.CompareType_Compare { + var compareArgInfo protomsg.CompareArgs + var cacheChangeInfo protomsg.EsPersonCacheChange + if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil { + timeStart := time.Now() + result = cache.GetComparePersonBaseInfo(compareArgInfo) + logger.Debug("鐢ㄦ椂锛�", time.Since(timeStart)) + } else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil { + cache.UpdateCache(&cacheChangeInfo) + } else { + logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error") + continue + } + } else if compareType.CompareType == facecompare.CompareType_UpdateCache { + var compareEvent protomsg.CompareEvent + if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil { + if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //鍔犲叆闆嗙兢鍚庨噸鏂板垵濮嬪寲缂撳瓨 + cache.ReInitDbTablePersonsCache() + } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //搴撲腑鏂板鏇存柊缂撳瓨 + id := string(compareEvent.Payload) + cache.UpdateDbPersonsCacheById(id) + logger.Debug("--------------鏇存柊浜哄憳缂撳瓨, id: ", id) + } + } else { + logger.Error("CompareEvent json unmarshal error") + continue + } + } + err = sock.Send(result) + if err != nil { + logger.Error("send reply err:", err.Error()) + } } } } diff --git a/proto/face_compare.proto b/proto/face_compare.proto new file mode 100644 index 0000000..3cb2be3 --- /dev/null +++ b/proto/face_compare.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +option go_package = "./facecompare"; + +enum CompareType { + Compare = 0; // 鍋氭瘮瀵硅姹� + UpdateCache = 1; // 鏇存柊搴曞簱鍜屼汉鍛樼紦瀛� +} + +message CompareRequest { + CompareType compareType =1; + bytes payload = 2; +} diff --git a/proto/facecompare/face_compare.pb.go b/proto/facecompare/face_compare.pb.go new file mode 100644 index 0000000..e9b40be --- /dev/null +++ b/proto/facecompare/face_compare.pb.go @@ -0,0 +1,206 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v4.24.0 +// source: face_compare.proto + +package facecompare + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type CompareType int32 + +const ( + CompareType_Compare CompareType = 0 // 鍋氭瘮瀵硅姹� + CompareType_UpdateCache CompareType = 1 // 鏇存柊搴曞簱鍜屼汉鍛樼紦瀛� +) + +// Enum value maps for CompareType. +var ( + CompareType_name = map[int32]string{ + 0: "Compare", + 1: "UpdateCache", + } + CompareType_value = map[string]int32{ + "Compare": 0, + "UpdateCache": 1, + } +) + +func (x CompareType) Enum() *CompareType { + p := new(CompareType) + *p = x + return p +} + +func (x CompareType) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (CompareType) Descriptor() protoreflect.EnumDescriptor { + return file_face_compare_proto_enumTypes[0].Descriptor() +} + +func (CompareType) Type() protoreflect.EnumType { + return &file_face_compare_proto_enumTypes[0] +} + +func (x CompareType) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use CompareType.Descriptor instead. +func (CompareType) EnumDescriptor() ([]byte, []int) { + return file_face_compare_proto_rawDescGZIP(), []int{0} +} + +type CompareRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CompareType CompareType `protobuf:"varint,1,opt,name=compareType,proto3,enum=CompareType" json:"compareType,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` +} + +func (x *CompareRequest) Reset() { + *x = CompareRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_face_compare_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *CompareRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*CompareRequest) ProtoMessage() {} + +func (x *CompareRequest) ProtoReflect() protoreflect.Message { + mi := &file_face_compare_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use CompareRequest.ProtoReflect.Descriptor instead. +func (*CompareRequest) Descriptor() ([]byte, []int) { + return file_face_compare_proto_rawDescGZIP(), []int{0} +} + +func (x *CompareRequest) GetCompareType() CompareType { + if x != nil { + return x.CompareType + } + return CompareType_Compare +} + +func (x *CompareRequest) GetPayload() []byte { + if x != nil { + return x.Payload + } + return nil +} + +var File_face_compare_proto protoreflect.FileDescriptor + +var file_face_compare_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x66, 0x61, 0x63, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5a, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, + 0x65, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f, + 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, + 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, + 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, + 0x2a, 0x2b, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, + 0x0b, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, + 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x61, 0x63, 0x68, 0x65, 0x10, 0x01, 0x42, 0x0f, 0x5a, + 0x0d, 0x2e, 0x2f, 0x66, 0x61, 0x63, 0x65, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_face_compare_proto_rawDescOnce sync.Once + file_face_compare_proto_rawDescData = file_face_compare_proto_rawDesc +) + +func file_face_compare_proto_rawDescGZIP() []byte { + file_face_compare_proto_rawDescOnce.Do(func() { + file_face_compare_proto_rawDescData = protoimpl.X.CompressGZIP(file_face_compare_proto_rawDescData) + }) + return file_face_compare_proto_rawDescData +} + +var file_face_compare_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_face_compare_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_face_compare_proto_goTypes = []interface{}{ + (CompareType)(0), // 0: CompareType + (*CompareRequest)(nil), // 1: CompareRequest +} +var file_face_compare_proto_depIdxs = []int32{ + 0, // 0: CompareRequest.compareType:type_name -> CompareType + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_face_compare_proto_init() } +func file_face_compare_proto_init() { + if File_face_compare_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_face_compare_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*CompareRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_face_compare_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_face_compare_proto_goTypes, + DependencyIndexes: file_face_compare_proto_depIdxs, + EnumInfos: file_face_compare_proto_enumTypes, + MessageInfos: file_face_compare_proto_msgTypes, + }.Build() + File_face_compare_proto = out.File + file_face_compare_proto_rawDesc = nil + file_face_compare_proto_goTypes = nil + file_face_compare_proto_depIdxs = nil +} -- Gitblit v1.8.0