/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package syncdb import ( "context" "encoding/json" "errors" "fmt" "github.com/hashicorp/memberlist" "net" "strconv" "time" "basic.com/valib/serf.git/cmd/serf/command/agent" "basic.com/valib/serf.git/serf" //"github.com/apache/servicecomb-service-center/pkg/log" "basic.com/valib/logger.git" ) // Agent warps the serf agent type Agent struct { *agent.Agent ipc *agent.AgentIPC conf *Config readyCh chan struct{} errorCh chan error handleEv HandleEventFunc } //用户自定义事件处理 type HandleEventFunc func(event serf.Event) type NodeInfo struct { ClusterID string `json:"clusterID"` NodeID string `json:"nodeID"` NodeAddress string `json:"nodeAddress"` IsAlive int `json:"isAlive"` } // Create create serf agent with config func Create(conf *Config, snapshotPath string) (*Agent, error) { // config cover to serf config serfConf, err := conf.convertToSerf(snapshotPath) if err != nil { return nil, err } // create serf agent with serf config logger.Info("conf.Config.EncryptKey:", conf.EncryptKey) serfAgent, err := agent.Create(conf.Config, serfConf, logger.GetLogFile()) if err != nil { return nil, err } // Create the keyring keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey)) if err != nil { logger.Error("Failed to restore keyring: %s", err) return nil, err } serfConf.MemberlistConfig.Keyring = keyring logger.Info("[INFO] agent: Restored keyring with %d keys from %s", len(conf.EncryptKey), conf.EncryptKey) return &Agent{ Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), errorCh: make(chan error), }, nil } func (a *Agent) RegisterHandleEventFunc(f HandleEventFunc) { if f != nil { a.handleEv = f } } // Start agent and IPC func (a *Agent) Start(ctx context.Context) { a.RegisterEventHandler(a) err := a.Agent.Start() if err != nil { logger.Error(err, "start serf agent failed") a.errorCh <- err return } ipc := a.startIPC() a.ipc = ipc go func() { for { select { case <-ctx.Done(): a.Agent.Shutdown() a.ipc.Shutdown() return default: time.Sleep(1*time.Second) } } }() err = a.retryJoin(ctx) if err != nil { logger.Error(err, "start serf agent failed") if err != ctx.Err() && a.errorCh != nil { a.errorCh <- err } } go a.BroadcastMemberlist(BroadcastInterval * time.Second) } func (a *Agent) startIPC() *agent.AgentIPC { // Parse the bind address information bindIP, bindPort, err := a.conf.AddrParts(a.conf.BindAddr) bindAddr := &net.TCPAddr{IP: net.ParseIP(bindIP), Port: bindPort} // Setup the RPC listener rpcListener, err := net.Listen("tcp", a.conf.RPCAddr) if err != nil { logger.Error("Error starting RPC listener:", err) return nil } ipc := agent.NewAgentIPC(a.Agent, a.conf.RPCAuthKey, rpcListener, logger.GetLogFile(), nil) logger.Debug("RPC addr:", a.conf.RPCAddr) logger.Debug("Snapshot:", a.conf.SnapshotPath) logger.Debug("Profile:", a.conf.Profile) logger.Debug("Message Compression Enabled:", a.conf.EnableCompression) logger.Debug("bindAddr:", bindAddr) return ipc } // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", // used for logical grouping of serf nodes func (a *Agent) HandleEvent(event serf.Event) { if a.handleEv != nil { a.handleEv(event) } } func (a *Agent) BroadcastMemberlist(delay time.Duration) { //serf := a.serf serf := a.Agent.Serf() mb := serf.LocalMember() mblist := serf.Memberlist() logger.Info("mb:", mb) // copy local node localNode := *mblist.LocalNode() nodeID := a.conf.NodeName nodeAddress := localNode.Address() clusterID := mb.Tags[tagKeyClusterID] isAlive := int(mb.Status) message, _ := json.Marshal(NodeInfo{ clusterID, nodeID, nodeAddress, isAlive, }) // replace node address localNode.Addr = net.ParseIP(BroadcastIP) //localNode.Addr = net.IPv4(255,255,255,255) localNode.Port = BroadcastPort for { // logger.Info("localNode: %v %v\n", nodeName, nodeAddress) mblist.SendBestEffort(&localNode, []byte(message)) time.Sleep(delay) } } // Ready Returns a channel that will be closed when serf is ready func (a *Agent) Ready() <-chan struct{} { return a.readyCh } // Error Returns a channel that will be transmit a serf error func (a *Agent) Error() <-chan error { return a.errorCh } // Stop serf agent func (a *Agent) Stop() { if a.errorCh != nil { logger.Info("a.Shutdown()", a.Leave()) logger.Info("a.Shutdown()", a.Shutdown()) close(a.errorCh) a.errorCh = nil } } // LocalMember returns the Member information for the local node func (a *Agent) LocalMember() *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { member := serfAgent.LocalMember() return &member } return nil } // GroupMembers returns a point-in-time snapshot of the members of by clusterID func (a *Agent) GroupMembers(clusterID string) (members []serf.Member) { serfAgent := a.Agent.Serf() if serfAgent != nil { for _, member := range serfAgent.Members() { logger.Info("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID]) if member.Tags[tagKeyClusterID] == clusterID { members = append(members, member) } } } return } // Member get member information with node func (a *Agent) Member(node string) *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { ms := serfAgent.Members() for _, m := range ms { if m.Name == node { return &m } } } return nil } // SerfConfig get serf config func (a *Agent) SerfConfig() *serf.Config { return a.Agent.SerfConfig() } // Join serf clusters through one or more members func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { return a.Agent.Join(addrs, replay) } // UserEvent sends a UserEvent on Serf func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { return a.Agent.UserEvent(name, payload, coalesce) } // Query sends a Query on Serf func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { return a.Agent.Query(name, payload, params) } func (a *Agent) retryJoin(ctx context.Context) (err error) { if len(a.conf.RetryJoin) == 0 { logger.Error("retry join mumber %d", len(a.conf.RetryJoin)) return nil } // Count of attempts attempt := 0 ticker := time.NewTicker(a.conf.RetryInterval) for { logger.Info("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) var n int // Try to join the specified serf nodes n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) if err == nil { logger.Error("serf: Join completed. Synced with %d initial agents", n) break } attempt++ // If RetryMaxAttempts is greater than 0, agent will exit // and throw an error when the number of attempts exceeds RetryMaxAttempts, // else agent will try to join other nodes until successful always if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { err = errors.New("serf: maximum retry join attempts made, exiting") logger.Error(err, err.Error()) break } select { case <-ctx.Done(): err = ctx.Err() goto done // Waiting for ticker to trigger case <-ticker.C: } } done: ticker.Stop() return } //Init serf Init func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, c *Config) (*Agent, error) { agent, err := InitNode(clusterID, password, nodeID, snapshotPath, c) if err != nil { logger.Error("InitNode failed, error: %s", err) return agent, err } err = agent.JoinByNodeAddrs(addrs) if err != nil { logger.Error("JoinByNodeIP failed, error: %s", err) return agent, err } return agent, err } //InitNode web后台收到创建集群的请求, func InitNode(clusterID string, password string, nodeID string, snapshotPath string, c *Config) (*Agent, error) { conf := DefaultConfig() conf.MergeConf(c) logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID) conf.ClusterID = clusterID conf.NodeName = nodeID if password == "" { conf.EncryptKey = DefaultEncryptKey } else { if len(password) >= 16 { password = password[:16] } else { password = fmt.Sprintf("%016s", password)[:16] //return nil, fmt.Errorf("error password") } conf.EncryptKey = password } agent, err := Create(conf, snapshotPath) if err != nil { logger.Error("create agent failed, error: %s", err) return agent, err } agent.Start(conf.Ctx) //<- agent.readyCh time.Sleep(time.Second) logger.Info("Stats:", agent.Agent.Serf().Stats()) logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled()) logger.Info("create agent sucess!!") return agent, nil } func (a *Agent) JoinByNodeAddrs(addrs []string) error { var nodes []string if len(addrs) == 0 { return fmt.Errorf("No Nodes To Join!") } for _, addr := range addrs { nodes = append(nodes, addr) } a.Agent.Join(nodes, true) return nil } //func (a *Agent) JoinByNodeIP(ips []string) error { // var nodes []string // // if len(ips) == 0 { // return fmt.Errorf("No Nodes To Join!") // } // for _, ip := range ips { // node := fmt.Sprintf("%s:%d", ip, DefaultBindPort) // nodes = append(nodes, node) // } // // n, err := a.Agent.Join(nodes, true) // if err != nil || n == 0 { // return fmt.Errorf("Error Encrypt Key!") // } // // return err //} func (a *Agent) GetNodes() (nodes []NodeInfo) { var node NodeInfo logger.Info("a.conf.ClusterID:", a.conf.ClusterID) mbs := a.GroupMembers(a.conf.ClusterID) for _, mb := range mbs { node.NodeID = mb.Name node.NodeAddress = mb.Addr.String() + ":" + strconv.Itoa(int(mb.Port)) node.IsAlive = int(mb.Status) node.ClusterID = mb.Tags[tagKeyClusterID] nodes = append(nodes, node) } return nodes }