| | |
| | | // 调用增量的接口 |
| | | captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType) |
| | | if err != nil { |
| | | logger.Debug(err) |
| | | logger.Error(err) |
| | | return |
| | | } |
| | | for _, ei := range captures { |
| | |
| | | func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) { |
| | | infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName) |
| | | if e != nil || infos == nil { |
| | | logger.Debug("get es primary ips err") |
| | | logger.Error("get es primary ips err") |
| | | return "", errors.New("get es primary ips err") |
| | | } |
| | | |
| | |
| | | |
| | | captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType) |
| | | if err != nil { |
| | | logger.Debug("libEs.GetOceanFeatures err:", err) |
| | | logger.Error("libEs.GetOceanFeatures err:", err) |
| | | return err |
| | | } |
| | | logger.Debug("len(captures):", len(captures)) |
| | |
| | | defer wg.Done() |
| | | dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) |
| | | if err != nil { |
| | | logger.Debug(err) |
| | | logger.Error(err) |
| | | return |
| | | } |
| | | logger.Debugf("获取%d条人员信息", len(dbpersons)) |
| | |
| | | var dbpApi DbPersons |
| | | info, err := dbpApi.GetPersonsCompareCacheById(id) |
| | | if err != nil { |
| | | logger.Debug(err) |
| | | logger.Error(err) |
| | | return |
| | | } |
| | | if info.Tableid != "" { |
| | |
| | | Cmap.Cam[tableId].Del(id) |
| | | logger.Debug("DelPerson ok success") |
| | | } else { |
| | | logger.Debug("tableId:", tableId, " not exist") |
| | | logger.Error("tableId:", tableId, " not exist") |
| | | } |
| | | } |
| | | |
| | |
| | | for _, tid := range compareArgs.TableIds { //ruleProcess比对指定底库 |
| | | shardins, ok := Cmap.Cam[tid] |
| | | if !ok { |
| | | logger.Debug("ruleProcess compare get shard error by tableId:", tid) |
| | | logger.Error("ruleProcess compare get shard error by tableId:", tid) |
| | | continue |
| | | } |
| | | if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT { |
| | |
| | | } |
| | | } |
| | | } else { |
| | | logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid) |
| | | logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid) |
| | | } |
| | | } else { |
| | | logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") |
| | | logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") |
| | | } |
| | | } |
| | | } else { //web请求,比对指定的抓拍库或者底库 |
| | |
| | | tStart := time.Now() |
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e != nil { |
| | | logger.Debug("util.GetLocalIP err:", e) |
| | | logger.Error("util.GetLocalIP err:", e) |
| | | continue |
| | | } |
| | | alarmPort := config.EsCompServerInfo.ESPort |
| | |
| | | } else { |
| | | shardins, ok := Cmap.Cam[tid] |
| | | if !ok { |
| | | logger.Debug("get shard error by tableId:", tid) |
| | | logger.Error("get shard error by tableId:", tid) |
| | | continue |
| | | } |
| | | |
| | |
| | | buf, err := proto.Marshal(&scResult) |
| | | |
| | | if err != nil { |
| | | logger.Debug("scResult Marshal error!", err) |
| | | logger.Error("scResult Marshal error!", err) |
| | | return nil |
| | | } |
| | | |
| | |
| | | |
| | | co_d, err := base64.StdEncoding.DecodeString(co) |
| | | if err != nil { |
| | | logger.Debug("DoSdkCompare err:", err) |
| | | logger.Error("DoSdkCompare err:", err) |
| | | return -1 |
| | | } |
| | | if len(co_d) != 2560 { |
| | | logger.Debug("target fea.len !=2560") |
| | | logger.Error("target fea.len !=2560") |
| | | return -1 |
| | | } |
| | | |
| | | if len(ci) != 2560 { |
| | | logger.Debug("source fea.len !=2560") |
| | | logger.Error("source fea.len !=2560") |
| | | return -1 |
| | | } |
| | | sec := DecCompare(ci, co_d) |
| | |
| | | "os" |
| | | "path" |
| | | "path/filepath" |
| | | "sdkCompare/proto/facecompare" |
| | | "strconv" |
| | | "time" |
| | | |
| | |
| | | logger.Debug("This is a new server about sdk compare, proc name ", procName) |
| | | |
| | | serveUrl := "tcp://0.0.0.0:" |
| | | if procName == "dbCompare" { |
| | | if err := cache.ConnectDB(); err != nil { |
| | | logger.Error(err.Error()) |
| | | return |
| | | } |
| | | |
| | | cache.InitDbTablePersons() |
| | | if !cache.InitCompare() { |
| | | logger.Debug("init SDKFace return false,panic") |
| | | return |
| | | } |
| | | serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) |
| | | } else { |
| | | if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil { |
| | | logger.Info("init capture cache err:", err) |
| | | return |
| | | } |
| | | |
| | | if !cache.InitCompare() { |
| | | logger.Debug("init SDKFace return false,panic") |
| | | return |
| | | } |
| | | |
| | | go cache.IncreVideoPersonsCache(time.Now(), targetType1) |
| | | serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort) |
| | | if err := cache.ConnectDB(); err != nil { |
| | | logger.Error(err.Error()) |
| | | return |
| | | } |
| | | |
| | | cache.InitDbTablePersons() |
| | | if !cache.InitCompare() { |
| | | logger.Debug("init SDKFace return false,panic") |
| | | return |
| | | } |
| | | serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) |
| | | |
| | | //if procName == "dbCompare" { |
| | | // if err := cache.ConnectDB(); err != nil { |
| | | // logger.Error(err.Error()) |
| | | // return |
| | | // } |
| | | // |
| | | // cache.InitDbTablePersons() |
| | | // if !cache.InitCompare() { |
| | | // logger.Debug("init SDKFace return false,panic") |
| | | // return |
| | | // } |
| | | // serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort) |
| | | //} else { |
| | | // if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil { |
| | | // logger.Info("init capture cache err:", err) |
| | | // return |
| | | // } |
| | | // |
| | | // if !cache.InitCompare() { |
| | | // logger.Debug("init SDKFace return false,panic") |
| | | // return |
| | | // } |
| | | // |
| | | // go cache.IncreVideoPersonsCache(time.Now(), targetType1) |
| | | // serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort) |
| | | //} |
| | | |
| | | logger.Debugf("%s serve url:%s", procName, serveUrl) |
| | | |
| | |
| | | var msg []byte |
| | | var ctx, _ = context.WithCancel(context.Background()) |
| | | if sock, err = rep.NewSocket(); err != nil { |
| | | logger.Debug("new rep socket err:", err) |
| | | logger.Error("new rep socket err:", err) |
| | | } |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | if err = sock.Listen(url); err != nil { |
| | | logger.Debug("listen on rep socket err:", err) |
| | | logger.Error("listen on rep socket err:", err) |
| | | } |
| | | |
| | | for { |
| | |
| | | } |
| | | |
| | | if len(msg) > 0 { |
| | | var compareArgInfo protomsg.CompareArgs |
| | | var cacheChangeInfo protomsg.EsPersonCacheChange |
| | | var compareEvent protomsg.CompareEvent |
| | | if err = proto.Unmarshal(msg, &compareArgInfo); err == nil { |
| | | timeStart := time.Now() |
| | | result := cache.GetComparePersonBaseInfo(compareArgInfo) |
| | | logger.Debug("用时:", time.Since(timeStart)) |
| | | err = sock.Send(result) |
| | | if err != nil { |
| | | logger.Debug("send reply err:", err.Error()) |
| | | } |
| | | |
| | | } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil { |
| | | cache.UpdateCache(&cacheChangeInfo) |
| | | err = sock.Send([]byte("")) |
| | | if err != nil { |
| | | logger.Debug("send reply err:", err.Error()) |
| | | } |
| | | } else if err = proto.Unmarshal(msg, &compareEvent); err == nil { |
| | | if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 |
| | | cache.ReInitDbTablePersonsCache() |
| | | } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 |
| | | cache.UpdateDbPersonsCacheById(string(compareEvent.Payload)) |
| | | } |
| | | } else { |
| | | logger.Debug("json unmarshal error") |
| | | var compareType facecompare.CompareRequest |
| | | err = proto.Unmarshal(msg, &compareType) |
| | | if err != nil { |
| | | logger.Error("compareType json unmarshal error") |
| | | continue |
| | | } |
| | | |
| | | var result []byte |
| | | if compareType.CompareType == facecompare.CompareType_Compare { |
| | | var compareArgInfo protomsg.CompareArgs |
| | | var cacheChangeInfo protomsg.EsPersonCacheChange |
| | | if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil { |
| | | timeStart := time.Now() |
| | | result = cache.GetComparePersonBaseInfo(compareArgInfo) |
| | | logger.Debug("用时:", time.Since(timeStart)) |
| | | } else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil { |
| | | cache.UpdateCache(&cacheChangeInfo) |
| | | } else { |
| | | logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error") |
| | | continue |
| | | } |
| | | } else if compareType.CompareType == facecompare.CompareType_UpdateCache { |
| | | var compareEvent protomsg.CompareEvent |
| | | if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil { |
| | | if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存 |
| | | cache.ReInitDbTablePersonsCache() |
| | | } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存 |
| | | id := string(compareEvent.Payload) |
| | | cache.UpdateDbPersonsCacheById(id) |
| | | logger.Debug("--------------更新人员缓存, id: ", id) |
| | | } |
| | | } else { |
| | | logger.Error("CompareEvent json unmarshal error") |
| | | continue |
| | | } |
| | | } |
| | | err = sock.Send(result) |
| | | if err != nil { |
| | | logger.Error("send reply err:", err.Error()) |
| | | } |
| | | } |
| | | } |
| | | } |
New file |
| | |
| | | syntax = "proto3"; |
| | | |
| | | option go_package = "./facecompare"; |
| | | |
| | | enum CompareType { |
| | | Compare = 0; // 做比对请求 |
| | | UpdateCache = 1; // 更新底库和人员缓存 |
| | | } |
| | | |
| | | message CompareRequest { |
| | | CompareType compareType =1; |
| | | bytes payload = 2; |
| | | } |
New file |
| | |
| | | // Code generated by protoc-gen-go. DO NOT EDIT. |
| | | // versions: |
| | | // protoc-gen-go v1.26.0 |
| | | // protoc v4.24.0 |
| | | // source: face_compare.proto |
| | | |
| | | package facecompare |
| | | |
| | | import ( |
| | | protoreflect "google.golang.org/protobuf/reflect/protoreflect" |
| | | protoimpl "google.golang.org/protobuf/runtime/protoimpl" |
| | | reflect "reflect" |
| | | sync "sync" |
| | | ) |
| | | |
| | | const ( |
| | | // Verify that this generated code is sufficiently up-to-date. |
| | | _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) |
| | | // Verify that runtime/protoimpl is sufficiently up-to-date. |
| | | _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) |
| | | ) |
| | | |
| | | type CompareType int32 |
| | | |
| | | const ( |
| | | CompareType_Compare CompareType = 0 // 做比对请求 |
| | | CompareType_UpdateCache CompareType = 1 // 更新底库和人员缓存 |
| | | ) |
| | | |
| | | // Enum value maps for CompareType. |
| | | var ( |
| | | CompareType_name = map[int32]string{ |
| | | 0: "Compare", |
| | | 1: "UpdateCache", |
| | | } |
| | | CompareType_value = map[string]int32{ |
| | | "Compare": 0, |
| | | "UpdateCache": 1, |
| | | } |
| | | ) |
| | | |
| | | func (x CompareType) Enum() *CompareType { |
| | | p := new(CompareType) |
| | | *p = x |
| | | return p |
| | | } |
| | | |
| | | func (x CompareType) String() string { |
| | | return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) |
| | | } |
| | | |
| | | func (CompareType) Descriptor() protoreflect.EnumDescriptor { |
| | | return file_face_compare_proto_enumTypes[0].Descriptor() |
| | | } |
| | | |
| | | func (CompareType) Type() protoreflect.EnumType { |
| | | return &file_face_compare_proto_enumTypes[0] |
| | | } |
| | | |
| | | func (x CompareType) Number() protoreflect.EnumNumber { |
| | | return protoreflect.EnumNumber(x) |
| | | } |
| | | |
| | | // Deprecated: Use CompareType.Descriptor instead. |
| | | func (CompareType) EnumDescriptor() ([]byte, []int) { |
| | | return file_face_compare_proto_rawDescGZIP(), []int{0} |
| | | } |
| | | |
| | | type CompareRequest struct { |
| | | state protoimpl.MessageState |
| | | sizeCache protoimpl.SizeCache |
| | | unknownFields protoimpl.UnknownFields |
| | | |
| | | CompareType CompareType `protobuf:"varint,1,opt,name=compareType,proto3,enum=CompareType" json:"compareType,omitempty"` |
| | | Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` |
| | | } |
| | | |
| | | func (x *CompareRequest) Reset() { |
| | | *x = CompareRequest{} |
| | | if protoimpl.UnsafeEnabled { |
| | | mi := &file_face_compare_proto_msgTypes[0] |
| | | ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
| | | ms.StoreMessageInfo(mi) |
| | | } |
| | | } |
| | | |
| | | func (x *CompareRequest) String() string { |
| | | return protoimpl.X.MessageStringOf(x) |
| | | } |
| | | |
| | | func (*CompareRequest) ProtoMessage() {} |
| | | |
| | | func (x *CompareRequest) ProtoReflect() protoreflect.Message { |
| | | mi := &file_face_compare_proto_msgTypes[0] |
| | | if protoimpl.UnsafeEnabled && x != nil { |
| | | ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) |
| | | if ms.LoadMessageInfo() == nil { |
| | | ms.StoreMessageInfo(mi) |
| | | } |
| | | return ms |
| | | } |
| | | return mi.MessageOf(x) |
| | | } |
| | | |
| | | // Deprecated: Use CompareRequest.ProtoReflect.Descriptor instead. |
| | | func (*CompareRequest) Descriptor() ([]byte, []int) { |
| | | return file_face_compare_proto_rawDescGZIP(), []int{0} |
| | | } |
| | | |
| | | func (x *CompareRequest) GetCompareType() CompareType { |
| | | if x != nil { |
| | | return x.CompareType |
| | | } |
| | | return CompareType_Compare |
| | | } |
| | | |
| | | func (x *CompareRequest) GetPayload() []byte { |
| | | if x != nil { |
| | | return x.Payload |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | var File_face_compare_proto protoreflect.FileDescriptor |
| | | |
| | | var file_face_compare_proto_rawDesc = []byte{ |
| | | 0x0a, 0x12, 0x66, 0x61, 0x63, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x2e, 0x70, |
| | | 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5a, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x52, |
| | | 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, |
| | | 0x65, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f, |
| | | 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, |
| | | 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, |
| | | 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, |
| | | 0x2a, 0x2b, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, |
| | | 0x0b, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, |
| | | 0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x61, 0x63, 0x68, 0x65, 0x10, 0x01, 0x42, 0x0f, 0x5a, |
| | | 0x0d, 0x2e, 0x2f, 0x66, 0x61, 0x63, 0x65, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x62, 0x06, |
| | | 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, |
| | | } |
| | | |
| | | var ( |
| | | file_face_compare_proto_rawDescOnce sync.Once |
| | | file_face_compare_proto_rawDescData = file_face_compare_proto_rawDesc |
| | | ) |
| | | |
| | | func file_face_compare_proto_rawDescGZIP() []byte { |
| | | file_face_compare_proto_rawDescOnce.Do(func() { |
| | | file_face_compare_proto_rawDescData = protoimpl.X.CompressGZIP(file_face_compare_proto_rawDescData) |
| | | }) |
| | | return file_face_compare_proto_rawDescData |
| | | } |
| | | |
| | | var file_face_compare_proto_enumTypes = make([]protoimpl.EnumInfo, 1) |
| | | var file_face_compare_proto_msgTypes = make([]protoimpl.MessageInfo, 1) |
| | | var file_face_compare_proto_goTypes = []interface{}{ |
| | | (CompareType)(0), // 0: CompareType |
| | | (*CompareRequest)(nil), // 1: CompareRequest |
| | | } |
| | | var file_face_compare_proto_depIdxs = []int32{ |
| | | 0, // 0: CompareRequest.compareType:type_name -> CompareType |
| | | 1, // [1:1] is the sub-list for method output_type |
| | | 1, // [1:1] is the sub-list for method input_type |
| | | 1, // [1:1] is the sub-list for extension type_name |
| | | 1, // [1:1] is the sub-list for extension extendee |
| | | 0, // [0:1] is the sub-list for field type_name |
| | | } |
| | | |
| | | func init() { file_face_compare_proto_init() } |
| | | func file_face_compare_proto_init() { |
| | | if File_face_compare_proto != nil { |
| | | return |
| | | } |
| | | if !protoimpl.UnsafeEnabled { |
| | | file_face_compare_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { |
| | | switch v := v.(*CompareRequest); i { |
| | | case 0: |
| | | return &v.state |
| | | case 1: |
| | | return &v.sizeCache |
| | | case 2: |
| | | return &v.unknownFields |
| | | default: |
| | | return nil |
| | | } |
| | | } |
| | | } |
| | | type x struct{} |
| | | out := protoimpl.TypeBuilder{ |
| | | File: protoimpl.DescBuilder{ |
| | | GoPackagePath: reflect.TypeOf(x{}).PkgPath(), |
| | | RawDescriptor: file_face_compare_proto_rawDesc, |
| | | NumEnums: 1, |
| | | NumMessages: 1, |
| | | NumExtensions: 0, |
| | | NumServices: 0, |
| | | }, |
| | | GoTypes: file_face_compare_proto_goTypes, |
| | | DependencyIndexes: file_face_compare_proto_depIdxs, |
| | | EnumInfos: file_face_compare_proto_enumTypes, |
| | | MessageInfos: file_face_compare_proto_msgTypes, |
| | | }.Build() |
| | | File_face_compare_proto = out.File |
| | | file_face_compare_proto_rawDesc = nil |
| | | file_face_compare_proto_goTypes = nil |
| | | file_face_compare_proto_depIdxs = nil |
| | | } |