liujiandao
2024-02-20 4860c7b312cdce2d948ee417b0bb1fed60fd9dc7
更新与对比修改
2个文件已添加
2个文件已修改
380 ■■■■ 已修改文件
cache/compare.go 30 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 131 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/face_compare.proto 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
proto/facecompare/face_compare.pb.go 206 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cache/compare.go
@@ -82,7 +82,7 @@
    // 调用增量的接口
    captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
    if err != nil {
        logger.Debug(err)
        logger.Error(err)
        return
    }
    for _, ei := range captures {
@@ -102,7 +102,7 @@
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
    infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
    if e != nil || infos == nil {
        logger.Debug("get es primary ips err")
        logger.Error("get es primary ips err")
        return "", errors.New("get es primary ips err")
    }
@@ -148,7 +148,7 @@
    captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
    if err != nil {
        logger.Debug("libEs.GetOceanFeatures err:", err)
        logger.Error("libEs.GetOceanFeatures err:", err)
        return err
    }
    logger.Debug("len(captures):", len(captures))
@@ -238,7 +238,7 @@
                defer wg.Done()
                dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
                if err != nil {
                    logger.Debug(err)
                    logger.Error(err)
                    return
                }
                logger.Debugf("获取%d条人员信息", len(dbpersons))
@@ -271,7 +271,7 @@
    var dbpApi DbPersons
    info, err := dbpApi.GetPersonsCompareCacheById(id)
    if err != nil {
        logger.Debug(err)
        logger.Error(err)
        return
    }
    if info.Tableid != "" {
@@ -310,7 +310,7 @@
        Cmap.Cam[tableId].Del(id)
        logger.Debug("DelPerson ok success")
    } else {
        logger.Debug("tableId:", tableId, " not exist")
        logger.Error("tableId:", tableId, " not exist")
    }
}
@@ -425,7 +425,7 @@
            for _, tid := range compareArgs.TableIds { //ruleProcess比对指定底库
                shardins, ok := Cmap.Cam[tid]
                if !ok {
                    logger.Debug("ruleProcess compare get shard error by tableId:", tid)
                    logger.Error("ruleProcess compare get shard error by tableId:", tid)
                    continue
                }
                if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
@@ -438,10 +438,10 @@
                            }
                        }
                    } else {
                        logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid)
                        logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
                    }
                } else {
                    logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
                    logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
                }
            }
        } else { //web请求,比对指定的抓拍库或者底库
@@ -450,7 +450,7 @@
                    tStart := time.Now()
                    serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
                    if e != nil {
                        logger.Debug("util.GetLocalIP err:", e)
                        logger.Error("util.GetLocalIP err:", e)
                        continue
                    }
                    alarmPort := config.EsCompServerInfo.ESPort
@@ -519,7 +519,7 @@
                } else {
                    shardins, ok := Cmap.Cam[tid]
                    if !ok {
                        logger.Debug("get shard error by tableId:", tid)
                        logger.Error("get shard error by tableId:", tid)
                        continue
                    }
@@ -536,7 +536,7 @@
    buf, err := proto.Marshal(&scResult)
    if err != nil {
        logger.Debug("scResult Marshal error!", err)
        logger.Error("scResult Marshal error!", err)
        return nil
    }
@@ -547,16 +547,16 @@
    co_d, err := base64.StdEncoding.DecodeString(co)
    if err != nil {
        logger.Debug("DoSdkCompare err:", err)
        logger.Error("DoSdkCompare err:", err)
        return -1
    }
    if len(co_d) != 2560 {
        logger.Debug("target fea.len !=2560")
        logger.Error("target fea.len !=2560")
        return -1
    }
    if len(ci) != 2560 {
        logger.Debug("source fea.len !=2560")
        logger.Error("source fea.len !=2560")
        return -1
    }
    sec := DecCompare(ci, co_d)
main.go
@@ -6,6 +6,7 @@
    "os"
    "path"
    "path/filepath"
    "sdkCompare/proto/facecompare"
    "strconv"
    "time"
@@ -51,32 +52,44 @@
    logger.Debug("This is a new server about sdk compare, proc name ", procName)
    serveUrl := "tcp://0.0.0.0:"
    if procName == "dbCompare" {
        if err := cache.ConnectDB(); err != nil {
            logger.Error(err.Error())
            return
        }
        cache.InitDbTablePersons()
        if !cache.InitCompare() {
            logger.Debug("init SDKFace return false,panic")
            return
        }
        serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    } else {
        if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
            logger.Info("init capture cache err:", err)
            return
        }
        if !cache.InitCompare() {
            logger.Debug("init SDKFace return false,panic")
            return
        }
        go cache.IncreVideoPersonsCache(time.Now(), targetType1)
        serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
    if err := cache.ConnectDB(); err != nil {
        logger.Error(err.Error())
        return
    }
    cache.InitDbTablePersons()
    if !cache.InitCompare() {
        logger.Debug("init SDKFace return false,panic")
        return
    }
    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    //if procName == "dbCompare" {
    //    if err := cache.ConnectDB(); err != nil {
    //        logger.Error(err.Error())
    //        return
    //    }
    //
    //    cache.InitDbTablePersons()
    //    if !cache.InitCompare() {
    //        logger.Debug("init SDKFace return false,panic")
    //        return
    //    }
    //    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    //} else {
    //    if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
    //        logger.Info("init capture cache err:", err)
    //        return
    //    }
    //
    //    if !cache.InitCompare() {
    //        logger.Debug("init SDKFace return false,panic")
    //        return
    //    }
    //
    //    go cache.IncreVideoPersonsCache(time.Now(), targetType1)
    //    serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
    //}
    logger.Debugf("%s serve url:%s", procName, serveUrl)
@@ -89,12 +102,12 @@
    var msg []byte
    var ctx, _ = context.WithCancel(context.Background())
    if sock, err = rep.NewSocket(); err != nil {
        logger.Debug("new rep socket err:", err)
        logger.Error("new rep socket err:", err)
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    if err = sock.Listen(url); err != nil {
        logger.Debug("listen on rep socket err:", err)
        logger.Error("listen on rep socket err:", err)
    }
    for {
@@ -109,35 +122,45 @@
            }
            if len(msg) > 0 {
                var compareArgInfo protomsg.CompareArgs
                var cacheChangeInfo protomsg.EsPersonCacheChange
                var compareEvent protomsg.CompareEvent
                if err = proto.Unmarshal(msg, &compareArgInfo); err == nil {
                    timeStart := time.Now()
                    result := cache.GetComparePersonBaseInfo(compareArgInfo)
                    logger.Debug("用时:", time.Since(timeStart))
                    err = sock.Send(result)
                    if err != nil {
                        logger.Debug("send reply err:", err.Error())
                    }
                } else if err = proto.Unmarshal(msg, &cacheChangeInfo); err == nil {
                    cache.UpdateCache(&cacheChangeInfo)
                    err = sock.Send([]byte(""))
                    if err != nil {
                        logger.Debug("send reply err:", err.Error())
                    }
                } else if err = proto.Unmarshal(msg, &compareEvent); err == nil {
                    if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                        cache.ReInitDbTablePersonsCache()
                    } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                        cache.UpdateDbPersonsCacheById(string(compareEvent.Payload))
                    }
                } else {
                    logger.Debug("json unmarshal error")
                var compareType facecompare.CompareRequest
                err = proto.Unmarshal(msg, &compareType)
                if err != nil {
                    logger.Error("compareType json unmarshal error")
                    continue
                }
                var result []byte
                if compareType.CompareType == facecompare.CompareType_Compare {
                    var compareArgInfo protomsg.CompareArgs
                    var cacheChangeInfo protomsg.EsPersonCacheChange
                    if err = proto.Unmarshal(compareType.Payload, &compareArgInfo); err == nil {
                        timeStart := time.Now()
                        result = cache.GetComparePersonBaseInfo(compareArgInfo)
                        logger.Debug("用时:", time.Since(timeStart))
                    } else if err = proto.Unmarshal(compareType.Payload, &cacheChangeInfo); err == nil {
                        cache.UpdateCache(&cacheChangeInfo)
                    } else {
                        logger.Error("CompareArgs or EsPersonCacheChange json unmarshal error")
                        continue
                    }
                } else if compareType.CompareType == facecompare.CompareType_UpdateCache {
                    var compareEvent protomsg.CompareEvent
                    if err = proto.Unmarshal(compareType.Payload, &compareEvent); err == nil {
                        if compareEvent.EventType == protomsg.CompareEventType_ReInitCache { //加入集群后重新初始化缓存
                            cache.ReInitDbTablePersonsCache()
                        } else if compareEvent.EventType == protomsg.CompareEventType_UpdateCache { //库中新增更新缓存
                            id := string(compareEvent.Payload)
                            cache.UpdateDbPersonsCacheById(id)
                            logger.Debug("--------------更新人员缓存, id: ", id)
                        }
                    } else {
                        logger.Error("CompareEvent json unmarshal error")
                        continue
                    }
                }
                err = sock.Send(result)
                if err != nil {
                    logger.Error("send reply err:", err.Error())
                }
            }
        }
    }
proto/face_compare.proto
New file
@@ -0,0 +1,13 @@
syntax = "proto3";
option go_package = "./facecompare";
enum CompareType {
  Compare = 0; // 做比对请求
  UpdateCache = 1; // 更新底库和人员缓存
}
message CompareRequest {
  CompareType compareType =1;
  bytes payload = 2;
}
proto/facecompare/face_compare.pb.go
New file
@@ -0,0 +1,206 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
//     protoc-gen-go v1.26.0
//     protoc        v4.24.0
// source: face_compare.proto
package facecompare
import (
    protoreflect "google.golang.org/protobuf/reflect/protoreflect"
    protoimpl "google.golang.org/protobuf/runtime/protoimpl"
    reflect "reflect"
    sync "sync"
)
const (
    // Verify that this generated code is sufficiently up-to-date.
    _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
    // Verify that runtime/protoimpl is sufficiently up-to-date.
    _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type CompareType int32
const (
    CompareType_Compare     CompareType = 0 // 做比对请求
    CompareType_UpdateCache CompareType = 1 // 更新底库和人员缓存
)
// Enum value maps for CompareType.
var (
    CompareType_name = map[int32]string{
        0: "Compare",
        1: "UpdateCache",
    }
    CompareType_value = map[string]int32{
        "Compare":     0,
        "UpdateCache": 1,
    }
)
func (x CompareType) Enum() *CompareType {
    p := new(CompareType)
    *p = x
    return p
}
func (x CompareType) String() string {
    return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (CompareType) Descriptor() protoreflect.EnumDescriptor {
    return file_face_compare_proto_enumTypes[0].Descriptor()
}
func (CompareType) Type() protoreflect.EnumType {
    return &file_face_compare_proto_enumTypes[0]
}
func (x CompareType) Number() protoreflect.EnumNumber {
    return protoreflect.EnumNumber(x)
}
// Deprecated: Use CompareType.Descriptor instead.
func (CompareType) EnumDescriptor() ([]byte, []int) {
    return file_face_compare_proto_rawDescGZIP(), []int{0}
}
type CompareRequest struct {
    state         protoimpl.MessageState
    sizeCache     protoimpl.SizeCache
    unknownFields protoimpl.UnknownFields
    CompareType CompareType `protobuf:"varint,1,opt,name=compareType,proto3,enum=CompareType" json:"compareType,omitempty"`
    Payload     []byte      `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"`
}
func (x *CompareRequest) Reset() {
    *x = CompareRequest{}
    if protoimpl.UnsafeEnabled {
        mi := &file_face_compare_proto_msgTypes[0]
        ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
        ms.StoreMessageInfo(mi)
    }
}
func (x *CompareRequest) String() string {
    return protoimpl.X.MessageStringOf(x)
}
func (*CompareRequest) ProtoMessage() {}
func (x *CompareRequest) ProtoReflect() protoreflect.Message {
    mi := &file_face_compare_proto_msgTypes[0]
    if protoimpl.UnsafeEnabled && x != nil {
        ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
        if ms.LoadMessageInfo() == nil {
            ms.StoreMessageInfo(mi)
        }
        return ms
    }
    return mi.MessageOf(x)
}
// Deprecated: Use CompareRequest.ProtoReflect.Descriptor instead.
func (*CompareRequest) Descriptor() ([]byte, []int) {
    return file_face_compare_proto_rawDescGZIP(), []int{0}
}
func (x *CompareRequest) GetCompareType() CompareType {
    if x != nil {
        return x.CompareType
    }
    return CompareType_Compare
}
func (x *CompareRequest) GetPayload() []byte {
    if x != nil {
        return x.Payload
    }
    return nil
}
var File_face_compare_proto protoreflect.FileDescriptor
var file_face_compare_proto_rawDesc = []byte{
    0x0a, 0x12, 0x66, 0x61, 0x63, 0x65, 0x5f, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x2e, 0x70,
    0x72, 0x6f, 0x74, 0x6f, 0x22, 0x5a, 0x0a, 0x0e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x52,
    0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x2e, 0x0a, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72,
    0x65, 0x54, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0c, 0x2e, 0x43, 0x6f,
    0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x52, 0x0b, 0x63, 0x6f, 0x6d, 0x70, 0x61,
    0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61,
    0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64,
    0x2a, 0x2b, 0x0a, 0x0b, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12,
    0x0b, 0x0a, 0x07, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b,
    0x55, 0x70, 0x64, 0x61, 0x74, 0x65, 0x43, 0x61, 0x63, 0x68, 0x65, 0x10, 0x01, 0x42, 0x0f, 0x5a,
    0x0d, 0x2e, 0x2f, 0x66, 0x61, 0x63, 0x65, 0x63, 0x6f, 0x6d, 0x70, 0x61, 0x72, 0x65, 0x62, 0x06,
    0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
    file_face_compare_proto_rawDescOnce sync.Once
    file_face_compare_proto_rawDescData = file_face_compare_proto_rawDesc
)
func file_face_compare_proto_rawDescGZIP() []byte {
    file_face_compare_proto_rawDescOnce.Do(func() {
        file_face_compare_proto_rawDescData = protoimpl.X.CompressGZIP(file_face_compare_proto_rawDescData)
    })
    return file_face_compare_proto_rawDescData
}
var file_face_compare_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_face_compare_proto_msgTypes = make([]protoimpl.MessageInfo, 1)
var file_face_compare_proto_goTypes = []interface{}{
    (CompareType)(0),       // 0: CompareType
    (*CompareRequest)(nil), // 1: CompareRequest
}
var file_face_compare_proto_depIdxs = []int32{
    0, // 0: CompareRequest.compareType:type_name -> CompareType
    1, // [1:1] is the sub-list for method output_type
    1, // [1:1] is the sub-list for method input_type
    1, // [1:1] is the sub-list for extension type_name
    1, // [1:1] is the sub-list for extension extendee
    0, // [0:1] is the sub-list for field type_name
}
func init() { file_face_compare_proto_init() }
func file_face_compare_proto_init() {
    if File_face_compare_proto != nil {
        return
    }
    if !protoimpl.UnsafeEnabled {
        file_face_compare_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
            switch v := v.(*CompareRequest); i {
            case 0:
                return &v.state
            case 1:
                return &v.sizeCache
            case 2:
                return &v.unknownFields
            default:
                return nil
            }
        }
    }
    type x struct{}
    out := protoimpl.TypeBuilder{
        File: protoimpl.DescBuilder{
            GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
            RawDescriptor: file_face_compare_proto_rawDesc,
            NumEnums:      1,
            NumMessages:   1,
            NumExtensions: 0,
            NumServices:   0,
        },
        GoTypes:           file_face_compare_proto_goTypes,
        DependencyIndexes: file_face_compare_proto_depIdxs,
        EnumInfos:         file_face_compare_proto_enumTypes,
        MessageInfos:      file_face_compare_proto_msgTypes,
    }.Build()
    File_face_compare_proto = out.File
    file_face_compare_proto_rawDesc = nil
    file_face_compare_proto_goTypes = nil
    file_face_compare_proto_depIdxs = nil
}