基于serf的数据库同步模块库
agent.go
@@ -23,6 +23,7 @@
   "fmt"
   "github.com/hashicorp/memberlist"
   "io/ioutil"
   "net"
   "os"
   //"os"
@@ -45,6 +46,13 @@
   conf    *Config
   readyCh chan struct{}
   errorCh chan error
}
type NodeInfo struct {
   ClusterID string `json:"clusterID"`
   NodeID string `json:"nodeID"`
   NodeAddress  string `json:"nodeAddress"`
   IsAlive int `json:"isAlive"`
}
// Create create serf agent with config
@@ -96,6 +104,8 @@
         a.errorCh <- err
      }
   }
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
// HandleEvent Handles serf.EventMemberJoin events,
@@ -193,6 +203,38 @@
   //}
   //a.DeregisterEventHandler(a)
   //close(a.readyCh)
}
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
   //serf := a.serf
   serf := a.Agent.Serf()
   mb := serf.LocalMember()
   mblist := serf.Memberlist()
   fmt.Println("mb:", mb)
   // copy local node
   localNode := *mblist.LocalNode()
   nodeID := a.conf.NodeName
   nodeAddress := localNode.Address()
   clusterID := mb.Tags[tagKeyClusterID]
   isAlive := int(mb.Status)
   message, _ := json.Marshal(NodeInfo{
      clusterID,
      nodeID,
      nodeAddress,
      isAlive,
   })
   // replace node address
   localNode.Addr = net.ParseIP(BroadcastIP)
   //localNode.Addr = net.IPv4(255,255,255,255)
   localNode.Port = BroadcastPort
   for {
      // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
      mblist.SendBestEffort(&localNode, []byte(message))
      time.Sleep(delay)
   }
}
// Ready Returns a channel that will be closed when serf is ready
@@ -393,7 +435,7 @@
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
   conf := DefaultConfig()
   fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
   //conf.ClusterID = clusterID
   conf.ClusterID = clusterID
   conf.NodeName = nodeID
   if password == "" {
      conf.EncryptKey = DefaultEncryptKey
@@ -427,6 +469,10 @@
func (a *Agent) JoinByNodeIP(ips []string) error {
   var nodes []string
   if len(ips) == 0 {
      return fmt.Errorf("No Nodes To Join!")
   }
   for _, ip := range ips {
      node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
      nodes = append(nodes, node)