From 3a706d3378aa3626501370352963883fd2783558 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 28 十一月 2023 11:24:49 +0800
Subject: [PATCH] 添加appcenter,完善算法上传下载功能

---
 system-service/service/clusterService.go |  121 ++++++++++++++++++++++++++++------------
 1 files changed, 84 insertions(+), 37 deletions(-)

diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index afe172b..a39741f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 	sysSync "sync"
 	"time"
@@ -112,19 +113,21 @@
 	arr, err := clusterE.FindAll()
 	if err == nil && (arr == nil || len(arr) == 0) {
 
-		b := clusterE.Create()
-		if b {
+		err = clusterE.Create()
+		if err == nil {
 			serf.InitAgent(context.Background())
 
 			chMsg := protomsg.DbChangeMessage{
 				Id:     clusterE.ClusterId,
 				Table:  protomsg.TableChanged_T_Cluster,
 				Action: protomsg.DbAction_Insert,
-				Info:   "",
+				Info:   "create",
 			}
 			s.AddDbMessage(&chMsg)
 
 			return true, clusterId
+		} else {
+			logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error())
 		}
 	} else {
 		if s.UpdateClusterName(clusterName, virtualIp) {
@@ -133,6 +136,31 @@
 	}
 
 	return false, ""
+}
+
+// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
+	var node models.Node
+	isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
+
+	if isSuccess {
+		// 閫氱煡涓昏妭鐐瑰彉鏇�
+		chMsg := protomsg.DbChangeMessage{
+			Id:     clusterId,
+			Table:  protomsg.TableChanged_T_Cluster,
+			Action: protomsg.DbAction_Insert,
+			Info:   "slave2master",
+		}
+
+		s.AddDbMessage(&chMsg)
+
+		err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
+		if err != nil {
+			logger.Error("UserEventSyncVirtualIp err:", err)
+		}
+	}
+
+	return isSuccess, ""
 }
 
 func (s ClusterService) SearchByPwd(pwd string) (err error) {
@@ -188,6 +216,7 @@
 		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
 		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
 	}
+
 	logger.Debug("AddCluster joinIps:", joinIps)
 	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
 
@@ -201,34 +230,33 @@
 	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
 	if err == nil && agent != nil { //鍔犲叆鎴愬姛
 		logger.Debug("AddCluster dbSync.Init success")
+		agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
 		serf.Agent = agent
 
 		t := time.Now()
-		syncTableDataFromCluster(joinArg)
+		syncClusterNodes := syncTableDataFromCluster(joinArg)
 		logger.Debugf("AddCluster  time=%v", time.Since(t))
-		//if syncB {
-		//	//闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼
-		//	go serf.ReInitDbPersonCompareData()
-		//	chMsg := protomsg.DbChangeMessage{
-		//		Id:     joinArg.ClusterId,
-		//		Table:  protomsg.TableChanged_T_Cluster,
-		//		Action: protomsg.DbAction_Insert,
-		//		Info:   "",
-		//	}
-		//
-		//	s.AddDbMessage(&chMsg)
-		logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
-		return true, nil
-		//} else {
-		//	logger.Debug("AddCluster syncTableDataFromCluster fail")
-		//	if agent != nil {
-		//		agent.Leave()
-		//		err = agent.Shutdown()
-		//		if err != nil {
-		//			logger.Debug("AddCluster agent shutdown err:", err)
-		//		}
-		//	}
-		//}
+		if syncClusterNodes {
+			chMsg := protomsg.DbChangeMessage{
+				Id:     joinArg.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "join",
+			}
+
+			s.AddDbMessage(&chMsg)
+			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
+			return true, nil
+		} else {
+			logger.Debug("AddCluster syncTableDataFromCluster fail")
+			agent.Leave()
+			err = agent.Shutdown()
+			if err != nil {
+				logger.Debug("AddCluster agent shutdown err:", err)
+			}
+
+			return false, errors.New("鍔犲叆闆嗙兢澶辫触")
+		}
 	} else {
 		logger.Debug("AddCluster dbSync.Init err:", err)
 		if agent != nil {
@@ -238,7 +266,9 @@
 
 		}
 	}
+
 	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+
 	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
 }
 
@@ -332,7 +362,7 @@
 			Id:     "",
 			Table:  protomsg.TableChanged_T_Cluster,
 			Action: protomsg.DbAction_Delete,
-			Info:   "",
+			Info:   "leave",
 		}
 		logger.Debugf("Leave delete db time=%v", time.Since(t))
 		tm := time.Now()
@@ -341,6 +371,7 @@
 	}
 
 	logger.Debugf("Leave success time=%v", time.Since(start))
+
 	return true, nil
 }
 
@@ -374,6 +405,7 @@
 	db := models.GetDB()
 	db.LogMode(false)
 	defer db.LogMode(true)
+
 	tx := db.Begin()
 	defer func() {
 		if err != nil && tx != nil {
@@ -385,16 +417,12 @@
 	tx.Exec("PRAGMA foreign_keys=OFF")
 	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
 	for _, t := range serf.SyncTables {
-		delSql := ""
-		if t == "dbtables" {
-			delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
-		} else if t == "dbtablepersons" {
-			delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
-		} else {
-			delSql = "delete from " + t + ""
-		}
+		delSql := "delete from " + t + ""
+
 		err = tx.Exec(delSql).Error
 		if err != nil {
+			logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+			logger.Error("sql:", delSql)
 			return false
 		}
 	}
@@ -406,6 +434,7 @@
 		for _, sqlStr := range *dumpSqls {
 			logger.Debug("gorm exec dumpSql:", sqlStr)
 			if err = tx.Exec(sqlStr).Error; err != nil {
+				logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
 				return false
 			}
 		}
@@ -423,8 +452,11 @@
 	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
 	if e1 != nil || serverIp == "" {
 		err = errors.New("get serverIp err")
+
+		logger.Error("get serverIp err")
 		return false
 	}
+
 	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
 	if nodeName == "" {
 		nodeName = serverIp
@@ -436,7 +468,9 @@
 		logger.Debug("add cur node err:", err)
 		return false
 	}
-	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+
+	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+	if err = tx.Exec(joinSql).Error; err != nil {
 		logger.Debug("update isDelete err:", err)
 		return false
 	}
@@ -445,6 +479,7 @@
 	tx.Exec("PRAGMA foreign_keys=ON")
 	tx.Commit()
 	serf.SyncSql([]string{sql})
+
 	return true
 }
 
@@ -457,3 +492,15 @@
 	var lc models.Node
 	return lc.FindIpByNode(nodeId)
 }
+
+func ClusterSyncProcMessage(payload []byte) {
+	if serf.Agent == nil {
+		logger.Error("鏈姞鍏ラ泦缇�")
+		return
+	}
+
+	err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
+	if err != nil {
+		logger.Error("UserEventSyncMessage err:", err)
+	}
+}

--
Gitblit v1.8.0