/*
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
* contributor license agreements. See the NOTICE file distributed with
|
* this work for additional information regarding copyright ownership.
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
* (the "License"); you may not use this file except in compliance with
|
* the License. You may obtain a copy of the License at
|
*
|
* http://www.apache.org/licenses/LICENSE-2.0
|
*
|
* Unless required by applicable law or agreed to in writing, software
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* See the License for the specific language governing permissions and
|
* limitations under the License.
|
*/
|
package serf
|
|
import (
|
"context"
|
"errors"
|
"time"
|
|
"github.com/apache/servicecomb-service-center/pkg/log"
|
"github.com/hashicorp/serf/cmd/serf/command/agent"
|
"github.com/hashicorp/serf/serf"
|
)
|
|
// Agent warps the serf agent
|
type Agent struct {
|
*agent.Agent
|
conf *Config
|
readyCh chan struct{}
|
errorCh chan error
|
}
|
|
// Create create serf agent with config
|
func Create(conf *Config) (*Agent, error) {
|
// config cover to serf config
|
serfConf, err := conf.convertToSerf()
|
if err != nil {
|
return nil, err
|
}
|
|
// create serf agent with serf config
|
serfAgent, err := agent.Create(conf.Config, serfConf, nil)
|
if err != nil {
|
return nil, err
|
}
|
return &Agent{
|
Agent: serfAgent,
|
conf: conf,
|
readyCh: make(chan struct{}),
|
errorCh: make(chan error),
|
}, nil
|
}
|
|
// Start agent
|
func (a *Agent) Start(ctx context.Context) {
|
err := a.Agent.Start()
|
if err != nil {
|
log.Errorf(err, "start serf agent failed")
|
a.errorCh <- err
|
return
|
}
|
a.RegisterEventHandler(a)
|
|
err = a.retryJoin(ctx)
|
if err != nil {
|
log.Errorf(err, "start serf agent failed")
|
if err != ctx.Err() && a.errorCh != nil {
|
a.errorCh <- err
|
}
|
}
|
}
|
|
// HandleEvent Handles serf.EventMemberJoin events,
|
// which will wait for members to join until the number of group members is equal to "groupExpect"
|
// when the startup mode is "ModeCluster",
|
// used for logical grouping of serf nodes
|
func (a *Agent) HandleEvent(event serf.Event) {
|
if event.EventType() != serf.EventMemberJoin {
|
return
|
}
|
|
if a.conf.Mode == ModeCluster {
|
if len(a.GroupMembers(a.conf.ClusterName)) < groupExpect {
|
return
|
}
|
}
|
a.DeregisterEventHandler(a)
|
close(a.readyCh)
|
}
|
|
// Ready Returns a channel that will be closed when serf is ready
|
func (a *Agent) Ready() <-chan struct{} {
|
return a.readyCh
|
}
|
|
// Error Returns a channel that will be transmit a serf error
|
func (a *Agent) Error() <-chan error {
|
return a.errorCh
|
}
|
|
// Stop serf agent
|
func (a *Agent) Stop() {
|
if a.errorCh != nil {
|
a.Leave()
|
a.Shutdown()
|
close(a.errorCh)
|
a.errorCh = nil
|
}
|
}
|
|
// LocalMember returns the Member information for the local node
|
func (a *Agent) LocalMember() *serf.Member {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
member := serfAgent.LocalMember()
|
return &member
|
}
|
return nil
|
}
|
|
// GroupMembers returns a point-in-time snapshot of the members of by groupName
|
func (a *Agent) GroupMembers(groupName string) (members []serf.Member) {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
for _, member := range serfAgent.Members() {
|
log.Debugf("member = %s, groupName = %s", member.Name, member.Tags[tagKeyClusterName])
|
if member.Tags[tagKeyClusterName] == groupName {
|
members = append(members, member)
|
}
|
}
|
}
|
return
|
}
|
|
// Member get member information with node
|
func (a *Agent) Member(node string) *serf.Member {
|
serfAgent := a.Agent.Serf()
|
if serfAgent != nil {
|
ms := serfAgent.Members()
|
for _, m := range ms {
|
if m.Name == node {
|
return &m
|
}
|
}
|
}
|
return nil
|
}
|
|
// SerfConfig get serf config
|
func (a *Agent) SerfConfig() *serf.Config {
|
return a.Agent.SerfConfig()
|
}
|
|
// Join serf clusters through one or more members
|
func (a *Agent) Join(addrs []string, replay bool) (n int, err error) {
|
return a.Agent.Join(addrs, replay)
|
}
|
|
// UserEvent sends a UserEvent on Serf
|
func (a *Agent) UserEvent(name string, payload []byte, coalesce bool) error {
|
return a.Agent.UserEvent(name, payload, coalesce)
|
}
|
|
// Query sends a Query on Serf
|
func (a *Agent) Query(name string, payload []byte, params *serf.QueryParam) (*serf.QueryResponse, error) {
|
return a.Agent.Query(name, payload, params)
|
}
|
|
func (a *Agent) retryJoin(ctx context.Context) (err error) {
|
if len(a.conf.RetryJoin) == 0 {
|
log.Infof("retry join mumber %d", len(a.conf.RetryJoin))
|
return nil
|
}
|
|
// Count of attempts
|
attempt := 0
|
ticker := time.NewTicker(a.conf.RetryInterval)
|
for {
|
log.Infof("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
|
var n int
|
|
// Try to join the specified serf nodes
|
n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
|
if err == nil {
|
log.Infof("serf: Join completed. Synced with %d initial agents", n)
|
break
|
}
|
attempt++
|
|
// If RetryMaxAttempts is greater than 0, agent will exit
|
// and throw an error when the number of attempts exceeds RetryMaxAttempts,
|
// else agent will try to join other nodes until successful always
|
if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
|
err = errors.New("serf: maximum retry join attempts made, exiting")
|
log.Errorf(err, err.Error())
|
break
|
}
|
select {
|
case <-ctx.Done():
|
err = ctx.Err()
|
goto done
|
// Waiting for ticker to trigger
|
case <-ticker.C:
|
}
|
}
|
done:
|
ticker.Stop()
|
return
|
}
|