From 2863a050be2530afc452e48aae8b4be9b3965ebd Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期一, 15 五月 2023 18:18:53 +0800
Subject: [PATCH] add default role tag
---
agent.go | 56 +++++++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 47 insertions(+), 9 deletions(-)
diff --git a/agent.go b/agent.go
index 75be578..0274499 100644
--- a/agent.go
+++ b/agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
*agent.Agent
+ ipc *agent.AgentIPC
conf *Config
readyCh chan struct{}
errorCh chan error
@@ -50,6 +51,7 @@
NodeID string `json:"nodeID"`
NodeAddress string `json:"nodeAddress"`
IsAlive int `json:"isAlive"`
+ Role string `json:"role"`
}
// Create create serf agent with config
@@ -91,15 +93,30 @@
}
}
-// Start agent
+// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
+ a.RegisterEventHandler(a)
err := a.Agent.Start()
if err != nil {
logger.Error(err, "start serf agent failed")
a.errorCh <- err
return
}
- a.RegisterEventHandler(a)
+
+ ipc := a.startIPC()
+ a.ipc = ipc
+ go func() {
+ for {
+ select {
+ case <-ctx.Done():
+ a.Agent.Shutdown()
+ a.ipc.Shutdown()
+ return
+ default:
+ time.Sleep(1*time.Second)
+ }
+ }
+ }()
err = a.retryJoin(ctx)
if err != nil {
@@ -112,6 +129,25 @@
go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
+func (a *Agent) startIPC() *agent.AgentIPC {
+ // Parse the bind address information
+ bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
+ bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
+
+ // Setup the RPC listener
+ rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
+ if err != nil {
+ logger.Error("Error starting RPC listener:", err)
+ return nil
+ }
+ ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
+ logger.Debug("RPC addr:", a.conf.RPCAddr)
+ logger.Debug("Snapshot:", a.conf.SnapshotPath)
+ logger.Debug("Profile:", a.conf.Profile)
+ logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
+ logger.Debug("bindAddr:", bindAddr)
+ return ipc
+}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -142,6 +178,7 @@
nodeID,
nodeAddress,
isAlive,
+ mb.Tags["role"],
})
// replace node address
@@ -276,8 +313,8 @@
}
//Init serf Init
-func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
- agent, err := InitNode(clusterID, password, nodeID, snapshotPath, hef)
+func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
+ agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
if err != nil {
logger.Error("InitNode failed, error: %s", err)
return agent, err
@@ -293,8 +330,10 @@
}
//InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func InitNode(clusterID string, password string, nodeID string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
+func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
conf := DefaultConfig()
+ conf.MergeConf(c)
+
logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
conf.ClusterID = clusterID
conf.NodeName = nodeID
@@ -315,11 +354,9 @@
return agent, err
}
- agent.Start(context.Background())
+ agent.Start(conf.Ctx)
//<- agent.readyCh
- go func() {
- agent.ShutdownCh()
- }()
+
time.Sleep(time.Second)
logger.Info("Stats:", agent.Agent.Serf().Stats())
logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
@@ -371,6 +408,7 @@
node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
node.IsAlive = int(mb.Status)
node.ClusterID = mb.Tags[tagKeyClusterID]
+ node.Role = mb.Tags["role"]
nodes = append(nodes, node)
}
--
Gitblit v1.8.0