基于serf的数据库同步模块库
liuxiaolong
2022-06-28 15ba78e620fd0c1e7f505cebd4032979676c7eb2
agent.go
@@ -36,6 +36,7 @@
// Agent warps the serf agent
type Agent struct {
   *agent.Agent
   ipc *agent.AgentIPC
   conf        *Config
   readyCh     chan struct{}
   errorCh     chan error
@@ -91,15 +92,30 @@
   }
}
// Start agent
// Start agent and IPC
func (a *Agent) Start(ctx context.Context) {
   a.RegisterEventHandler(a)
   err := a.Agent.Start()
   if err != nil {
      logger.Error(err, "start serf agent failed")
      a.errorCh <- err
      return
   }
   a.RegisterEventHandler(a)
   ipc := a.startIPC()
   a.ipc = ipc
   go func() {
      for {
         select {
         case <-ctx.Done():
            a.Agent.Shutdown()
            a.ipc.Shutdown()
            return
         default:
            time.Sleep(1*time.Second)
         }
      }
   }()
   err = a.retryJoin(ctx)
   if err != nil {
@@ -112,6 +128,25 @@
   go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
func (a *Agent) startIPC() *agent.AgentIPC {
   // Parse the bind address information
   bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr)
   bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort}
   // Setup the RPC listener
   rpcListener, err := net.Listen("tcp", a.conf.RPCAddr)
   if err != nil {
      logger.Error("Error starting RPC listener:", err)
      return nil
   }
   ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil)
   logger.Debug("RPC addr:", a.conf.RPCAddr)
   logger.Debug("Snapshot:", a.conf.SnapshotPath)
   logger.Debug("Profile:", a.conf.Profile)
   logger.Debug("Message Compression Enabled:", a.conf.EnableCompression)
   logger.Debug("bindAddr:", bindAddr)
   return ipc
}
// HandleEvent Handles serf.EventMemberJoin events,
// which will wait for members to join until the number of group members is equal to "groupExpect"
// when the startup mode is "ModeCluster",
@@ -276,8 +311,8 @@
}
//Init serf Init
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
   agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) {
   agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c)
   if err != nil {
      logger.Error("InitNode failed, error: %s", err)
      return agent, err
@@ -293,8 +328,10 @@
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) {
   conf := DefaultConfig()
   conf.MergeConf(c)
   logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
   conf.ClusterID = clusterID
   conf.NodeName = nodeID
@@ -315,11 +352,9 @@
      return agent, err
   }
   agent.Start(context.Background())
   agent.Start(conf.Ctx)
   //<- agent.readyCh
   go func() {
      agent.ShutdownCh()
   }()
   time.Sleep(time.Second)
   logger.Info("Stats:", agent.Agent.Serf().Stats())
   logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())