From 0a86db67f14185ab2e08475fcafb64e43f0022dd Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期一, 15 五月 2023 18:15:33 +0800
Subject: [PATCH] add default role tag

---
 agent.go |   55 ++++++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 46 insertions(+), 9 deletions(-)

diff --git a/agent.go b/agent.go
index 75be578..d16148b 100644
--- a/agent.go
+++ b/agent.go
@@ -36,6 +36,7 @@
 // Agent warps the serf agent
 type Agent struct {
 	*agent.Agent
+	ipc *agent.AgentIPC
 	conf     	*Config
 	readyCh  	chan struct{}
 	errorCh  	chan error
@@ -50,6 +51,7 @@
 	NodeID      		string 		`json:"nodeID"`
 	NodeAddress 		string 		`json:"nodeAddress"`
 	IsAlive     		int    		`json:"isAlive"`
+	Role            	string 		`json:"role"`
 }
 
 // Create create serf agent with config
@@ -91,15 +93,30 @@
 	}
 }
 
-// Start agent
+// Start agent and IPC
 func (a *Agent) Start(ctx context.Context) {
+	a.RegisterEventHandler(a)
 	err := a.Agent.Start()
 	if err != nil {
 		logger.Error(err, "start serf agent failed")
 		a.errorCh <- err
 		return
 	}
-	a.RegisterEventHandler(a)
+
+	ipc := a.startIPC()
+	a.ipc = ipc
+	go func() {
+		for {
+			select {
+			case <-ctx.Done():
+				a.Agent.Shutdown()
+				a.ipc.Shutdown()
+				return
+			default:
+				time.Sleep(1*time.Second)
+			}
+		}
+	}()
 
 	err = a.retryJoin(ctx)
 	if err != nil {
@@ -112,6 +129,25 @@
 	go a.BroadcastMemberlist(BroadcastInterval * time.Second)
 }
 
+func (a *Agent) startIPC() *agent.AgentIPC {
+	// Parse the bind address information
+	bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
+	bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
+
+	// Setup the RPC listener
+	rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
+	if err != nil {
+		logger.Error("Error starting RPC listener:", err)
+		return nil
+	}
+	ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
+	logger.Debug("RPC addr:", a.conf.RPCAddr)
+	logger.Debug("Snapshot:", a.conf.SnapshotPath)
+	logger.Debug("Profile:", a.conf.Profile)
+	logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
+	logger.Debug("bindAddr:", bindAddr)
+	return ipc
+}
 // HandleEvent Handles serf.EventMemberJoin events,
 // which will wait for members to join until the number of group members is equal to "groupExpect"
 // when the startup mode is "ModeCluster",
@@ -276,8 +312,8 @@
 }
 
 //Init serf Init
-func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
-	agent, err := InitNode(clusterID, password, nodeID, snapshotPath, hef)
+func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
+	agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
 	if err != nil {
 		logger.Error("InitNode failed, error: %s", err)
 		return agent, err
@@ -293,8 +329,10 @@
 }
 
 //InitNode web鍚庡彴鏀跺埌鍒涘缓闆嗙兢鐨勮姹傦紝
-func InitNode(clusterID string, password string, nodeID string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
+func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
 	conf := DefaultConfig()
+	conf.MergeConf(c)
+
 	logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
 	conf.ClusterID = clusterID
 	conf.NodeName = nodeID
@@ -315,11 +353,9 @@
 		return agent, err
 	}
 
-	agent.Start(context.Background())
+	agent.Start(conf.Ctx)
 	//<- agent.readyCh
-	go func() {
-		agent.ShutdownCh()
-	}()
+
 	time.Sleep(time.Second)
 	logger.Info("Stats:", agent.Agent.Serf().Stats())
 	logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
@@ -371,6 +407,7 @@
 		node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
 		node.IsAlive = int(mb.Status)
 		node.ClusterID = mb.Tags[tagKeyClusterID]
+		node.Role = mb.Tags["role"]
 
 		nodes = append(nodes, node)
 	}

--
Gitblit v1.8.0