基于serf的数据库同步模块库
zhangzengfei
2023-05-15 0a86db67f14185ab2e08475fcafb64e43f0022dd
agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
   *agent.Agent
   ipc *agent.AgentIPC
   conf        *Config
   readyCh     chan struct{}
   errorCh     chan error
@@ -50,6 +51,7 @@
   NodeID            string       `json:"nodeID"`
   NodeAddress       string       `json:"nodeAddress"`
   IsAlive           int          `json:"isAlive"`
   Role               string       `json:"role"`
}
// Create create serf agent with config
@@ -91,15 +93,30 @@
   }
}
// Start agent
// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
   a.RegisterEventHandler(a)
   err := a.Agent.Start()
   if err != nil {
      logger.Error(err, "start serf agent failed")
      a.errorCh <- err
      return
   }
   a.RegisterEventHandler(a)
   ipc := a.startIPC()
   a.ipc = ipc
   go func() {
      for {
         select {
         case <-ctx.Done():
            a.Agent.Shutdown()
            a.ipc.Shutdown()
            return
         default:
            time.Sleep(1*time.Second)
         }
      }
   }()
   err = a.retryJoin(ctx)
   if err != nil {
@@ -112,6 +129,25 @@
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
func (a *Agent) startIPC() *agent.AgentIPC {
   // Parse the bind address information
   bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
   bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
   // Setup the RPC listener
   rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
   if err != nil {
      logger.Error("Error starting RPC listener:", err)
      return nil
   }
   ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
   logger.Debug("RPC addr:", a.conf.RPCAddr)
   logger.Debug("Snapshot:", a.conf.SnapshotPath)
   logger.Debug("Profile:", a.conf.Profile)
   logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
   logger.Debug("bindAddr:", bindAddr)
   return ipc
}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -276,8 +312,8 @@
}
//Init serf Init
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
   agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
   agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
   if err != nil {
      logger.Error("InitNode failed, error: %s", err)
      return agent, err
@@ -293,8 +329,10 @@
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
   conf := DefaultConfig()
   conf.MergeConf(c)
   logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
   conf.ClusterID = clusterID
   conf.NodeName = nodeID
@@ -315,11 +353,9 @@
      return agent, err
   }
   agent.Start(context.Background())
   agent.Start(conf.Ctx)
   //<- agent.readyCh
   go func() {
      agent.ShutdownCh()
   }()
   time.Sleep(time.Second)
   logger.Info("Stats:", agent.Agent.Serf().Stats())
   logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
@@ -371,6 +407,7 @@
      node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port))
      node.IsAlive = int(mb.Status)
      node.ClusterID = mb.Tags[tagKeyClusterID]
      node.Role = mb.Tags["role"]
      nodes = append(nodes, node)
   }