From 3a706d3378aa3626501370352963883fd2783558 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 28 十一月 2023 11:24:49 +0800
Subject: [PATCH] 添加appcenter,完善算法上传下载功能

---
 system-service/service/clusterService.go |  159 ++++++++++++++++++++++++++--------------------------
 1 files changed, 79 insertions(+), 80 deletions(-)

diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 07a9928..a39741f 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 	sysSync "sync"
 	"time"
@@ -98,12 +99,8 @@
 	return m
 }
 
-//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
 func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
-	devTyp := GetDevType(config.Server.DeviceType)
-	if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
-		return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�"
-	}
 	clusterId := uuid.NewV4().String()
 	pwd = config.ClusterSet.PwdPre + pwd
 	var clusterE = models.Cluster{
@@ -116,32 +113,21 @@
 	arr, err := clusterE.FindAll()
 	if err == nil && (arr == nil || len(arr) == 0) {
 
-		b := clusterE.Create()
-		if b {
+		err = clusterE.Create()
+		if err == nil {
 			serf.InitAgent(context.Background())
-
-			//鍒涘缓es闆嗙兢
-			if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
-				//鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭
-				s.Leave(false)
-				return false, err.Error()
-			}
-
-			//鍒涘缓weedfs闆嗙兢
-			if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
-				s.Leave(false)
-				return false, err.Error()
-			}
 
 			chMsg := protomsg.DbChangeMessage{
 				Id:     clusterE.ClusterId,
 				Table:  protomsg.TableChanged_T_Cluster,
 				Action: protomsg.DbAction_Insert,
-				Info:   "",
+				Info:   "create",
 			}
 			s.AddDbMessage(&chMsg)
 
 			return true, clusterId
+		} else {
+			logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error())
 		}
 	} else {
 		if s.UpdateClusterName(clusterName, virtualIp) {
@@ -150,6 +136,31 @@
 	}
 
 	return false, ""
+}
+
+// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
+	var node models.Node
+	isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
+
+	if isSuccess {
+		// 閫氱煡涓昏妭鐐瑰彉鏇�
+		chMsg := protomsg.DbChangeMessage{
+			Id:     clusterId,
+			Table:  protomsg.TableChanged_T_Cluster,
+			Action: protomsg.DbAction_Insert,
+			Info:   "slave2master",
+		}
+
+		s.AddDbMessage(&chMsg)
+
+		err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
+		if err != nil {
+			logger.Error("UserEventSyncVirtualIp err:", err)
+		}
+	}
+
+	return isSuccess, ""
 }
 
 func (s ClusterService) SearchByPwd(pwd string) (err error) {
@@ -186,7 +197,7 @@
 	}
 }
 
-//鍔犲叆闆嗙兢
+// 鍔犲叆闆嗙兢
 func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
 	start := time.Now()
 	if config.Server.AnalyServerId == "" {
@@ -205,6 +216,7 @@
 		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
 		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
 	}
+
 	logger.Debug("AddCluster joinIps:", joinIps)
 	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
 
@@ -218,49 +230,32 @@
 	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
 	if err == nil && agent != nil { //鍔犲叆鎴愬姛
 		logger.Debug("AddCluster dbSync.Init success")
+		agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
 		serf.Agent = agent
 
 		t := time.Now()
-		syncB := syncTableDataFromCluster(joinArg)
-		logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
-		if syncB {
-			devTyp := GetDevType(config.Server.DeviceType)
-			if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S
-				go func() {
-					//娣诲姞ES鑺傜偣
-					if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
-						s.Leave(false)
-						return
-					}
-
-					//娣诲姞weedfs鑺傜偣
-					if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
-						s.Leave(false)
-						return
-					}
-				}()
-			}
-
-			//闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼
-			go serf.ReInitDbPersonCompareData()
+		syncClusterNodes := syncTableDataFromCluster(joinArg)
+		logger.Debugf("AddCluster  time=%v", time.Since(t))
+		if syncClusterNodes {
 			chMsg := protomsg.DbChangeMessage{
 				Id:     joinArg.ClusterId,
 				Table:  protomsg.TableChanged_T_Cluster,
 				Action: protomsg.DbAction_Insert,
-				Info:   "",
+				Info:   "join",
 			}
+
 			s.AddDbMessage(&chMsg)
 			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
 			return true, nil
 		} else {
 			logger.Debug("AddCluster syncTableDataFromCluster fail")
-			if agent != nil {
-				agent.Leave()
-				err = agent.Shutdown()
-				if err != nil {
-					logger.Debug("AddCluster agent shutdown err:", err)
-				}
+			agent.Leave()
+			err = agent.Shutdown()
+			if err != nil {
+				logger.Debug("AddCluster agent shutdown err:", err)
 			}
+
+			return false, errors.New("鍔犲叆闆嗙兢澶辫触")
 		}
 	} else {
 		logger.Debug("AddCluster dbSync.Init err:", err)
@@ -271,7 +266,9 @@
 
 		}
 	}
+
 	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+
 	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
 }
 
@@ -282,7 +279,7 @@
 	DevType_Other            = "other"            //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢�
 )
 
-//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘��
+// 鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘��
 func GetDevType(dt string) string {
 	if dt != "" {
 		if len(dt) >= 4 {
@@ -336,23 +333,6 @@
 
 func (s ClusterService) Leave(isDel bool) (bool, error) {
 	start := time.Now()
-	devType := GetDevType(config.Server.DeviceType)
-	logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
-
-	if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢
-		go func() {
-			//閫�鍑簑eedfs鑺傜偣
-			if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
-				logger.Error("Leave ExitWeedfsServer err:", err)
-				return
-			}
-			//閫�鍑篹s鑺傜偣
-			if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
-				logger.Error("Leave ExitCluster es cluster err:", err)
-				return
-			}
-		}()
-	}
 
 	var err error
 	tx := models.GetDB().Begin()
@@ -382,7 +362,7 @@
 			Id:     "",
 			Table:  protomsg.TableChanged_T_Cluster,
 			Action: protomsg.DbAction_Delete,
-			Info:   "",
+			Info:   "leave",
 		}
 		logger.Debugf("Leave delete db time=%v", time.Since(t))
 		tm := time.Now()
@@ -391,6 +371,7 @@
 	}
 
 	logger.Debugf("Leave success time=%v", time.Since(start))
+
 	return true, nil
 }
 
@@ -411,7 +392,7 @@
 	return false
 }
 
-//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹�
+// 鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹�
 func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
 	var lc models.LocalConfig
 	e := lc.Select()
@@ -424,29 +405,28 @@
 	db := models.GetDB()
 	db.LogMode(false)
 	defer db.LogMode(true)
+
 	tx := db.Begin()
 	defer func() {
 		if err != nil && tx != nil {
 			tx.Rollback()
 		}
 	}()
+
 	//0.鍏抽棴reference
 	tx.Exec("PRAGMA foreign_keys=OFF")
 	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
 	for _, t := range serf.SyncTables {
-		delSql := ""
-		if t == "dbtables" {
-			delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
-		} else if t == "dbtablepersons" {
-			delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
-		} else {
-			delSql = "delete from " + t + ""
-		}
+		delSql := "delete from " + t + ""
+
 		err = tx.Exec(delSql).Error
 		if err != nil {
+			logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error())
+			logger.Error("sql:", delSql)
 			return false
 		}
 	}
+
 	//2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓�
 	var dumpSqls *[]string
 	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
@@ -454,6 +434,7 @@
 		for _, sqlStr := range *dumpSqls {
 			logger.Debug("gorm exec dumpSql:", sqlStr)
 			if err = tx.Exec(sqlStr).Error; err != nil {
+				logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
 				return false
 			}
 		}
@@ -471,8 +452,11 @@
 	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
 	if e1 != nil || serverIp == "" {
 		err = errors.New("get serverIp err")
+
+		logger.Error("get serverIp err")
 		return false
 	}
+
 	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
 	if nodeName == "" {
 		nodeName = serverIp
@@ -484,7 +468,9 @@
 		logger.Debug("add cur node err:", err)
 		return false
 	}
-	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+
+	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+	if err = tx.Exec(joinSql).Error; err != nil {
 		logger.Debug("update isDelete err:", err)
 		return false
 	}
@@ -493,6 +479,7 @@
 	tx.Exec("PRAGMA foreign_keys=ON")
 	tx.Commit()
 	serf.SyncSql([]string{sql})
+
 	return true
 }
 
@@ -505,3 +492,15 @@
 	var lc models.Node
 	return lc.FindIpByNode(nodeId)
 }
+
+func ClusterSyncProcMessage(payload []byte) {
+	if serf.Agent == nil {
+		logger.Error("鏈姞鍏ラ泦缇�")
+		return
+	}
+
+	err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
+	if err != nil {
+		logger.Error("UserEventSyncMessage err:", err)
+	}
+}

--
Gitblit v1.8.0