From 514ce3d588608c96744fc72f6d8c1d3d9bbdb41f Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期二, 06 八月 2019 16:54:11 +0800
Subject: [PATCH] 增加查找集群的代码:实现方式是集群的节点定时广播自身信息到某个广播地址端口,新节点通过该广播地址端口和密码获取该信息并解析

---
 agent.go |   48 +++++++++++++++++++++++++++++++++++++++++++++++-
 1 files changed, 47 insertions(+), 1 deletions(-)

diff --git a/agent.go b/agent.go
index 2fb96b1..a6a77cc 100644
--- a/agent.go
+++ b/agent.go
@@ -23,6 +23,7 @@
 	"fmt"
 	"github.com/hashicorp/memberlist"
 	"io/ioutil"
+	"net"
 	"os"
 
 	//"os"
@@ -45,6 +46,13 @@
 	conf    *Config
 	readyCh chan struct{}
 	errorCh chan error
+}
+
+type NodeInfo struct {
+	ClusterID string `json:"clusterID"`
+	NodeID string `json:"nodeID"`
+	NodeAddress  string `json:"nodeAddress"`
+	IsAlive int `json:"isAlive"`
 }
 
 // Create create serf agent with config
@@ -96,6 +104,8 @@
 			a.errorCh <- err
 		}
 	}
+
+	go a.BroadcastMemberlist(BroadcastInterval * time.Second)
 }
 
 // HandleEvent Handles serf.EventMemberJoin events,
@@ -193,6 +203,38 @@
 	//}
 	//a.DeregisterEventHandler(a)
 	//close(a.readyCh)
+}
+
+func (a *Agent) BroadcastMemberlist(delay time.Duration) {
+	//serf := a.serf
+	serf := a.Agent.Serf()
+	mb := serf.LocalMember()
+	mblist := serf.Memberlist()
+	fmt.Println("mb:", mb)
+
+	// copy local node
+	localNode := *mblist.LocalNode()
+	nodeID := a.conf.NodeName
+	nodeAddress := localNode.Address()
+	clusterID := mb.Tags[tagKeyClusterID]
+	isAlive := int(mb.Status)
+
+	message, _ := json.Marshal(NodeInfo{
+		clusterID,
+		nodeID,
+		nodeAddress,
+		isAlive,
+	})
+
+	// replace node address
+	localNode.Addr = net.ParseIP(BroadcastIP)
+	//localNode.Addr = net.IPv4(255,255,255,255)
+	localNode.Port = BroadcastPort
+	for {
+		// fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
+		mblist.SendBestEffort(&localNode, []byte(message))
+		time.Sleep(delay)
+	}
 }
 
 // Ready Returns a channel that will be closed when serf is ready
@@ -393,7 +435,7 @@
 func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
 	conf := DefaultConfig()
 	fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
-	//conf.ClusterID = clusterID
+	conf.ClusterID = clusterID
 	conf.NodeName = nodeID
 	if password == "" {
 		conf.EncryptKey = DefaultEncryptKey
@@ -427,6 +469,10 @@
 
 func (a *Agent) JoinByNodeIP(ips []string) error {
 	var nodes []string
+
+	if len(ips) == 0 {
+		return fmt.Errorf("No Nodes To Join!")
+	}
 	for _, ip := range ips {
 		node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
 		nodes = append(nodes, node)

--
Gitblit v1.8.0