From 883d8cfee0096974433fe4cbeac1a7a4ee86d9a3 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 26 三月 2024 10:42:33 +0800
Subject: [PATCH] add compare

---
 cache/compare.go |  104 +++++++++++++++++----------------------------------
 1 files changed, 35 insertions(+), 69 deletions(-)

diff --git a/cache/compare.go b/cache/compare.go
index ffaa428..d78760e 100644
--- a/cache/compare.go
+++ b/cache/compare.go
@@ -82,7 +82,7 @@
 	// 璋冪敤澧為噺鐨勬帴鍙�
 	captures, err := libEs.GetPeriodInfos(serverIp, alarmPort, laststring, curstring, indexName, shardStr, targetType)
 	if err != nil {
-		logger.Debug(err)
+		logger.Error(err)
 		return
 	}
 	for _, ei := range captures {
@@ -102,7 +102,7 @@
 func getShards(serverIp string, alarmIp string, alarmPort string) (string, error) {
 	infos, e := libEs.GetShardsByIndex(alarmIp, alarmPort, config.EsInfo.EsIndex.AiOcean.IndexName)
 	if e != nil || infos == nil {
-		logger.Debug("get es primary ips err")
+		logger.Error("get es primary ips err")
 		return "", errors.New("get es primary ips err")
 	}
 
@@ -148,7 +148,7 @@
 
 	captures, err := libEs.GetOceanFeatures(serverIp, alarmPort, 1000, indexName, shardStr, targetType)
 	if err != nil {
-		logger.Debug("libEs.GetOceanFeatures err:", err)
+		logger.Error("libEs.GetOceanFeatures err:", err)
 		return err
 	}
 	logger.Debug("len(captures):", len(captures))
@@ -238,7 +238,7 @@
 				defer wg.Done()
 				dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
 				if err != nil {
-					logger.Debug(err)
+					logger.Error(err)
 					return
 				}
 				logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons))
@@ -266,56 +266,22 @@
 	}
 }
 
-// UpdateDbPersonsCache 鏇存柊缂撳瓨涓殑鍏ㄩ儴浜哄憳淇℃伅
-func UpdateDbPersonsCache() {
+// UpdateDbPersonsCacheById 鏇存柊缂撳瓨涓殑鍏ㄩ儴浜哄憳淇℃伅
+func UpdateDbPersonsCacheById(id string) {
 	var dbpApi DbPersons
-	total, e := dbpApi.GetPersonTotal("")
-	logger.Debugf("鎵�鏈夊簳搴撳叡鏈�%d鏉¤褰�", total)
-	if e == nil && total > 0 {
-		queryEachNum := *querynum
-		qn := int(total) / *threadnum
-		if *querynum < qn {
-			queryEachNum = qn
+	info, err := dbpApi.GetPersonsCompareCacheById(id)
+	if err != nil {
+		logger.Error(err)
+		return
+	}
+	if info.Tableid != "" {
+		Cmap.Lock()
+		defer Cmap.Unlock()
+		if _, ok := Cmap.Cam[info.Tableid]; !ok {
+			Cmap.Cam[info.Tableid] = shardmap.New(uint8(*threadnum))
 		}
-		queryT := int(total) / queryEachNum
-		if int(total)%queryEachNum > 0 {
-			queryT++
-		}
-		temptime := time.Now()
-		var wg sync.WaitGroup
-
-		for i := 0; i < queryT; i++ {
-			j := i * queryEachNum
-			wg.Add(1)
-			go func(qs int) {
-				defer wg.Done()
-				dbpersons, err := dbpApi.GetPersonsCompareCacheBase(j, queryEachNum)
-				if err != nil {
-					logger.Debug(err)
-					return
-				}
-				logger.Debugf("鑾峰彇%d鏉′汉鍛樹俊鎭�", len(dbpersons))
-				Cmap.Lock()
-				tableId := ""
-				for _, value := range dbpersons {
-					tableId = value.Tableid
-					if _, ok := Cmap.Cam[value.Tableid]; !ok {
-						Cmap.Cam[value.Tableid] = shardmap.New(uint8(*threadnum))
-					}
-
-					Cmap.Cam[value.Tableid].Set(value.Id, value)
-				}
-
-				if len(dbpersons) != 0 {
-					Cmap.Cam[tableId].Settime()
-				}
-
-				Cmap.Unlock()
-
-			}(j)
-		}
-		wg.Wait()
-		logger.Debug("搴曞簱浜哄憳缂撳瓨瀹屾垚鐢ㄦ椂:", time.Since(temptime))
+		Cmap.Cam[info.Tableid].Set(info.Id, info)
+		Cmap.Cam[info.Tableid].Settime()
 	}
 }
 
@@ -344,7 +310,7 @@
 		Cmap.Cam[tableId].Del(id)
 		logger.Debug("DelPerson ok success")
 	} else {
-		logger.Debug("tableId:", tableId, " not exist")
+		logger.Error("tableId:", tableId, " not exist")
 	}
 }
 
@@ -459,7 +425,7 @@
 			for _, tid := range compareArgs.TableIds { //ruleProcess姣斿鎸囧畾搴曞簱
 				shardins, ok := Cmap.Cam[tid]
 				if !ok {
-					logger.Debug("ruleProcess compare get shard error by tableId:", tid)
+					logger.Error("ruleProcess compare get shard error by tableId:", tid)
 					continue
 				}
 				if tShard, hasT := Cmap.Cam[PRE_DBTABLE]; hasT {
@@ -472,10 +438,10 @@
 							}
 						}
 					} else {
-						logger.Debug("ruleProcess compare tables,tShard not exist tableId:", tid)
+						logger.Error("ruleProcess compare tables,tShard not exist tableId:", tid)
 					}
 				} else {
-					logger.Debug("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
+					logger.Error("ruleProcess compare tables,PRE_DBTABLE tableId:", tid, " not exist")
 				}
 			}
 		} else { //web璇锋眰锛屾瘮瀵规寚瀹氱殑鎶撴媿搴撴垨鑰呭簳搴�
@@ -484,7 +450,7 @@
 					tStart := time.Now()
 					serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
 					if e != nil {
-						logger.Debug("util.GetLocalIP err:", e)
+						logger.Error("util.GetLocalIP err:", e)
 						continue
 					}
 					alarmPort := config.EsCompServerInfo.ESPort
@@ -553,7 +519,7 @@
 				} else {
 					shardins, ok := Cmap.Cam[tid]
 					if !ok {
-						logger.Debug("get shard error by tableId:", tid)
+						logger.Error("get shard error by tableId:", tid)
 						continue
 					}
 
@@ -570,7 +536,7 @@
 	buf, err := proto.Marshal(&scResult)
 
 	if err != nil {
-		logger.Debug("scResult Marshal error!", err)
+		logger.Error("scResult Marshal error!", err)
 		return nil
 	}
 
@@ -581,18 +547,18 @@
 
 	co_d, err := base64.StdEncoding.DecodeString(co)
 	if err != nil {
-		logger.Debug("DoSdkCompare err:", err)
+		logger.Error("DoSdkCompare err:", err)
 		return -1
 	}
-	if len(co_d) != 2560 {
-		logger.Debug("target fea.len !=2560")
-		return -1
-	}
-
-	if len(ci) != 2560 {
-		logger.Debug("source fea.len !=2560")
-		return -1
-	}
+	//if len(co_d) != 2560 {
+	//	logger.Error("target fea.len !=2560")
+	//	return -1
+	//}
+	//
+	//if len(ci) != 2560 {
+	//	logger.Error("source fea.len !=2560")
+	//	return -1
+	//}
 	sec := DecCompare(ci, co_d)
 	//logger.Debug("姣斿寰楀垎涓猴細", sec)
 

--
Gitblit v1.8.0