zhangzengfei
2024-03-29 3cdf9117668a91f2fefcda89eb4263d7fb2f4434
精简代码
3个文件已修改
253 ■■■■■ 已修改文件
cache/compare.go 221 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
face/faceCompare.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
main.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
cache/compare.go
@@ -2,7 +2,6 @@
import (
    "encoding/base64"
    "errors"
    "flag"
    "fmt"
    "sdkCompare/face"
@@ -11,14 +10,10 @@
    "sync"
    "time"
    "sdkCompare/cache/shardmap"
    "sdkCompare/config"
    "sdkCompare/util"
    libEs "basic.com/pubsub/esutil.git"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/logger.git"
    "github.com/golang/protobuf/proto"
    "sdkCompare/cache/shardmap"
)
// querynum is the -querynum command-line flag (default 3000). Querynum uses it
// as the upper bound for the per-query record count, and Init overwrites it
// with the value Querynum computes.
var querynum = flag.Int("querynum", 3000, "the query number from database")
@@ -27,7 +22,6 @@
var (
    // NOTE(review): presumably a minimum compare score; it is not referenced
    // in the visible code — confirm against its callers.
    thresholdLimit     = float32(50)
    // captureTable is the table id stamped on capture records when they are
    // cached; web requests pass the same id to select the capture library.
    captureTable       = "capturetable"
    // NOTE(review): looks like a key/table-name prefix for vehicle data; its
    // usage is not visible here — confirm.
    cartable           = "carTable_"
    // NOTE(review): presumably the Cmap.Cam key prefix for base-library
    // tables; its usage is not visible here — confirm.
    PRE_DBTABLE        = "dbTable_"
    // PRE_CAPTURE_SERVER prefixes the Cmap.Cam keys of per-server capture
    // caches; the full key is PRE_CAPTURE_SERVER + AnalyServerId.
    PRE_CAPTURE_SERVER = "captureServer_"
)
@@ -39,138 +33,6 @@
// Cmap is the package-level cache. Init allocates it once; its Cam map holds
// one shard map per cache key.
var Cmap *CmapItem
// doOnce guards the one-time setup (flag parsing and Cmap allocation) in Init.
var doOnce sync.Once
// 计算每次查询的数据量条数
func Querynum(totalnum int) int {
    qn := totalnum / *threadnum //qn=6551
    if *querynum < qn {         //
        return *querynum
    }
    return qn
}
// 增量查询
func IncreVideoPersonsCache(lastT time.Time, targetType string) {
    ticker := time.NewTicker(time.Minute * 3)
    for {
        select {
        case <-ticker.C:
            curTime := time.Now()
            Incrementquery(lastT, curTime, targetType)
            lastT = curTime
        }
    }
}
func Incrementquery(last time.Time, cur time.Time, targetType string) {
    laststring := last.Format("2006-01-02 15:04:05")
    curstring := cur.Format("2006-01-02 15:04:05")
    alarmIp := config.EsCompServerInfo.ESIP
    alarmPort := config.EsCompServerInfo.ESPort
    indexName := config.EsInfo.EsIndex.AiOcean.IndexName
    serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
    if e != nil {
        return
    }
    shardStr, e := getShards(serverIp, alarmIp, alarmPort)
    if e != nil {
        return
    }
    // 调用增量的接口
    captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
    if err != nil {
        logger.Error(err)
        return
    }
    for _, ei := range captures {
        if ei.EsInfo.AnalyServerId != "" {
            cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
            ei.EsInfo.Tableid = captureTable
            if _, ok := Cmap.Cam[cKey]; !ok {
                Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
            }
            Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
        }
    }
}
func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
    infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
    if e != nil || infos == nil {
        logger.Error("get es primary ips err")
        return "", errors.New("get es primary ips err")
    }
    var shards []string
    for _, shard := range infos {
        if (shard.ShardIp == serverIp || shard.ShardIp == "127.0.0.1") && shard.ShardRole == "primary" && shard.ShardState == "STARTED" {
            shards = append(shards, strconv.Itoa(shard.ShardNum))
        }
    }
    if len(shards) == 0 {
        return "", errors.New("current shards is empty")
    }
    return strings.Join(shards, ","), nil
}
// 1. 拿到总量, 计算每个线程的查询量
// 2. 分线程查询
func Init(indexName string, targetType string) error {
    alarmIp := config.EsCompServerInfo.ESIP
    alarmPort := config.EsCompServerInfo.ESPort
    doOnce.Do(func() {
        flag.Parse()
        Cmap = &CmapItem{
            Cam: make(map[string]*shardmap.ShardMap),
        }
    })
    serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
    if e != nil {
        return e
    }
    shardStr, e := getShards(serverIp, alarmIp, alarmPort)
    if e != nil {
        return e
    }
    // 这里需要提供总量的接口
    estotalnum := libEs.GetTotal(serverIp, alarmPort, indexName, shardStr, targetType)
    *querynum = Querynum(estotalnum)
    temptime := time.Now()
    captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
    if err != nil {
        logger.Error("libEs.GetOceanFeatures err:", err)
        return err
    }
    logger.Debug("len(captures):", len(captures))
    Cmap.Lock()
    for _, ei := range captures {
        if ei.EsInfo.AnalyServerId != "" {
            cKey := PRE_CAPTURE_SERVER + ei.EsInfo.AnalyServerId
            ei.EsInfo.Tableid = captureTable
            if _, ok := Cmap.Cam[cKey]; !ok {
                Cmap.Cam[cKey] = shardmap.New(uint8(*threadnum))
            }
            Cmap.Cam[cKey].Set(ei.EsInfo.Id, ei.EsInfo)
        }
    }
    Cmap.Unlock()
    logger.Debug(indexName, "缓存完成用时:", time.Since(temptime))
    return nil
}
func ReInitDbTablePersonsCache() {
    Cmap.Lock()
@@ -447,77 +309,6 @@
            }
        } else { //web请求,比对指定的抓拍库或者底库
            for _, tid := range compareArgs.TableIds {
                if tid == captureTable { //比对抓拍库
                    tStart := time.Now()
                    serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
                    if e != nil {
                        logger.Error("util.GetLocalIP err:", e)
                        continue
                    }
                    alarmPort := config.EsCompServerInfo.ESPort
                    scopeIds := libEs.GetAllLocalVideopersonsId(compareArgs, config.EsInfo.EsIndex.AiOcean.IndexName, serverIp, alarmPort, compareArgs.AlarmLevel)
                    logger.Debug("libEs.GetAllLocalVideopersonsId len(scopeIds):", len(scopeIds), " 耗时:", time.Since(tStart))
                    if scopeIds != nil {
                        tCompStart := time.Now()
                        if compareArgs.AnalyServerId != "" { //指定server
                            for cKey, cacheMap := range Cmap.Cam {
                                if cKey == PRE_CAPTURE_SERVER+compareArgs.AnalyServerId {
                                    for _, sId := range scopeIds {
                                        obj, ok := cacheMap.Get(sId)
                                        if ok {
                                            eInfo, ok := obj.(*protomsg.Esinfo)
                                            if !ok {
                                                continue
                                            }
                                            sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
                                            if sec >= baseScore {
                                                scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
                                                    Id:           eInfo.Id,
                                                    CompareScore: sec,
                                                    Tableid:      eInfo.Tableid,
                                                })
                                            }
                                        }
                                    }
                                }
                            }
                        } else { //管理平台请求比对所有抓拍,不指定server
                            for cKey, cacheMap := range Cmap.Cam {
                                if len(compareArgs.ServerIds) > 0 {
                                    for _, termDevId := range compareArgs.ServerIds {
                                        if cKey == PRE_CAPTURE_SERVER+termDevId || !strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
                                            targets := cacheMap.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
                                            if len(targets) > 0 {
                                                scResult.CompareResult = append(scResult.CompareResult, targets...)
                                            }
                                        }
                                    }
                                } else {
                                    if strings.HasPrefix(cKey, PRE_CAPTURE_SERVER) {
                                        for _, sId := range scopeIds {
                                            if obj, ok := cacheMap.Get(sId); ok {
                                                eInfo, ok := obj.(*protomsg.Esinfo)
                                                if !ok {
                                                    continue
                                                }
                                                sec := DoSdkCompare(compareArgs.FaceFeature, eInfo.FaceFeature)
                                                if sec >= baseScore {
                                                    scResult.CompareResult = append(scResult.CompareResult, &protomsg.SdkCompareEach{
                                                        Id:           eInfo.Id,
                                                        CompareScore: sec,
                                                        Tableid:      eInfo.Tableid,
                                                    })
                                                }
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        logger.Debug("根据scopeIds比对耗时:", time.Since(tCompStart))
                    }
                } else {
                    shardins, ok := Cmap.Cam[tid]
                    if !ok {
                        logger.Error("get shard error by tableId:", tid)
@@ -527,7 +318,6 @@
                    targets := shardins.Walk(DoSdkCompare, compareArgs.FaceFeature, baseScore, compareArgs.Source, compareArgs.CompareTarget)
                    if len(targets) > 0 {
                        scResult.CompareResult = append(scResult.CompareResult, targets...)
                    }
                }
            }
        }
@@ -550,15 +340,6 @@
        logger.Error("DoSdkCompare err:", err)
        return -1
    }
    //if len(co_d) != 2560 {
    //    logger.Error("target fea.len !=2560")
    //    return -1
    //}
    //
    //if len(ci) != 2560 {
    //    logger.Error("source fea.len !=2560")
    //    return -1
    //}
    sec := face.DecCompare(ci, co_d)
    //logger.Debug("比对得分为:", sec)
face/faceCompare.go
@@ -1,7 +1,6 @@
package face
import (
    "fmt"
    "unsafe"
)
@@ -69,7 +68,7 @@
    if len(ffeat1) != len(ffeat2) {
        return 0
    }
    fmt.Println("len:", len(ffeat1), len(feat2))
    //fmt.Println("len:", len(ffeat1), len(feat2))
    //fmt.Println("ffeat1:", ffeat1, "ffeat2:", ffeat2, "len:", len(ffeat1), len(feat2))
    // normalize
    var score float32
@@ -83,7 +82,7 @@
    if score < 0.0001 {
        score = 0.0001
    }
    fmt.Println("score:", score)
    //fmt.Println("score:", score)
    return score
}
main.go
@@ -60,33 +60,6 @@
    cache.InitDbTablePersons()
    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    //if procName == "dbCompare" {
    //    if err := cache.ConnectDB(); err != nil {
    //        logger.Error(err.Error())
    //        return
    //    }
    //
    //    cache.InitDbTablePersons()
    //    if !cache.InitCompare() {
    //        logger.Debug("init SDKFace return false,panic")
    //        return
    //    }
    //    serveUrl = serveUrl + strconv.Itoa(config.DbPersonCompInfo.ServePort)
    //} else {
    //    if err := cache.Init(config.EsInfo.EsIndex.AiOcean.IndexName, targetType1); err != nil {
    //        logger.Info("init capture cache err:", err)
    //        return
    //    }
    //
    //    if !cache.InitCompare() {
    //        logger.Debug("init SDKFace return false,panic")
    //        return
    //    }
    //
    //    go cache.IncreVideoPersonsCache(time.Now(), targetType1)
    //    serveUrl = serveUrl + strconv.Itoa(config.EsCompServerInfo.ServePort)
    //}
    logger.Debugf("%s serve url:%s", procName, serveUrl)
    Recv(serveUrl)