From 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期日, 08 十月 2023 11:24:37 +0800
Subject: [PATCH] 修复集群同步功能

---
 system-service/service/clusterService.go |   57 ++++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 34 insertions(+), 23 deletions(-)

diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index afe172b..2f978ef 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -112,19 +112,21 @@
 	arr, err := clusterE.FindAll()
 	if err == nil && (arr == nil || len(arr) == 0) {
 
-		b := clusterE.Create()
-		if b {
+		err = clusterE.Create()
+		if err == nil {
 			serf.InitAgent(context.Background())
 
 			chMsg := protomsg.DbChangeMessage{
 				Id:     clusterE.ClusterId,
 				Table:  protomsg.TableChanged_T_Cluster,
 				Action: protomsg.DbAction_Insert,
-				Info:   "",
+				Info:   "create",
 			}
 			s.AddDbMessage(&chMsg)
 
 			return true, clusterId
+		} else {
+			logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error())
 		}
 	} else {
 		if s.UpdateClusterName(clusterName, virtualIp) {
@@ -201,22 +203,21 @@
 	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
 	if err == nil && agent != nil { //鍔犲叆鎴愬姛
 		logger.Debug("AddCluster dbSync.Init success")
+		agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
 		serf.Agent = agent
 
 		t := time.Now()
 		syncTableDataFromCluster(joinArg)
 		logger.Debugf("AddCluster  time=%v", time.Since(t))
 		//if syncB {
-		//	//闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼
-		//	go serf.ReInitDbPersonCompareData()
-		//	chMsg := protomsg.DbChangeMessage{
-		//		Id:     joinArg.ClusterId,
-		//		Table:  protomsg.TableChanged_T_Cluster,
-		//		Action: protomsg.DbAction_Insert,
-		//		Info:   "",
-		//	}
-		//
-		//	s.AddDbMessage(&chMsg)
+		chMsg := protomsg.DbChangeMessage{
+			Id:     joinArg.ClusterId,
+			Table:  protomsg.TableChanged_T_Cluster,
+			Action: protomsg.DbAction_Insert,
+			Info:   "join",
+		}
+
+		s.AddDbMessage(&chMsg)
 		logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
 		return true, nil
 		//} else {
@@ -332,7 +333,7 @@
 			Id:     "",
 			Table:  protomsg.TableChanged_T_Cluster,
 			Action: protomsg.DbAction_Delete,
-			Info:   "",
+			Info:   "leave",
 		}
 		logger.Debugf("Leave delete db time=%v", time.Since(t))
 		tm := time.Now()
@@ -341,6 +342,7 @@
 	}
 
 	logger.Debugf("Leave success time=%v", time.Since(start))
+
 	return true, nil
 }
 
@@ -374,6 +376,7 @@
 	db := models.GetDB()
 	db.LogMode(false)
 	defer db.LogMode(true)
+
 	tx := db.Begin()
 	defer func() {
 		if err != nil && tx != nil {
@@ -385,16 +388,12 @@
 	tx.Exec("PRAGMA foreign_keys=OFF")
 	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
 	for _, t := range serf.SyncTables {
-		delSql := ""
-		if t == "dbtables" {
-			delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
-		} else if t == "dbtablepersons" {
-			delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
-		} else {
-			delSql = "delete from " + t + ""
-		}
+		delSql := "delete from " + t + ""
+
 		err = tx.Exec(delSql).Error
 		if err != nil {
+			logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+			logger.Error("sql:", delSql)
 			return false
 		}
 	}
@@ -404,8 +403,9 @@
 	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
 	if dumpSqls != nil && len(*dumpSqls) > 0 {
 		for _, sqlStr := range *dumpSqls {
-			logger.Debug("gorm exec dumpSql:", sqlStr)
+			//logger.Debug("gorm exec dumpSql:", sqlStr)
 			if err = tx.Exec(sqlStr).Error; err != nil {
+				logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
 				return false
 			}
 		}
@@ -457,3 +457,14 @@
 	var lc models.Node
 	return lc.FindIpByNode(nodeId)
 }
+
+func ClusterSyncProcMessage(payload []byte) {
+	if serf.Agent == nil {
+		logger.Error("鏈姞鍏ラ泦缇�")
+	}
+
+	err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
+	if err != nil {
+		logger.Error("UserEventSyncMessage err:", err)
+	}
+}

--
Gitblit v1.8.0