From 883d8cfee0096974433fe4cbeac1a7a4ee86d9a3 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 26 三月 2024 10:42:33 +0800 Subject: [PATCH] add compare --- cache/compare.go | 104 +++++++++++++++++---------------------------------- 1 files changed, 35 insertions(+), 69 deletions(-) diff --git a/cache/compare.go b/cache/compare.go index ffaa428..d78760e 100644 --- a/cache/compare.go +++ b/cache/compare.go @@ -82,7 +82,7 @@ // 璋冪敤澧為噺鐨勬帴鍙� captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType) if err != nil { - logger.Debug(err) + logger.Error(err) return } for _, ei := range captures { @@ -102,7 +102,7 @@ func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) { infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName) if e != nil || infos == nil { - logger.Debug("get es primary ips err") + logger.Error("get es primary ips err") return "", errors.New("get es primary ips err") } @@ -148,7 +148,7 @@ captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType) if err != nil { - logger.Debug("libEs.GetOceanFeatures err:", err) + logger.Error("libEs.GetOceanFeatures err:", err) return err } logger.Debug("len(captures):", len(captures)) @@ -238,7 +238,7 @@ defer wg.Done() dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) if err != nil { - logger.Debug(err) + logger.Error(err) return } logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons)) @@ -266,56 +266,22 @@ } } -// UpdateDbPersonsCache 鏇存柊缂撳瓨涓殑鍏ㄩ儴浜哄憳淇℃伅 -func UpdateDbPersonsCache() { +// UpdateDbPersonsCacheById 鏇存柊缂撳瓨涓殑鍏ㄩ儴浜哄憳淇℃伅 +func UpdateDbPersonsCacheById(id string) { var dbpApi DbPersons - total, e := dbpApi.GetPersonTotal("") - logger.Debugf("鎵�鏈夊簳搴撳叡鏈�%d鏉¤褰�", total) - if e == nil && total > 0 { - queryEachNum := *querynum - qn := int(total) / *threadnum - if *querynum < qn { - queryEachNum = qn + info, err := dbpApi.GetPersonsCompareCacheById(id) + if err != nil { + logger.Error(err) + return + } + if info.Tableid != "" { + Cmap.Lock() + defer Cmap.Unlock() + if _, ok := Cmap.Cam[info.Tableid]; !ok { + Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum)) } - queryT := int(total) / queryEachNum - if int(total)%queryEachNum > 0 { - queryT++ - } - temptime := time.Now() - var wg sync.WaitGroup - - for i := 0; i < queryT; i++ { - j := i * queryEachNum - wg.Add(1) - go func(qs int) { - defer wg.Done() - dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum) - if err != nil { - logger.Debug(err) - return - } - logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons)) - Cmap.Lock() - tableId := "" - for _, value := range dbpersons { - tableId = value.Tableid - if _, ok := Cmap.Cam[value.Tableid]; !ok { - Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum)) - } - - Cmap.Cam[value.Tableid].Set(value.Id, value) - } - - if len(dbpersons) != 0 { - Cmap.Cam[tableId].Settime() - } - - Cmap.Unlock() - - }(j) - } - wg.Wait() - logger.Debug("搴曞簱浜哄憳缂撳瓨瀹屾垚鐢ㄦ椂:", time.Since(temptime)) + Cmap.Cam[info.Tableid].Set(info.Id, info) + Cmap.Cam[info.Tableid].Settime() } } @@ -344,7 +310,7 @@ Cmap.Cam[tableId].Del(id) logger.Debug("DelPerson ok success") } else { - logger.Debug("tableId:", tableId, " not exist") + logger.Error("tableId:", tableId, " not exist") } } @@ -459,7 +425,7 @@ for _, tid := range compareArgs.TableIds { //ruleProcess姣斿鎸囧畾搴曞簱 shardins, ok := Cmap.Cam[tid] if !ok { - logger.Debug("ruleProcess compare get shard error by tableId:", tid) + logger.Error("ruleProcess compare get shard error by tableId:", tid) continue } if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT { @@ -472,10 +438,10 @@ } } } else { - logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid) + logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid) } } else { - logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") + logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist") } } } else { //web璇锋眰锛屾瘮瀵规寚瀹氱殑鎶撴媿搴撴垨鑰呭簳搴� @@ -484,7 +450,7 @@ tStart := time.Now() serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) if e != nil { - logger.Debug("util.GetLocalIP err:", e) + logger.Error("util.GetLocalIP err:", e) continue } alarmPort := config.EsCompServerInfo.ESPort @@ -553,7 +519,7 @@ } else { shardins, ok := Cmap.Cam[tid] if !ok { - logger.Debug("get shard error by tableId:", tid) + logger.Error("get shard error by tableId:", tid) continue } @@ -570,7 +536,7 @@ buf, err := proto.Marshal(&scResult) if err != nil { - logger.Debug("scResult Marshal error!", err) + logger.Error("scResult Marshal error!", err) return nil } @@ -581,18 +547,18 @@ co_d, err := base64.StdEncoding.DecodeString(co) if err != nil { - logger.Debug("DoSdkCompare err:", err) + logger.Error("DoSdkCompare err:", err) return -1 } - if len(co_d) != 2560 { - logger.Debug("target fea.len !=2560") - return -1 - } - - if len(ci) != 2560 { - logger.Debug("source fea.len !=2560") - return -1 - } + //if len(co_d) != 2560 { + // logger.Error("target fea.len !=2560") + // return -1 + //} + // + //if len(ci) != 2560 { + // logger.Error("source fea.len !=2560") + // return -1 + //} sec := DecCompare(ci, co_d) //logger.Debug("姣斿寰楀垎涓猴細", sec) -- Gitblit v1.8.0