基于serf的数据库同步模块库
增加查找集群的代码:实现方式是集群的节点定时广播自身信息到某个广播地址端口,新节点通过该广播地址端口和密码获取该信息并解析
2个文件已修改
2个文件已添加
181 ■■■■■ 已修改文件
API.txt 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go 48 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
searcher.go 111 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
API.txt
New file
@@ -0,0 +1,19 @@
1. func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error)
初始化:查询数据库,如果数据库有数据,将得到集群id,节点id,密码,其他节点ip,然后调用初始化函数,即可自动通过其他节点加入集群。若是空,则不初始化,等待页面填写参数并启动。注意事项:目前集群id并没有真正意义。
2. func (a *Agent) JoinByNodeIP(ips []string) error
加入集群:先初始化节点, 然后通过节点ip加入集群和密码。注意事项:集群id在初始化的时候就需要填写,但是还未加入集群,无法获取集群id。所以目前集群id未使用。
3. func (a *Agent) Stop()
退出集群:退出集群后,外部需要清空同步库的所有数据。
4. func (a *Agent) GetNodes() (nodes []Node)
获取集群节点列表:通过该接口获取节点列表,然后维护到数据库和页面展示。
5. func (a *Agent)GetDbFromCluster(dbPathWrite string)
获取数据库文件:新节点加入集群后,需要调用该接口去集群中任意一个结点获取一个数据库文件。数据库文件包含本地库和同步库的表结构,但是只有同步库有数据,本地库是空的。
6. func (a *Agent)SyncSql(sqlOp string)
同步数据到集群:所有操作同步库的SQL操作都需要同步到集群,集群其他节点收到后,调用数据库接口写入数据库。
7. 查找集群信息:未加入集群前,查询集群信息。
agent.go
@@ -23,6 +23,7 @@
    "fmt"
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "net"
    "os"
    //"os"
@@ -45,6 +46,13 @@
    conf    *Config
    readyCh chan struct{}
    errorCh chan error
}
type NodeInfo struct {
    ClusterID string `json:"clusterID"`
    NodeID string `json:"nodeID"`
    NodeAddress  string `json:"nodeAddress"`
    IsAlive int `json:"isAlive"`
}
// Create create serf agent with config
@@ -96,6 +104,8 @@
            a.errorCh <- err
        }
    }
    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
// HandleEvent Handles serf.EventMemberJoin events,
@@ -193,6 +203,38 @@
    //}
    //a.DeregisterEventHandler(a)
    //close(a.readyCh)
}
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
    //serf := a.serf
    serf := a.Agent.Serf()
    mb := serf.LocalMember()
    mblist := serf.Memberlist()
    fmt.Println("mb:", mb)
    // copy local node
    localNode := *mblist.LocalNode()
    nodeID := a.conf.NodeName
    nodeAddress := localNode.Address()
    clusterID := mb.Tags[tagKeyClusterID]
    isAlive := int(mb.Status)
    message, _ := json.Marshal(NodeInfo{
        clusterID,
        nodeID,
        nodeAddress,
        isAlive,
    })
    // replace node address
    localNode.Addr = net.ParseIP(BroadcastIP)
    //localNode.Addr = net.IPv4(255,255,255,255)
    localNode.Port = BroadcastPort
    for {
        // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
        mblist.SendBestEffort(&localNode, []byte(message))
        time.Sleep(delay)
    }
}
// Ready Returns a channel that will be closed when serf is ready
@@ -393,7 +435,7 @@
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
    conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
@@ -427,6 +469,10 @@
func (a *Agent) JoinByNodeIP(ips []string) error {
    var nodes []string
    if len(ips) == 0 {
        return fmt.Errorf("No Nodes To Join!")
    }
    for _, ip := range ips {
        node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
        nodes = append(nodes, node)
config.go
@@ -39,6 +39,9 @@
    tagKeyClusterID  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
    BroadcastIP = "255.255.255.255"
    BroadcastPort = 30193
    BroadcastInterval = 5
)
// DefaultConfig default config
searcher.go
New file
@@ -0,0 +1,111 @@
package syncdb
import (
    "encoding/json"
    "fmt"
    "time"
    "github.com/hashicorp/memberlist"
)
var (
    members [][]byte
    delay   time.Duration
)
// delegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type delegate struct{}
// NodeMeta is the delegate method, must implement.
func (d *delegate) NodeMeta(limit int) []byte {
    return []byte{}
}
// LocalState is the delegate method, must implement.
func (d *delegate) LocalState(join bool) []byte {
    return []byte{}
}
// MergeRemoteState is the delegate method, must implement.
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
}
// GetBroadcasts is the delegate method, must implement.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
    return [][]byte{}
}
// eventDelegate is the interface that clients must implement if they want to hook into the gossip layer of Memberlist.
type eventDelegate struct{}
// NotifyJoin is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) {
}
// NotifyLeave is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) {
}
// NotifyUpdate is the eventDelegate method, must implement.
func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) {
}
// NotifyMsg is called when a user-data message is received
func (d *delegate) NotifyMsg(b []byte) {
    // logMsg(b)
    members = append(members, b)
}
func logMsg(b []byte) {
    type nodeInfo struct {
        NodeName string `json:"name"`
        Address  string `json:"address"`
    }
    node := nodeInfo{}
    if err := json.Unmarshal(b, &node); err != nil {
        fmt.Println("Umarshal failed:", err)
        return
    }
    fmt.Println(node)
}
func CreateSearchNode(key string) (*memberlist.Memberlist, error) {
    conf := memberlist.DefaultLocalConfig()
    conf.Events = &eventDelegate{}
    conf.Delegate = &delegate{}
    conf.BindAddr = BroadcastIP
    conf.BindPort = BroadcastPort
    conf.Name = "Cluster-Searcher"
    keyring, err := memberlist.NewKeyring(nil, []byte(key))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        return nil, err
    }
    conf.Keyring = keyring
    return memberlist.Create(conf)
}
func CreateSearchNodeWhitClose(key string, delay time.Duration) [][]byte {
    m, err := CreateSearchNode(key)
    if err == nil {
        // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
        time.Sleep(delay)
        m.Shutdown()
    }
    return members
}
func CloseSearchNode(m *memberlist.Memberlist) error{
    return m.Shutdown()
}
func GetSearchNodes()[][]byte {
    return members
}