From daac628c64069633787588372dea22499ac35396 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 18 十月 2023 16:52:56 +0800
Subject: [PATCH] 修复集群功能
---
system-service/serf/serf.go | 2
system-service/controllers/cluster.go | 7 ++-
system-service/service/clusterService.go | 56 ++++++++++++++++-----------
system-service/serf/handler.go | 14 +++++--
system-service/iwlist/iwlist.go | 5 ++
5 files changed, 52 insertions(+), 32 deletions(-)
diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go
index eb8bddf..df7d326 100644
--- a/system-service/controllers/cluster.go
+++ b/system-service/controllers/cluster.go
@@ -25,7 +25,7 @@
var clusterE models.Cluster
var reply = bhomeclient.Reply{
Success: false,
- Msg: "leave",
+ Msg: "",
Data: nil,
}
@@ -37,9 +37,9 @@
var nodeE models.Node
nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId)
- logger.Debug("鏌ヨ闆嗙兢鑺傜偣:", nodes)
+ //logger.Debug("鏌ヨ闆嗙兢鑺傜偣:", nodes)
for _, node := range nodes {
- logger.Debug("鑺傜偣:", node.NodeId, " servceId:", config.Server.AnalyServerId, " stat:", node.DriftState)
+ //logger.Debug("鑺傜偣:", node.NodeId, " serverId:", config.Server.AnalyServerId, " stat:", node.DriftState)
if node.NodeId == config.Server.AnalyServerId {
if node.DriftState == "master" {
reply.Msg = "master"
@@ -50,6 +50,7 @@
break
}
}
+ reply.Data = nodes
}
}
diff --git a/system-service/iwlist/iwlist.go b/system-service/iwlist/iwlist.go
index a175a21..c92554b 100644
--- a/system-service/iwlist/iwlist.go
+++ b/system-service/iwlist/iwlist.go
@@ -1,6 +1,7 @@
package iwlist
import (
+ "fmt"
"os/exec"
"regexp"
"strconv"
@@ -44,7 +45,9 @@
func Scan(interfaceName string) ([]Cell, error) {
// execute iwlist for scanning wireless networks
- cmd := exec.Command("iwlist", interfaceName, "scan")
+ // 鍗氶�氱綉鍗′笉鍔爏udo鎵弿涓嶅埌缃戠粶
+ cmdStr := fmt.Sprintf("sudo iwlist %s scan", interfaceName)
+ cmd := exec.Command("/bin/bash", "-c", cmdStr)
out, err := cmd.CombinedOutput()
if err != nil {
return nil, err
diff --git a/system-service/serf/handler.go b/system-service/serf/handler.go
index c5546db..c9689a3 100644
--- a/system-service/serf/handler.go
+++ b/system-service/serf/handler.go
@@ -4,6 +4,7 @@
"basic.com/valib/logger.git"
"basic.com/valib/serf.git/serf"
"encoding/json"
+ "fmt"
"github.com/golang/protobuf/proto"
"github.com/hashicorp/memberlist"
"github.com/satori/go.uuid"
@@ -239,7 +240,7 @@
func HandleEventMemberLeave(ev serf.MemberEvent) {
if ev.Members != nil && len(ev.Members) == 1 {
leaveMember := ev.Members[0]
- leaveSql := "update cluster_node set isDelete=1 where node_id='" + leaveMember.Name + "'"
+ leaveSql := "update cluster_node set isDelete=1,drift_state='' where node_id='" + leaveMember.Name + "'"
flag, e := executeSqlByGorm([]string{leaveSql})
logger.Info("EventMemberLeave,current Members:", ev.Members)
@@ -260,8 +261,13 @@
func HandleEventMemberJoin(ev serf.MemberEvent) {
if ev.Members != nil && len(ev.Members) == 1 {
- leaveMember := ev.Members[0]
- joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+ joinMember := ev.Members[0]
+
+ timeUnix := time.Now().Unix()
+ fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
+
+ //joinSql := "update cluster_node set isDelete=0 where node_id='" + leaveMember.Name + "'"
+ joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", joinMember.Addr.String()+":"+strconv.Itoa(int(joinMember.Port)), fmtTimeStr, joinMember.Name)
flag, e := executeSqlByGorm([]string{joinSql})
logger.Info("EventMemberJoin,current Members:", ev.Members)
@@ -276,6 +282,6 @@
if e != nil {
logErr = e.Error()
}
- executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + leaveMember.Name + "'," + logResult + ",'" + logErr + "')"})
+ executeSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('" + uuid.NewV4().String() + "','" + logLT + "','" + logT + "','" + logSql + "','" + joinMember.Name + "'," + logResult + ",'" + logErr + "')"})
}
}
diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go
index 0a14a19..b7b4fde 100644
--- a/system-service/serf/serf.go
+++ b/system-service/serf/serf.go
@@ -64,7 +64,7 @@
} else if event.EventType() == serf.EventMemberJoin {
HandleEventMemberJoin(ev)
}
-
+ logger.Error("serf MemberEvent ", event.EventType())
default:
logger.Warn("Unknown event type: %s\n", ev.EventType().String())
}
diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 2f978ef..feffd61 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -3,6 +3,7 @@
import (
"context"
"encoding/json"
+ "fmt"
"strconv"
sysSync "sync"
"time"
@@ -190,6 +191,7 @@
logger.Debug("AddCluster JoinCluster len(joinIps)=0")
return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
}
+
logger.Debug("AddCluster joinIps:", joinIps)
joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
@@ -207,29 +209,29 @@
serf.Agent = agent
t := time.Now()
- syncTableDataFromCluster(joinArg)
+ syncClusterNodes := syncTableDataFromCluster(joinArg)
logger.Debugf("AddCluster time=%v", time.Since(t))
- //if syncB {
- chMsg := protomsg.DbChangeMessage{
- Id: joinArg.ClusterId,
- Table: protomsg.TableChanged_T_Cluster,
- Action: protomsg.DbAction_Insert,
- Info: "join",
- }
+ if syncClusterNodes {
+ chMsg := protomsg.DbChangeMessage{
+ Id: joinArg.ClusterId,
+ Table: protomsg.TableChanged_T_Cluster,
+ Action: protomsg.DbAction_Insert,
+ Info: "join",
+ }
- s.AddDbMessage(&chMsg)
- logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
- return true, nil
- //} else {
- // logger.Debug("AddCluster syncTableDataFromCluster fail")
- // if agent != nil {
- // agent.Leave()
- // err = agent.Shutdown()
- // if err != nil {
- // logger.Debug("AddCluster agent shutdown err:", err)
- // }
- // }
- //}
+ s.AddDbMessage(&chMsg)
+ logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
+ return true, nil
+ } else {
+ logger.Debug("AddCluster syncTableDataFromCluster fail")
+ agent.Leave()
+ err = agent.Shutdown()
+ if err != nil {
+ logger.Debug("AddCluster agent shutdown err:", err)
+ }
+
+ return false, errors.New("鍔犲叆闆嗙兢澶辫触")
+ }
} else {
logger.Debug("AddCluster dbSync.Init err:", err)
if agent != nil {
@@ -239,7 +241,9 @@
}
}
+
logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+
return false, errors.New("鍔犲叆闆嗙兢澶辫触")
}
@@ -403,7 +407,7 @@
dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
if dumpSqls != nil && len(*dumpSqls) > 0 {
for _, sqlStr := range *dumpSqls {
- //logger.Debug("gorm exec dumpSql:", sqlStr)
+ logger.Debug("gorm exec dumpSql:", sqlStr)
if err = tx.Exec(sqlStr).Error; err != nil {
logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
return false
@@ -423,8 +427,11 @@
serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
if e1 != nil || serverIp == "" {
err = errors.New("get serverIp err")
+
+ logger.Error("get serverIp err")
return false
}
+
logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
if nodeName == "" {
nodeName = serverIp
@@ -436,7 +443,9 @@
logger.Debug("add cur node err:", err)
return false
}
- if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+
+ joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
+ if err = tx.Exec(joinSql).Error; err != nil {
logger.Debug("update isDelete err:", err)
return false
}
@@ -445,6 +454,7 @@
tx.Exec("PRAGMA foreign_keys=ON")
tx.Commit()
serf.SyncSql([]string{sql})
+
return true
}
--
Gitblit v1.8.0