| | |
| | | // Agent wraps the serf agent |
| | | type Agent struct { |
| | | *agent.Agent |
| | | ipc *agent.AgentIPC |
| | | conf *Config |
| | | readyCh chan struct{} |
| | | errorCh chan error |
| | |
| | | } |
| | | } |
| | | |
| | | // Start agent |
| | | // Start agent and IPC |
| | | func (a *Agent) Start(ctx context.Context) { |
| | | a.RegisterEventHandler(a) |
| | | err := a.Agent.Start() |
| | | if err != nil { |
| | | logger.Error(err, "start serf agent failed") |
| | | a.errorCh <- err |
| | | return |
| | | } |
| | | a.RegisterEventHandler(a) |
| | | |
| | | ipc := a.startIPC() |
| | | a.ipc = ipc |
| | | go func() { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | a.Agent.Shutdown() |
| | | a.ipc.Shutdown() |
| | | return |
| | | default: |
| | | time.Sleep(1*time.Second) |
| | | } |
| | | } |
| | | }() |
| | | |
| | | err = a.retryJoin(ctx) |
| | | if err != nil { |
| | |
| | | go a.BroadcastMemberlist(BroadcastInterval * time.Second) |
| | | } |
| | | |
| | | func (a *Agent) startIPC() *agent.AgentIPC { |
| | | // Parse the bind address information |
| | | bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr) |
| | | bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort} |
| | | |
| | | // Setup the RPC listener |
| | | rpcListener, err := net.Listen("tcp", a.conf.RPCAddr) |
| | | if err != nil { |
| | | logger.Error("Error starting RPC listener:", err) |
| | | return nil |
| | | } |
| | | ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil) |
| | | logger.Debug("RPC addr:", a.conf.RPCAddr) |
| | | logger.Debug("Snapshot:", a.conf.SnapshotPath) |
| | | logger.Debug("Profile:", a.conf.Profile) |
| | | logger.Debug("Message Compression Enabled:", a.conf.EnableCompression) |
| | | logger.Debug("bindAddr:", bindAddr) |
| | | return ipc |
| | | } |
| | | // HandleEvent Handles serf.EventMemberJoin events, |
| | | // which will wait for members to join until the number of group members is equal to "groupExpect" |
| | | // when the startup mode is "ModeCluster", |
| | |
| | | } |
| | | |
| | | //Init serf Init |
| | | func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, hef HandleEventFunc) (*Agent, error) { |
| | | agent, err := InitNode(clusterID, password, nodeID, snapshotPath, hef) |
| | | func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) { |
| | | agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c) |
| | | if err != nil { |
| | | logger.Error("InitNode failed, error: %s", err) |
| | | return agent, err |
| | |
| | | } |
| | | |
| | | //InitNode the web backend has received a request to create a cluster, |
| | | func InitNode(clusterID string, password string, nodeID string, snapshotPath string, hef HandleEventFunc) (*Agent, error) { |
| | | func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) { |
| | | conf := DefaultConfig() |
| | | conf.MergeConf(c) |
| | | |
| | | logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) |
| | | conf.ClusterID = clusterID |
| | | conf.NodeName = nodeID |
| | |
| | | return agent, err |
| | | } |
| | | |
| | | agent.Start(context.Background()) |
| | | agent.Start(conf.Ctx) |
| | | //<- agent.readyCh |
| | | go func() { |
| | | agent.ShutdownCh() |
| | | }() |
| | | |
| | | time.Sleep(time.Second) |
| | | logger.Info("Stats:", agent.Agent.Serf().Stats()) |
| | | logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) |