From daac628c64069633787588372dea22499ac35396 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 18 十月 2023 16:52:56 +0800
Subject: [PATCH] 修复集群功能

---
 system-service/serf/serf.go              |    2 
 system-service/controllers/cluster.go    |    7 ++-
 system-service/service/clusterService.go |   56 ++++++++++++++++-----------
 system-service/serf/handler.go           |   14 +++++--
 system-service/iwlist/iwlist.go          |    5 ++
 5 files changed, 52 insertions(+), 32 deletions(-)

diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go
index eb8bddf..df7d326 100644
--- a/system-service/controllers/cluster.go
+++ b/system-service/controllers/cluster.go
@@ -25,7 +25,7 @@
 	var clusterE models.Cluster
 	var reply = bhomeclient.Reply{
 		Success: false,
-		Msg:     "leave",
+		Msg:     "",
 		Data:    nil,
 	}
 
@@ -37,9 +37,9 @@
 
 			var nodeE models.Node
 			nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId)
-			logger.Debug("鏌ヨ闆嗙兢鑺傜偣:", nodes)
+			//logger.Debug("鏌ヨ闆嗙兢鑺傜偣:", nodes)
 			for _, node := range nodes {
-				logger.Debug("鑺傜偣:", node.NodeId, " servceId:", config.Server.AnalyServerId, " stat:", node.DriftState)
+				//logger.Debug("鑺傜偣:", node.NodeId, " serverId:", config.Server.AnalyServerId, " stat:", node.DriftState)
 				if node.NodeId == config.Server.AnalyServerId {
 					if node.DriftState == "master" {
 						reply.Msg = "master"
@@ -50,6 +50,7 @@
 					break
 				}
 			}
+			reply.Data = nodes
 		}
 	}
 
diff --git a/system-service/iwlist/iwlist.go b/system-service/iwlist/iwlist.go
index a175a21..c92554b 100644
--- a/system-service/iwlist/iwlist.go
+++ b/system-service/iwlist/iwlist.go
@@ -1,6 +1,7 @@
 package iwlist
 
 import (
+	"fmt"
 	"os/exec"
 	"regexp"
 	"strconv"
@@ -44,7 +45,9 @@
 
 func Scan(interfaceName string) ([]Cell, error) {
 	// execute iwlist for scanning wireless networks
-	cmd := exec.Command("iwlist", interfaceName, "scan")
+	// 鍗氶�氱綉鍗′笉鍔爏udo鎵弿涓嶅埌缃戠粶
+	cmdStr := fmt.Sprintf("sudo iwlist %s scan", interfaceName)
+	cmd := exec.Command("/bin/bash", "-c", cmdStr)
 	out, err := cmd.CombinedOutput()
 	if err != nil {
 		return nil, err
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index c5546db..c9689a3 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -4,6 +4,7 @@
 	"basic.com/valib/logger.git"
 	"basic.com/valib/serf.git/serf"
 	"encoding/json"
+	"fmt"
 	"github.com/golang/protobuf/proto"
 	"github.com/hashicorp/memberlist"
 	"github.com/satori/go.uuid"
@@ -239,7 +240,7 @@
 func HandleEventMemberLeave(ev serf.MemberEvent) {
 	if ev.Members != nil && len(ev.Members) == 1 {
 		leaveMember := ev.Members[0]
-		leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
+		leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
 		flag, e := executeSqlByGorm([]string{leaveSql})
 
 		logger.Info("EventMemberLeave,current Members:", ev.Members)
@@ -260,8 +261,13 @@
 
 func HandleEventMemberJoin(ev serf.MemberEvent) {
 	if ev.Members != nil && len(ev.Members) == 1 {
-		leaveMember := ev.Members[0]
-		joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		joinMember := ev.Members[0]
+
+		timeUnix := time.Now().Unix()
+		fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
+
+		//joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+		joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
 		flag, e := executeSqlByGorm([]string{joinSql})
 
 		logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -276,6 +282,6 @@
 		if e != nil {
 			logErr = e.Error()
 		}
-		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
+		executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
 	}
 }
diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index 0a14a19..b7b4fde 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -64,7 +64,7 @@
 		} else if event.EventType() == serf.EventMemberJoin {
 			HandleEventMemberJoin(ev)
 		}
-
+		logger.Error("serf MemberEvent ", event.EventType())
 	default:
 		logger.Warn("Unknown event type: %s\n", ev.EventType().String())
 	}
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 2f978ef..feffd61 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"encoding/json"
+	"fmt"
 	"strconv"
 	sysSync "sync"
 	"time"
@@ -190,6 +191,7 @@
 		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
 		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
 	}
+
 	logger.Debug("AddCluster joinIps:", joinIps)
 	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
 
@@ -207,29 +209,29 @@
 		serf.Agent = agent
 
 		t := time.Now()
-		syncTableDataFromCluster(joinArg)
+		syncClusterNodes := syncTableDataFromCluster(joinArg)
 		logger.Debugf("AddCluster  time=%v", time.Since(t))
-		//if syncB {
-		chMsg := protomsg.DbChangeMessage{
-			Id:     joinArg.ClusterId,
-			Table:  protomsg.TableChanged_T_Cluster,
-			Action: protomsg.DbAction_Insert,
-			Info:   "join",
-		}
+		if syncClusterNodes {
+			chMsg := protomsg.DbChangeMessage{
+				Id:     joinArg.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "join",
+			}
 
-		s.AddDbMessage(&chMsg)
-		logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
-		return true, nil
-		//} else {
-		//	logger.Debug("AddCluster syncTableDataFromCluster fail")
-		//	if agent != nil {
-		//		agent.Leave()
-		//		err = agent.Shutdown()
-		//		if err != nil {
-		//			logger.Debug("AddCluster agent shutdown err:", err)
-		//		}
-		//	}
-		//}
+			s.AddDbMessage(&chMsg)
+			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
+			return true, nil
+		} else {
+			logger.Debug("AddCluster syncTableDataFromCluster fail")
+			agent.Leave()
+			err = agent.Shutdown()
+			if err != nil {
+				logger.Debug("AddCluster agent shutdown err:", err)
+			}
+
+			return false, errors.New("鍔犲叆闆嗙兢澶辫触")
+		}
 	} else {
 		logger.Debug("AddCluster dbSync.Init err:", err)
 		if agent != nil {
@@ -239,7 +241,9 @@
 
 		}
 	}
+
 	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+
 	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
 }
 
@@ -403,7 +407,7 @@
 	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
 	if dumpSqls != nil && len(*dumpSqls) > 0 {
 		for _, sqlStr := range *dumpSqls {
-			//logger.Debug("gorm exec dumpSql:", sqlStr)
+			logger.Debug("gorm exec dumpSql:", sqlStr)
 			if err = tx.Exec(sqlStr).Error; err != nil {
 				logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
 				return false
@@ -423,8 +427,11 @@
 	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
 	if e1 != nil || serverIp == "" {
 		err = errors.New("get serverIp err")
+
+		logger.Error("get serverIp err")
 		return false
 	}
+
 	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
 	if nodeName == "" {
 		nodeName = serverIp
@@ -436,7 +443,9 @@
 		logger.Debug("add cur node err:", err)
 		return false
 	}
-	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+
+	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+	if err = tx.Exec(joinSql).Error; err != nil {
 		logger.Debug("update isDelete err:", err)
 		return false
 	}
@@ -445,6 +454,7 @@
 	tx.Exec("PRAGMA foreign_keys=ON")
 	tx.Commit()
 	serf.SyncSql([]string{sql})
+
 	return true
 }
 

--
Gitblit v1.8.0