/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package serf import ( "context" "errors" "time" "github.com/apache/servicecomb-service-center/pkg/log" "github.com/hashicorp/serf/cmd/serf/command/agent" "github.com/hashicorp/serf/serf" ) // Agent warps the serf agent type Agent struct { *agent.Agent conf *Config readyCh chan struct{} errorCh chan error } // Create create serf agent with config func Create(conf *Config) (*Agent, error) { // config cover to serf config serfConf, err := conf.convertToSerf() if err != nil { return nil, err } // create serf agent with serf config serfAgent, err := agent.Create(conf.Config, serfConf, nil) if err != nil { return nil, err } return &Agent{ Agent: serfAgent, conf: conf, readyCh: make(chan struct{}), errorCh: make(chan error), }, nil } // Start agent func (a *Agent) Start(ctx context.Context) { err := a.Agent.Start() if err != nil { log.Errorf(err, "start serf agent failed") a.errorCh <- err return } a.RegisterEventHandler(a) err = a.retryJoin(ctx) if err != nil { log.Errorf(err, "start serf agent failed") if err != ctx.Err() && a.errorCh != nil { a.errorCh <- err } } } // HandleEvent Handles serf.EventMemberJoin events, // which will wait for members to join until the number of group members is equal to "groupExpect" // when the startup mode is "ModeCluster", // used for logical grouping of serf nodes func (a *Agent) HandleEvent(event serf.Event) { if event.EventType() != serf.EventMemberJoin { return } if a.conf.Mode == ModeCluster { if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect { return } } a.DeregisterEventHandler(a) close(a.readyCh) } // Ready Returns a channel that will be closed when serf is ready func (a *Agent) Ready() <-chan struct{} { return a.readyCh } // Error Returns a channel that will be transmit a serf error func (a *Agent) Error() <-chan error { return a.errorCh } // Stop serf agent func (a *Agent) Stop() { if a.errorCh != nil { a.Leave() a.Shutdown() close(a.errorCh) a.errorCh = nil } } // LocalMember returns the Member information for the local node func (a *Agent) LocalMember() *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { member := serfAgent.LocalMember() return &member } return nil } // GroupMembers returns a point-in-time snapshot of the members of by groupName func (a *Agent) GroupMembers(groupName string) (members []serf.Member) { serfAgent := a.Agent.Serf() if serfAgent != nil { for _, member := range serfAgent.Members() { log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName]) if member.Tags[tagKeyClusterName] == groupName { members = append(members, member) } } } return } // Member get member information with node func (a *Agent) Member(node string) *serf.Member { serfAgent := a.Agent.Serf() if serfAgent != nil { ms := serfAgent.Members() for _, m := range ms { if m.Name == node { return &m } } } return nil } // SerfConfig get serf config func (a *Agent) SerfConfig() *serf.Config { return a.Agent.SerfConfig() } // Join serf clusters through one or more members func (a *Agent) Join(addrs []string, replay bool) (n int, err error) { return a.Agent.Join(addrs, replay) } // UserEvent sends a UserEvent on Serf func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error { return a.Agent.UserEvent(name, payload, coalesce) } // Query sends a Query on Serf func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) { return a.Agent.Query(name, payload, params) } func (a *Agent) retryJoin(ctx context.Context) (err error) { if len(a.conf.RetryJoin) == 0 { log.Infof("retry join mumber %d", len(a.conf.RetryJoin)) return nil } // Count of attempts attempt := 0 ticker := time.NewTicker(a.conf.RetryInterval) for { log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin) var n int // Try to join the specified serf nodes n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin) if err == nil { log.Infof("serf: Join completed. Synced with %d initial agents", n) break } attempt++ // If RetryMaxAttempts is greater than 0, agent will exit // and throw an error when the number of attempts exceeds RetryMaxAttempts, // else agent will try to join other nodes until successful always if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts { err = errors.New("serf: maximum retry join attempts made, exiting") log.Errorf(err, err.Error()) break } select { case <-ctx.Done(): err = ctx.Err() goto done // Waiting for ticker to trigger case <-ticker.C: } } done: ticker.Stop() return }