From daac628c64069633787588372dea22499ac35396 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 18 十月 2023 16:52:56 +0800
Subject: [PATCH] 修复集群功能

---
 system-service/service/clusterService.go |   56 +++++++++++++++++++++++++++++++++-----------------------
 1 files changed, 33 insertions(+), 23 deletions(-)

diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 2f978ef..feffd61 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 	sysSync "sync"
 	"time"
@@ -190,6 +191,7 @@
 		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
 		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
 	}
+
 	logger.Debug("AddCluster joinIps:", joinIps)
 	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
 
@@ -207,29 +209,29 @@
 		serf.Agent = agent
 
 		t := time.Now()
-		syncTableDataFromCluster(joinArg)
+		syncClusterNodes := syncTableDataFromCluster(joinArg)
 		logger.Debugf("AddCluster  time=%v", time.Since(t))
-		//if syncB {
-		chMsg := protomsg.DbChangeMessage{
-			Id:     joinArg.ClusterId,
-			Table:  protomsg.TableChanged_T_Cluster,
-			Action: protomsg.DbAction_Insert,
-			Info:   "join",
-		}
+		if syncClusterNodes {
+			chMsg := protomsg.DbChangeMessage{
+				Id:     joinArg.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "join",
+			}
 
-		s.AddDbMessage(&chMsg)
-		logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
-		return true, nil
-		//} else {
-		//	logger.Debug("AddCluster syncTableDataFromCluster fail")
-		//	if agent != nil {
-		//		agent.Leave()
-		//		err = agent.Shutdown()
-		//		if err != nil {
-		//			logger.Debug("AddCluster agent shutdown err:", err)
-		//		}
-		//	}
-		//}
+			s.AddDbMessage(&chMsg)
+			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
+			return true, nil
+		} else {
+			logger.Debug("AddCluster syncTableDataFromCluster fail")
+			agent.Leave()
+			err = agent.Shutdown()
+			if err != nil {
+				logger.Debug("AddCluster agent shutdown err:", err)
+			}
+
+			return false, errors.New("鍔犲叆闆嗙兢澶辫触")
+		}
 	} else {
 		logger.Debug("AddCluster dbSync.Init err:", err)
 		if agent != nil {
@@ -239,7 +241,9 @@
 
 		}
 	}
+
 	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+
 	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
 }
 
@@ -403,7 +407,7 @@
 	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
 	if dumpSqls != nil && len(*dumpSqls) > 0 {
 		for _, sqlStr := range *dumpSqls {
-			//logger.Debug("gorm exec dumpSql:", sqlStr)
+			logger.Debug("gorm exec dumpSql:", sqlStr)
 			if err = tx.Exec(sqlStr).Error; err != nil {
 				logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
 				return false
@@ -423,8 +427,11 @@
 	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
 	if e1 != nil || serverIp == "" {
 		err = errors.New("get serverIp err")
+
+		logger.Error("get serverIp err")
 		return false
 	}
+
 	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
 	if nodeName == "" {
 		nodeName = serverIp
@@ -436,7 +443,9 @@
 		logger.Debug("add cur node err:", err)
 		return false
 	}
-	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+
+	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+	if err = tx.Exec(joinSql).Error; err != nil {
 		logger.Debug("update isDelete err:", err)
 		return false
 	}
@@ -445,6 +454,7 @@
 	tx.Exec("PRAGMA foreign_keys=ON")
 	tx.Commit()
 	serf.SyncSql([]string{sql})
+
 	return true
 }
 

--
Gitblit v1.8.0