增加查找集群的代码:实现方式是集群的节点定时广播自身信息到某个广播地址端口,新节点通过该广播地址端口和密码获取该信息并解析
| New file |
| | |
| | | 1. func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) |
| | | 初始化:查询数据库,如果数据库有数据,将得到集群id,节点id,密码,其他节点ip,然后调用初始化函数,即可自动通过其他节点加入集群。若是空,则不初始化,等待页面填写参数并启动。注意事项:目前集群id并没有真正意义。 |
| | | |
| | | 2. func (a *Agent) JoinByNodeIP(ips []string) error |
| | | 加入集群:先初始化节点, 然后通过节点ip加入集群和密码。注意事项:集群id在初始化的时候就需要填写,但是还未加入集群,无法获取集群id。所以目前集群id未使用。 |
| | | |
| | | 3. func (a *Agent) Stop() |
| | | 退出集群:退出集群后,外部需要清空同步库的所有数据。 |
| | | |
| | | 4. func (a *Agent) GetNodes() (nodes []Node) |
| | | 获取集群节点列表:通过该接口获取节点列表,然后维护到数据库和页面展示。 |
| | | |
| | | 5. func (a *Agent)GetDbFromCluster(dbPathWrite string) |
| | | 获取数据库文件:新节点加入集群后,需要调用该接口去集群中任意一个结点获取一个数据库文件。数据库文件包含本地库和同步库的表结构,但是只有同步库有数据,本地库是空的。 |
| | | |
| | | 6. func (a *Agent)SyncSql(sqlOp string) |
| | | 同步数据到集群:所有操作同步库的SQL操作都需要同步到集群,集群其他节点收到后,调用数据库接口写入数据库。 |
| | | |
| | | 7. 查找集群信息:未加入集群前,查询集群信息。 |
| | |
| | | "fmt" |
| | | "github.com/hashicorp/memberlist" |
| | | "io/ioutil" |
| | | "net" |
| | | "os" |
| | | |
| | | //"os" |
| | |
| | | conf *Config |
| | | readyCh chan struct{} |
| | | errorCh chan error |
| | | } |
| | | |
| | | type NodeInfo struct { |
| | | ClusterID string `json:"clusterID"` |
| | | NodeID string `json:"nodeID"` |
| | | NodeAddress string `json:"nodeAddress"` |
| | | IsAlive int `json:"isAlive"` |
| | | } |
| | | |
| | | // Create create serf agent with config |
| | |
| | | a.errorCh <- err |
| | | } |
| | | } |
| | | |
| | | go a.BroadcastMemberlist(BroadcastInterval * time.Second) |
| | | } |
| | | |
| | | // HandleEvent Handles serf.EventMemberJoin events, |
| | |
| | | //} |
| | | //a.DeregisterEventHandler(a) |
| | | //close(a.readyCh) |
| | | } |
| | | |
| | | func (a *Agent) BroadcastMemberlist(delay time.Duration) { |
| | | //serf := a.serf |
| | | serf := a.Agent.Serf() |
| | | mb := serf.LocalMember() |
| | | mblist := serf.Memberlist() |
| | | fmt.Println("mb:", mb) |
| | | |
| | | // copy local node |
| | | localNode := *mblist.LocalNode() |
| | | nodeID := a.conf.NodeName |
| | | nodeAddress := localNode.Address() |
| | | clusterID := mb.Tags[tagKeyClusterID] |
| | | isAlive := int(mb.Status) |
| | | |
| | | message, _ := json.Marshal(NodeInfo{ |
| | | clusterID, |
| | | nodeID, |
| | | nodeAddress, |
| | | isAlive, |
| | | }) |
| | | |
| | | // replace node address |
| | | localNode.Addr = net.ParseIP(BroadcastIP) |
| | | //localNode.Addr = net.IPv4(255,255,255,255) |
| | | localNode.Port = BroadcastPort |
| | | for { |
| | | // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress) |
| | | mblist.SendBestEffort(&localNode, []byte(message)) |
| | | time.Sleep(delay) |
| | | } |
| | | } |
| | | |
| | | // Ready Returns a channel that will be closed when serf is ready |
| | |
| | | func InitNode(clusterID string, password string, nodeID string) (*Agent, error) { |
| | | conf := DefaultConfig() |
| | | fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) |
| | | //conf.ClusterID = clusterID |
| | | conf.ClusterID = clusterID |
| | | conf.NodeName = nodeID |
| | | if password == "" { |
| | | conf.EncryptKey = DefaultEncryptKey |
| | |
| | | |
| | | func (a *Agent) JoinByNodeIP(ips []string) error { |
| | | var nodes []string |
| | | |
| | | if len(ips) == 0 { |
| | | return fmt.Errorf("No Nodes To Join!") |
| | | } |
| | | for _, ip := range ips { |
| | | node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) |
| | | nodes = append(nodes, node) |
| | |
| | | tagKeyClusterID = "syncer-cluster-name" |
| | | TagKeyClusterPort = "syncer-cluster-port" |
| | | TagKeyRPCPort = "syncer-rpc-port" |
| | | BroadcastIP = "255.255.255.255" |
| | | BroadcastPort = 30193 |
| | | BroadcastInterval = 5 |
| | | ) |
| | | |
| | | // DefaultConfig default config |
| New file |
| | |
| | | package syncdb |
| | | |
| | | import ( |
| | | "encoding/json" |
| | | "fmt" |
| | | "time" |
| | | |
| | | "github.com/hashicorp/memberlist" |
| | | ) |
| | | |
| | | var ( |
| | | members [][]byte |
| | | delay time.Duration |
| | | ) |
| | | |
| | | // delegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist. |
| | | type delegate struct{} |
| | | |
| | | // NodeMeta is the delegate method, must implement. |
| | | func (d *delegate) NodeMeta(limit int) []byte { |
| | | return []byte{} |
| | | } |
| | | |
| | | // LocalState is the delegate method, must implement. |
| | | func (d *delegate) LocalState(join bool) []byte { |
| | | return []byte{} |
| | | } |
| | | |
| | | // MergeRemoteState is the delegate method, must implement. |
| | | func (d *delegate) MergeRemoteState(buf []byte, join bool) { |
| | | } |
| | | |
| | | // GetBroadcasts is the delegate method, must implement. |
| | | func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { |
| | | return [][]byte{} |
| | | } |
| | | |
| | | // eventDelegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist. |
| | | type eventDelegate struct{} |
| | | |
| | | // NotifyJoin is the eventDelegate method, must implement. |
| | | func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) { |
| | | } |
| | | |
| | | // NotifyLeave is the eventDelegate method, must implement. |
| | | func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) { |
| | | } |
| | | |
| | | // NotifyUpdate is the eventDelegate method, must implement. |
| | | func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) { |
| | | } |
| | | |
| | | // NotifyMsg is called when a user-data message is received |
| | | func (d *delegate) NotifyMsg(b []byte) { |
| | | // logMsg(b) |
| | | members = append(members, b) |
| | | } |
| | | |
| | | func logMsg(b []byte) { |
| | | type nodeInfo struct { |
| | | NodeName string `json:"name"` |
| | | Address string `json:"address"` |
| | | } |
| | | |
| | | node := nodeInfo{} |
| | | if err := json.Unmarshal(b, &node); err != nil { |
| | | |
| | | fmt.Println("Umarshal failed:", err) |
| | | return |
| | | } |
| | | |
| | | fmt.Println(node) |
| | | } |
| | | |
| | | func CreateSearchNode(key string) (*memberlist.Memberlist, error) { |
| | | conf := memberlist.DefaultLocalConfig() |
| | | conf.Events = &eventDelegate{} |
| | | conf.Delegate = &delegate{} |
| | | conf.BindAddr = BroadcastIP |
| | | conf.BindPort = BroadcastPort |
| | | conf.Name = "Cluster-Searcher" |
| | | |
| | | keyring, err := memberlist.NewKeyring(nil, []byte(key)) |
| | | if err != nil { |
| | | fmt.Printf("Failed to restore keyring: %s", err) |
| | | return nil, err |
| | | } |
| | | conf.Keyring = keyring |
| | | |
| | | return memberlist.Create(conf) |
| | | } |
| | | |
| | | func CreateSearchNodeWhitClose(key string, delay time.Duration) [][]byte { |
| | | m, err := CreateSearchNode(key) |
| | | if err == nil { |
| | | // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port) |
| | | time.Sleep(delay) |
| | | |
| | | m.Shutdown() |
| | | } |
| | | return members |
| | | } |
| | | |
| | | func CloseSearchNode(m *memberlist.Memberlist) error{ |
| | | return m.Shutdown() |
| | | } |
| | | |
| | | func GetSearchNodes()[][]byte { |
| | | return members |
| | | } |
| | | |