From 514ce3d588608c96744fc72f6d8c1d3d9bbdb41f Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期二, 06 八月 2019 16:54:11 +0800
Subject: [PATCH] 增加查找集群的代码:实现方式是集群的节点定时广播自身信息到某个广播地址端口,新节点通过该广播地址端口和密码获取该信息并解析
---
agent.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++-
1 files changed, 47 insertions(+), 1 deletions(-)
diff --git a/agent.go b/agent.go
index 2fb96b1..a6a77cc 100644
--- a/agent.go
+++ b/agent.go
@@ -23,6 +23,7 @@
"fmt"
"github.com/hashicorp/memberlist"
"io/ioutil"
+ "net"
"os"
//"os"
@@ -45,6 +46,13 @@
conf *Config
readyCh chan struct{}
errorCh chan error
+}
+
+type NodeInfo struct {
+ ClusterID string `json:"clusterID"`
+ NodeID string `json:"nodeID"`
+ NodeAddress string `json:"nodeAddress"`
+ IsAlive int `json:"isAlive"`
}
// Create create serf agent with config
@@ -96,6 +104,8 @@
a.errorCh <- err
}
}
+
+ go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
// HandleEvent Handles serf.EventMemberJoin events,
@@ -193,6 +203,38 @@
//}
//a.DeregisterEventHandler(a)
//close(a.readyCh)
+}
+
+func (a *Agent) BroadcastMemberlist(delay time.Duration) {
+ //serf := a.serf
+ serf := a.Agent.Serf()
+ mb := serf.LocalMember()
+ mblist := serf.Memberlist()
+ fmt.Println("mb:", mb)
+
+ // copy local node
+ localNode := *mblist.LocalNode()
+ nodeID := a.conf.NodeName
+ nodeAddress := localNode.Address()
+ clusterID := mb.Tags[tagKeyClusterID]
+ isAlive := int(mb.Status)
+
+ message, _ := json.Marshal(NodeInfo{
+ clusterID,
+ nodeID,
+ nodeAddress,
+ isAlive,
+ })
+
+ // replace node address
+ localNode.Addr = net.ParseIP(BroadcastIP)
+ //localNode.Addr = net.IPv4(255,255,255,255)
+ localNode.Port = BroadcastPort
+ for {
+ // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
+ mblist.SendBestEffort(&localNode, []byte(message))
+ time.Sleep(delay)
+ }
}
// Ready Returns a channel that will be closed when serf is ready
@@ -393,7 +435,7 @@
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
conf := DefaultConfig()
fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
- //conf.ClusterID = clusterID
+ conf.ClusterID = clusterID
conf.NodeName = nodeID
if password == "" {
conf.EncryptKey = DefaultEncryptKey
@@ -427,6 +469,10 @@
func (a *Agent) JoinByNodeIP(ips []string) error {
var nodes []string
+
+ if len(ips) == 0 {
+ return fmt.Errorf("No Nodes To Join!")
+ }
for _, ip := range ips {
node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
nodes = append(nodes, node)
--
Gitblit v1.8.0