package serf
|
|
import (
|
"bytes"
|
"encoding/base64"
|
"encoding/json"
|
"errors"
|
"fmt"
|
"io/ioutil"
|
"log"
|
"math/rand"
|
"net"
|
"os"
|
"strconv"
|
"sync"
|
"sync/atomic"
|
"time"
|
|
"github.com/armon/go-metrics"
|
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/memberlist"
|
"basic.com/valib/serf.git/coordinate"
|
)
|
|
// These are the protocol versions that Serf can _understand_. These are
// Serf-level protocol versions that are passed down as the delegate
// version to memberlist below.
const (
	// ProtocolVersionMin is the oldest Serf protocol version this node
	// will accept from a peer.
	ProtocolVersionMin uint8 = 2

	// ProtocolVersionMax is the newest Serf protocol version this node
	// speaks. Left untyped so it assigns to uint8 fields without conversion.
	ProtocolVersionMax = 5
)
|
|
const (
	// tagMagicByte is used to detect if the meta data is tags
	// or if it is a raw role: meta data beginning with this byte is
	// treated as an encoded tag map.
	tagMagicByte uint8 = 255
)
|
|
var (
	// FeatureNotSupported is returned if a feature cannot be used
	// due to an older protocol version being used.
	FeatureNotSupported = fmt.Errorf("Feature not supported")
)
|
|
// init seeds the package-level math/rand generator so that query IDs and
// other randomized values differ across process restarts.
func init() {
	// Seed the random number generator
	rand.Seed(time.Now().UnixNano())
}
|
|
// Serf is a single node that is part of a single cluster that gets
// events about joins/leaves/failures/etc. It is created with the Create
// method.
//
// All functions on the Serf structure are safe to call concurrently.
type Serf struct {
	// The clocks for different purposes. These MUST be the first things
	// in this struct due to Golang issue #599 (64-bit alignment of
	// atomically-accessed fields on 32-bit platforms).
	clock      LamportClock // cluster membership (join/leave) ordering
	eventClock LamportClock // user event ordering
	queryClock LamportClock // query ordering

	broadcasts    *memberlist.TransmitLimitedQueue // membership intent broadcasts
	config        *Config
	failedMembers []*memberState // members awaiting reconnect/reap
	leftMembers   []*memberState // members that left, awaiting reap
	memberlist    *memberlist.Memberlist
	memberLock    sync.RWMutex // guards members/failedMembers/leftMembers/recentIntents
	members       map[string]*memberState

	// recentIntents the lamport time and type of intent for a given node in
	// case we get an intent before the relevant memberlist event. This is
	// indexed by node, and always store the latest lamport time / intent
	// we've seen. The memberLock protects this structure.
	recentIntents map[string]nodeIntent

	eventBroadcasts *memberlist.TransmitLimitedQueue
	eventBuffer     []*userEvents // ring buffer of recent user events, indexed by LTime
	eventJoinIgnore atomic.Value  // bool: drop stale user events during Join(ignoreOld=true)
	eventMinTime    LamportTime   // events older than this are ignored
	eventLock       sync.RWMutex

	queryBroadcasts *memberlist.TransmitLimitedQueue
	queryBuffer     []*queries  // ring buffer of recent query IDs, indexed by LTime
	queryMinTime    LamportTime // queries older than this are ignored
	queryResponse   map[LamportTime]*QueryResponse
	queryLock       sync.RWMutex

	logger     *log.Logger
	joinLock   sync.Mutex // serializes Join; makes eventJoinIgnore safe
	stateLock  sync.Mutex // guards state transitions
	state      SerfState
	shutdownCh chan struct{} // closed on Shutdown to stop background tasks

	snapshotter *Snapshotter
	keyManager  *KeyManager

	coordClient    *coordinate.Client
	coordCache     map[string]*coordinate.Coordinate
	coordCacheLock sync.RWMutex
}
|
|
// SerfState is the state of the Serf instance.
type SerfState int

const (
	SerfAlive SerfState = iota
	SerfLeaving
	SerfLeft
	SerfShutdown
)

// String returns a human-readable name for the state, or "unknown" for
// any value outside the defined range.
func (s SerfState) String() string {
	// Table lookup keyed by the constant values themselves.
	names := [...]string{
		SerfAlive:    "alive",
		SerfLeaving:  "leaving",
		SerfLeft:     "left",
		SerfShutdown: "shutdown",
	}
	if s < 0 || int(s) >= len(names) {
		return "unknown"
	}
	return names[s]
}
|
|
// Member is a single member of the Serf cluster.
type Member struct {
	Name   string            // unique node name within the cluster
	Addr   net.IP            // advertised address
	Port   uint16            // advertised port
	Tags   map[string]string // user-supplied metadata, gossiped to all peers
	Status MemberStatus      // current membership state as seen by this node

	// The minimum, maximum, and current values of the protocol versions
	// and delegate (Serf) protocol versions that each member can understand
	// or is speaking.
	ProtocolMin uint8
	ProtocolMax uint8
	ProtocolCur uint8
	DelegateMin uint8
	DelegateMax uint8
	DelegateCur uint8
}
|
|
// MemberStatus is the state that a member is in.
type MemberStatus int

const (
	StatusNone MemberStatus = iota
	StatusAlive
	StatusLeaving
	StatusLeft
	StatusFailed
)

// String returns a human-readable name for the status. It panics on a
// value outside the defined range, matching the original contract.
func (s MemberStatus) String() string {
	// Table lookup keyed by the constant values themselves.
	names := [...]string{
		StatusNone:    "none",
		StatusAlive:   "alive",
		StatusLeaving: "leaving",
		StatusLeft:    "left",
		StatusFailed:  "failed",
	}
	if s < 0 || int(s) >= len(names) {
		panic(fmt.Sprintf("unknown MemberStatus: %d", s))
	}
	return names[s]
}
|
|
// memberState is used to track members that are no longer active due to
// leaving, failing, partitioning, etc. It tracks the member along with
// when that member was marked as leaving.
type memberState struct {
	Member                  // embedded public view of the member
	statusLTime LamportTime // lamport clock time of last received message
	leaveTime   time.Time   // wall clock time of leave
}
|
|
// nodeIntent is used to buffer intents for out-of-order deliveries.
type nodeIntent struct {
	// Type is the intent being tracked. Only messageJoinType and
	// messageLeaveType are tracked.
	Type messageType

	// WallTime is the wall clock time we saw this intent in order to
	// expire it from the buffer.
	WallTime time.Time

	// LTime is the Lamport time, used for cluster-wide ordering of events.
	LTime LamportTime
}
|
|
// userEvent is used to buffer events to prevent re-delivery
type userEvent struct {
	Name    string // user-supplied event name
	Payload []byte // opaque event payload
}

// Equals reports whether two user events have the same name and an
// identical payload. A nil payload and an empty payload compare equal.
func (ue *userEvent) Equals(other *userEvent) bool {
	// bytes.Equal is the idiomatic equality check; the previous
	// bytes.Compare(...) != 0 form did the same work less clearly.
	return ue.Name == other.Name && bytes.Equal(ue.Payload, other.Payload)
}
|
|
// userEvents stores all the user events at a specific time
type userEvents struct {
	LTime  LamportTime // the lamport time shared by every event in Events
	Events []userEvent // events observed at that time, used for dedup
}
|
|
// queries stores all the query ids at a specific time
type queries struct {
	LTime    LamportTime // the lamport time shared by every query ID below
	QueryIDs []uint32    // IDs observed at that time, used for dedup
}
|
|
const (
	snapshotSizeLimit = 128 * 1024 // Maximum 128 KB snapshot

	// UserEventSizeLimit caps the combined size of a user event's name and
	// payload. NOTE: the value is 9*10*1024 = 90 KB (the previous comment
	// said 9KB, which did not match the expression).
	UserEventSizeLimit = 9 * 10 * 1024 // Maximum 90 KB for event name and payload
)
|
|
// Create creates a new Serf instance, starting all the background tasks
// to maintain cluster membership information.
//
// After calling this function, the configuration should no longer be used
// or modified by the caller: Create rewires conf.EventCh through the
// coalescers/query handler/snapshotter and mutates conf.MemberlistConfig.
//
// It returns an error (and no Serf) if the protocol version is out of
// range, the tags or event size limits are exceeded, or any of the
// subsystems (query handler, coordinates, snapshotter, memberlist) fail
// to initialize.
func Create(conf *Config) (*Serf, error) {
	conf.Init()
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	// The per-instance limit may be tuned down, but never above the
	// package-wide hard cap.
	if conf.UserEventSizeLimit > UserEventSizeLimit {
		return nil, fmt.Errorf("user event size limit exceeds limit of %d bytes", UserEventSizeLimit)
	}

	// Build a logger if the caller did not supply one, falling back to
	// conf.LogOutput and finally stderr.
	logger := conf.Logger
	if logger == nil {
		logOutput := conf.LogOutput
		if logOutput == nil {
			logOutput = os.Stderr
		}
		logger = log.New(logOutput, "", log.LstdFlags)
	}

	serf := &Serf{
		config:        conf,
		logger:        logger,
		members:       make(map[string]*memberState),
		queryResponse: make(map[LamportTime]*QueryResponse),
		shutdownCh:    make(chan struct{}),
		state:         SerfAlive,
	}
	serf.eventJoinIgnore.Store(false)

	// Check that the meta data length is okay
	if len(serf.encodeTags(conf.Tags)) > memberlist.MetaMaxSize {
		return nil, fmt.Errorf("Encoded length of tags exceeds limit of %d bytes", memberlist.MetaMaxSize)
	}

	// Check if serf member event coalescing is enabled
	if conf.CoalescePeriod > 0 && conf.QuiescentPeriod > 0 && conf.EventCh != nil {
		c := &memberEventCoalescer{
			lastEvents:   make(map[string]EventType),
			latestEvents: make(map[string]coalesceEvent),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.CoalescePeriod, conf.QuiescentPeriod, c)
	}

	// Check if user event coalescing is enabled
	if conf.UserCoalescePeriod > 0 && conf.UserQuiescentPeriod > 0 && conf.EventCh != nil {
		c := &userEventCoalescer{
			events: make(map[string]*latestUserEvents),
		}

		conf.EventCh = coalescedEventCh(conf.EventCh, serf.shutdownCh,
			conf.UserCoalescePeriod, conf.UserQuiescentPeriod, c)
	}

	// Listen for internal Serf queries. This is setup before the snapshotter, since
	// we want to capture the query-time, but the internal listener does not passthrough
	// the queries
	outCh, err := newSerfQueries(serf, serf.logger, conf.EventCh, serf.shutdownCh)
	if err != nil {
		return nil, fmt.Errorf("Failed to setup serf query handler: %v", err)
	}
	conf.EventCh = outCh

	// Set up network coordinate client.
	if !conf.DisableCoordinates {
		serf.coordClient, err = coordinate.NewClient(coordinate.DefaultConfig())
		if err != nil {
			return nil, fmt.Errorf("Failed to create coordinate client: %v", err)
		}
	}

	// Try access the snapshot. On success the snapshotter becomes the head
	// of the event pipeline and supplies the previously-known clocks and
	// alive nodes for restart/rejoin.
	var oldClock, oldEventClock, oldQueryClock LamportTime
	var prev []*PreviousNode
	if conf.SnapshotPath != "" {
		eventCh, snap, err := NewSnapshotter(
			conf.SnapshotPath,
			snapshotSizeLimit,
			conf.RejoinAfterLeave,
			serf.logger,
			&serf.clock,
			conf.EventCh,
			serf.shutdownCh)
		if err != nil {
			return nil, fmt.Errorf("Failed to setup snapshot: %v", err)
		}
		serf.snapshotter = snap
		conf.EventCh = eventCh
		prev = snap.AliveNodes()
		oldClock = snap.LastClock()
		oldEventClock = snap.LastEventClock()
		oldQueryClock = snap.LastQueryClock()
		// Skip anything we already delivered before the restart.
		serf.eventMinTime = oldEventClock + 1
		serf.queryMinTime = oldQueryClock + 1
	}

	// Set up the coordinate cache. We do this after we read the snapshot to
	// make sure we get a good initial value from there, if we got one.
	if !conf.DisableCoordinates {
		serf.coordCache = make(map[string]*coordinate.Coordinate)
		serf.coordCache[conf.NodeName] = serf.coordClient.GetCoordinate()
	}

	// Setup the various broadcast queues, which we use to send our own
	// custom broadcasts along the gossip channel.
	serf.broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes:       serf.NumNodes,
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.eventBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes:       serf.NumNodes,
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}
	serf.queryBroadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes:       serf.NumNodes,
		RetransmitMult: conf.MemberlistConfig.RetransmitMult,
	}

	// Create the buffer for recent intents
	serf.recentIntents = make(map[string]nodeIntent)

	// Create a buffer for events and queries
	serf.eventBuffer = make([]*userEvents, conf.EventBuffer)
	serf.queryBuffer = make([]*queries, conf.QueryBuffer)

	// Ensure our lamport clock is at least 1, so that the default
	// join LTime of 0 does not cause issues
	serf.clock.Increment()
	serf.eventClock.Increment()
	serf.queryClock.Increment()

	// Restore the clock from snap if we have one
	serf.clock.Witness(oldClock)
	serf.eventClock.Witness(oldEventClock)
	serf.queryClock.Witness(oldQueryClock)

	// Modify the memberlist configuration with keys that we set
	conf.MemberlistConfig.Events = &eventDelegate{serf: serf}
	conf.MemberlistConfig.Conflict = &conflictDelegate{serf: serf}
	conf.MemberlistConfig.Delegate = &delegate{serf: serf}
	conf.MemberlistConfig.DelegateProtocolVersion = conf.ProtocolVersion
	conf.MemberlistConfig.DelegateProtocolMin = ProtocolVersionMin
	conf.MemberlistConfig.DelegateProtocolMax = ProtocolVersionMax
	conf.MemberlistConfig.Name = conf.NodeName
	conf.MemberlistConfig.ProtocolVersion = ProtocolVersionMap[conf.ProtocolVersion]
	if !conf.DisableCoordinates {
		conf.MemberlistConfig.Ping = &pingDelegate{serf: serf}
	}

	// Setup a merge delegate if necessary
	if conf.Merge != nil {
		md := &mergeDelegate{serf: serf}
		conf.MemberlistConfig.Merge = md
		conf.MemberlistConfig.Alive = md
	}

	// Create the underlying memberlist that will manage membership
	// and failure detection for the Serf instance.
	memberlist, err := memberlist.Create(conf.MemberlistConfig)
	if err != nil {
		return nil, fmt.Errorf("Failed to create memberlist: %v", err)
	}

	serf.memberlist = memberlist

	// Create a key manager for handling all encryption key changes
	serf.keyManager = &KeyManager{serf: serf}

	// Start the background tasks. See the documentation above each method
	// for more information on their role.
	go serf.handleReap()
	go serf.handleReconnect()
	go serf.checkQueueDepth("Intent", serf.broadcasts)
	go serf.checkQueueDepth("Event", serf.eventBroadcasts)
	go serf.checkQueueDepth("Query", serf.queryBroadcasts)

	// Attempt to re-join the cluster if we have known nodes
	if len(prev) != 0 {
		go serf.handleRejoin(prev)
	}

	return serf, nil
}
|
|
// ProtocolVersion returns the current protocol version in use by Serf.
// This is the Serf protocol version, not the memberlist protocol version.
func (s *Serf) ProtocolVersion() uint8 {
	return s.config.ProtocolVersion
}
|
|
// EncryptionEnabled is a predicate that determines whether or not encryption
// is enabled, which can be possible in one of 2 cases:
//   - Single encryption key passed at agent start (no persistence)
//   - Keyring file provided at agent start
func (s *Serf) EncryptionEnabled() bool {
	// A non-nil keyring on the memberlist config implies gossip encryption.
	return s.config.MemberlistConfig.Keyring != nil
}
|
|
// KeyManager returns the key manager for the current Serf instance.
func (s *Serf) KeyManager() *KeyManager {
	return s.keyManager
}
|
|
// UserEvent is used to broadcast a custom user event with a given
|
// name and payload. If the configured size limit is exceeded and error will be returned.
|
// If coalesce is enabled, nodes are allowed to coalesce this event.
|
// Coalescing is only available starting in v0.2
|
func (s *Serf) UserEvent(name string, payload []byte, coalesce bool) error {
|
payloadSizeBeforeEncoding := len(name) + len(payload)
|
|
// Check size before encoding to prevent needless encoding and return early if it's over the specified limit.
|
if payloadSizeBeforeEncoding > s.config.UserEventSizeLimit {
|
return fmt.Errorf(
|
"user event exceeds configured limit of %d bytes before encoding",
|
s.config.UserEventSizeLimit,
|
)
|
}
|
|
if payloadSizeBeforeEncoding > UserEventSizeLimit {
|
return fmt.Errorf(
|
"user event exceeds sane limit of %d bytes before encoding",
|
UserEventSizeLimit,
|
)
|
}
|
|
// Create a message
|
msg := messageUserEvent{
|
LTime: s.eventClock.Time(),
|
Name: name,
|
Payload: payload,
|
CC: coalesce,
|
}
|
|
// Start broadcasting the event
|
raw, err := encodeMessage(messageUserEventType, &msg)
|
if err != nil {
|
return err
|
}
|
|
// Check the size after encoding to be sure again that
|
// we're not attempting to send over the specified size limit.
|
if len(raw) > s.config.UserEventSizeLimit {
|
return fmt.Errorf(
|
"encoded user event exceeds configured limit of %d bytes after encoding",
|
s.config.UserEventSizeLimit,
|
)
|
}
|
|
if len(raw) > UserEventSizeLimit {
|
return fmt.Errorf(
|
"encoded user event exceeds sane limit of %d bytes before encoding",
|
UserEventSizeLimit,
|
)
|
}
|
|
s.eventClock.Increment()
|
|
// Process update locally
|
s.handleUserEvent(&msg)
|
|
s.eventBroadcasts.QueueBroadcast(&broadcast{
|
msg: raw,
|
})
|
return nil
|
}
|
|
// Query is used to broadcast a new query. The query must be fairly small,
// and an error will be returned if the size limit is exceeded. This is only
// available with protocol version 4 and newer. Query parameters are optional,
// and if not provided, a sane set of defaults will be used.
//
// The returned QueryResponse is registered before the query is sent, so
// acks/responses arriving immediately are not lost; it is automatically
// closed after params.Timeout.
func (s *Serf) Query(name string, payload []byte, params *QueryParam) (*QueryResponse, error) {
	// Check that the latest protocol is in use
	if s.ProtocolVersion() < 4 {
		return nil, FeatureNotSupported
	}

	// Provide default parameters if none given
	if params == nil {
		params = s.DefaultQueryParams()
	} else if params.Timeout == 0 {
		params.Timeout = s.DefaultQueryTimeout()
	}

	// Get the local node
	local := s.memberlist.LocalNode()

	// Encode the filters
	filters, err := params.encodeFilters()
	if err != nil {
		return nil, fmt.Errorf("Failed to format filters: %v", err)
	}

	// Setup the flags
	var flags uint32
	if params.RequestAck {
		flags |= queryFlagAck
	}

	// Create a message. The random ID disambiguates queries that share a
	// lamport time.
	q := messageQuery{
		LTime:       s.queryClock.Time(),
		ID:          uint32(rand.Int31()),
		Addr:        local.Addr,
		Port:        local.Port,
		SourceNode:  local.Name,
		Filters:     filters,
		Flags:       flags,
		RelayFactor: params.RelayFactor,
		Timeout:     params.Timeout,
		Name:        name,
		Payload:     payload,
	}

	// Encode the query
	raw, err := encodeMessage(messageQueryType, &q)
	if err != nil {
		return nil, err
	}

	// Check the size
	if len(raw) > s.config.QuerySizeLimit {
		return nil, fmt.Errorf("query exceeds limit of %d bytes", s.config.QuerySizeLimit)
	}

	// Register QueryResponse to track acks and responses
	resp := newQueryResponse(s.memberlist.NumMembers(), &q)
	s.registerQueryResponse(params.Timeout, resp)

	// Process query locally
	s.handleQuery(&q)

	// Start broadcasting the event
	s.queryBroadcasts.QueueBroadcast(&broadcast{
		msg: raw,
	})
	return resp, nil
}
|
|
// registerQueryResponse is used to setup the listeners for the query,
// and to schedule closing the query after the timeout.
func (s *Serf) registerQueryResponse(timeout time.Duration, resp *QueryResponse) {
	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Map the LTime to the QueryResponse. This is necessarily 1-to-1,
	// since we increment the time for each new query.
	s.queryResponse[resp.lTime] = resp

	// Setup a timer to close the response and deregister after the timeout.
	// The callback re-acquires queryLock, so it must run outside this
	// function's critical section (AfterFunc runs it on its own goroutine).
	time.AfterFunc(timeout, func() {
		s.queryLock.Lock()
		delete(s.queryResponse, resp.lTime)
		resp.Close()
		s.queryLock.Unlock()
	})
}
|
|
// SetTags is used to dynamically update the tags associated with
// the local node. This will propagate the change to the rest of
// the cluster. Blocks until a the message is broadcast out.
//
// Returns an error if the encoded tags exceed the memberlist meta data
// limit or if the memberlist update fails to broadcast in time.
func (s *Serf) SetTags(tags map[string]string) error {
	// Check that the meta data length is okay
	if len(s.encodeTags(tags)) > memberlist.MetaMaxSize {
		return fmt.Errorf("Encoded length of tags exceeds limit of %d bytes",
			memberlist.MetaMaxSize)
	}

	// Update the config
	s.config.Tags = tags

	// Trigger a memberlist update
	return s.memberlist.UpdateNode(s.config.BroadcastTimeout)
}
|
|
// Join joins an existing Serf cluster. Returns the number of nodes
// successfully contacted. The returned error will be non-nil only in the
// case that no nodes could be contacted. If ignoreOld is true, then any
// user messages sent prior to the join will be ignored.
func (s *Serf) Join(existing []string, ignoreOld bool) (int, error) {
	// Do a quick state check
	if s.State() != SerfAlive {
		return 0, fmt.Errorf("Serf can't Join after Leave or Shutdown")
	}

	// Hold the joinLock, this is to make eventJoinIgnore safe
	s.joinLock.Lock()
	defer s.joinLock.Unlock()

	// Ignore any events from a potential join. This is safe since we hold
	// the joinLock and nobody else can be doing a Join
	if ignoreOld {
		s.eventJoinIgnore.Store(true)
		defer func() {
			s.eventJoinIgnore.Store(false)
		}()
	}

	// Have memberlist attempt to join
	num, err := s.memberlist.Join(existing)

	// If we joined any nodes, broadcast the join message
	if num > 0 {
		// Start broadcasting the update
		if err := s.broadcastJoin(s.clock.Time()); err != nil {
			return num, err
		}
	}

	// num may be > 0 even when err != nil (partial success); both are
	// surfaced to the caller.
	return num, err
}
|
|
// broadcastJoin broadcasts a new join intent with a
// given clock value. It is used on either join, or if
// we need to refute an older leave intent. Cannot be called
// with the memberLock held.
func (s *Serf) broadcastJoin(ltime LamportTime) error {
	// Construct message to update our lamport clock
	msg := messageJoin{
		LTime: ltime,
		Node:  s.config.NodeName,
	}
	s.clock.Witness(ltime)

	// Process update locally
	s.handleNodeJoinIntent(&msg)

	// Start broadcasting the update
	if err := s.broadcast(messageJoinType, &msg, nil); err != nil {
		s.logger.Printf("[WARN] serf: Failed to broadcast join intent: %v", err)
		return err
	}
	return nil
}
|
|
// Leave gracefully exits the cluster. It is safe to call this multiple
// times.
//
// Sequence: mark state SerfLeaving, notify the snapshotter, broadcast a
// leave intent (and wait for it to flush), leave memberlist, then sleep
// LeavePropagateDelay before moving to SerfLeft. Returns an error if a
// leave is already in progress, Shutdown was already called, or the
// broadcast/memberlist leave times out.
func (s *Serf) Leave() error {
	// Check the current state
	s.stateLock.Lock()
	if s.state == SerfLeft {
		s.stateLock.Unlock()
		return nil
	} else if s.state == SerfLeaving {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave already in progress")
	} else if s.state == SerfShutdown {
		s.stateLock.Unlock()
		return fmt.Errorf("Leave called after Shutdown")
	}
	s.state = SerfLeaving
	s.stateLock.Unlock()

	// If we have a snapshot, mark we are leaving
	if s.snapshotter != nil {
		s.snapshotter.Leave()
	}

	// Construct the message for the graceful leave
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  s.config.NodeName,
	}
	s.clock.Increment()

	// Process the leave locally
	s.handleNodeLeaveIntent(&msg)

	// Only broadcast the leave message if there is at least one
	// other node alive.
	if s.hasAliveMembers() {
		notifyCh := make(chan struct{})
		if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
			return err
		}

		// Wait for our leave intent to actually go out on the wire.
		select {
		case <-notifyCh:
		case <-time.After(s.config.BroadcastTimeout):
			return errors.New("timeout while waiting for graceful leave")
		}
	}

	// Attempt the memberlist leave
	err := s.memberlist.Leave(s.config.BroadcastTimeout)
	if err != nil {
		return err
	}

	// Wait for the leave to propagate through the cluster. The broadcast
	// timeout is how long we wait for the message to go out from our own
	// queue, but this wait is for that message to propagate through the
	// cluster. In particular, we want to stay up long enough to service
	// any probes from other nodes before they learn about us leaving.
	time.Sleep(s.config.LeavePropagateDelay)

	// Transition to Left only if we not already shutdown
	s.stateLock.Lock()
	if s.state != SerfShutdown {
		s.state = SerfLeft
	}
	s.stateLock.Unlock()
	return nil
}
|
|
// hasAliveMembers is called to check for any alive members other than
|
// ourself.
|
func (s *Serf) hasAliveMembers() bool {
|
s.memberLock.RLock()
|
defer s.memberLock.RUnlock()
|
|
hasAlive := false
|
for _, m := range s.members {
|
// Skip ourself, we want to know if OTHER members are alive
|
if m.Name == s.config.NodeName {
|
continue
|
}
|
|
if m.Status == StatusAlive {
|
hasAlive = true
|
break
|
}
|
}
|
return hasAlive
|
}
|
|
// LocalMember returns the Member information for the local node
//
// NOTE(review): this indexes s.members by the local node name without an
// existence check; it relies on the local node always being present in
// the member map once Serf is running.
func (s *Serf) LocalMember() Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()
	return s.members[s.config.NodeName].Member
}
|
|
// Members returns a point-in-time snapshot of the members of this cluster.
// The returned slice is a copy and is safe to retain; ordering is
// unspecified (map iteration order).
func (s *Serf) Members() []Member {
	s.memberLock.RLock()
	defer s.memberLock.RUnlock()

	members := make([]Member, 0, len(s.members))
	for _, m := range s.members {
		members = append(members, m.Member)
	}

	return members
}
|
|
// RemoveFailedNode is a backwards compatible form
// of forceleave: it force-removes the node without pruning it from the
// member list.
func (s *Serf) RemoveFailedNode(node string) error {
	return s.forceLeave(node, false)
}
|
|
// RemoveFailedNodePrune removes a failed node and also prunes it from
// the member list entirely, rather than leaving it for the reaper.
func (s *Serf) RemoveFailedNodePrune(node string) error {
	return s.forceLeave(node, true)
}
|
|
// forceLeave forcibly removes a failed node from the cluster
// immediately, instead of waiting for the reaper to eventually reclaim it.
// This also has the effect that Serf will no longer attempt to reconnect
// to this node. If prune is true the node is erased from the member list
// as well.
func (s *Serf) forceLeave(node string, prune bool) error {
	// Construct the message to broadcast
	msg := messageLeave{
		LTime: s.clock.Time(),
		Node:  node,
		Prune: prune,
	}
	s.clock.Increment()

	// Process our own event
	s.handleNodeLeaveIntent(&msg)

	// If we have no members, then we don't need to broadcast
	if !s.hasAliveMembers() {
		return nil
	}

	// Broadcast the remove
	notifyCh := make(chan struct{})
	if err := s.broadcast(messageLeaveType, &msg, notifyCh); err != nil {
		return err
	}

	// Wait for the broadcast to flush out of our queue, bounded by the
	// configured broadcast timeout.
	select {
	case <-notifyCh:
	case <-time.After(s.config.BroadcastTimeout):
		return fmt.Errorf("timed out broadcasting node removal")
	}

	return nil
}
|
|
// Shutdown forcefully shuts down the Serf instance, stopping all network
// activity and background maintenance associated with the instance.
//
// This is not a graceful shutdown, and should be preceded by a call
// to Leave. Otherwise, other nodes in the cluster will detect this node's
// exit as a node failure.
//
// It is safe to call this method multiple times.
func (s *Serf) Shutdown() error {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()

	// Idempotent: a second Shutdown is a no-op.
	if s.state == SerfShutdown {
		return nil
	}

	if s.state != SerfLeft {
		s.logger.Printf("[WARN] serf: Shutdown without a Leave")
	}

	// Wait to close the shutdown channel until after we've shut down the
	// memberlist and its associated network resources, since the shutdown
	// channel signals that we are cleaned up outside of Serf.
	s.state = SerfShutdown
	err := s.memberlist.Shutdown()
	if err != nil {
		return err
	}
	close(s.shutdownCh)

	// Wait for the snapshoter to finish if we have one
	if s.snapshotter != nil {
		s.snapshotter.Wait()
	}

	return nil
}
|
|
// ShutdownCh returns a channel that can be used to wait for
// Serf to shutdown. The channel is closed when Shutdown completes.
func (s *Serf) ShutdownCh() <-chan struct{} {
	return s.shutdownCh
}
|
|
// Memberlist is used to get access to the underlying Memberlist instance
func (s *Serf) Memberlist() *memberlist.Memberlist {
	return s.memberlist
}
|
|
// State is the current state of this Serf instance.
func (s *Serf) State() SerfState {
	s.stateLock.Lock()
	defer s.stateLock.Unlock()
	return s.state
}
|
|
// broadcast takes a Serf message type, encodes it for the wire, and queues
// the broadcast. If a notify channel is given, this channel will be closed
// when the broadcast is sent.
func (s *Serf) broadcast(t messageType, msg interface{}, notify chan<- struct{}) error {
	raw, err := encodeMessage(t, msg)
	if err != nil {
		return err
	}

	s.broadcasts.QueueBroadcast(&broadcast{
		msg:    raw,
		notify: notify,
	})
	return nil
}
|
|
// handleNodeJoin is called when a node join event is received
|
// from memberlist.
|
func (s *Serf) handleNodeJoin(n *memberlist.Node) {
|
s.memberLock.Lock()
|
defer s.memberLock.Unlock()
|
|
var oldStatus MemberStatus
|
member, ok := s.members[n.Name]
|
if !ok {
|
oldStatus = StatusNone
|
member = &memberState{
|
Member: Member{
|
Name: n.Name,
|
Addr: net.IP(n.Addr),
|
Port: n.Port,
|
Tags: s.decodeTags(n.Meta),
|
Status: StatusAlive,
|
},
|
}
|
|
// Check if we have a join or leave intent. The intent buffer
|
// will only hold one event for this node, so the more recent
|
// one will take effect.
|
if join, ok := recentIntent(s.recentIntents, n.Name, messageJoinType); ok {
|
member.statusLTime = join
|
}
|
if leave, ok := recentIntent(s.recentIntents, n.Name, messageLeaveType); ok {
|
member.Status = StatusLeaving
|
member.statusLTime = leave
|
}
|
|
s.members[n.Name] = member
|
} else {
|
oldStatus = member.Status
|
deadTime := time.Now().Sub(member.leaveTime)
|
if oldStatus == StatusFailed && deadTime < s.config.FlapTimeout {
|
metrics.IncrCounter([]string{"serf", "member", "flap"}, 1)
|
}
|
|
member.Status = StatusAlive
|
member.leaveTime = time.Time{}
|
member.Addr = net.IP(n.Addr)
|
member.Port = n.Port
|
member.Tags = s.decodeTags(n.Meta)
|
}
|
|
// Update the protocol versions every time we get an event
|
member.ProtocolMin = n.PMin
|
member.ProtocolMax = n.PMax
|
member.ProtocolCur = n.PCur
|
member.DelegateMin = n.DMin
|
member.DelegateMax = n.DMax
|
member.DelegateCur = n.DCur
|
|
// If node was previously in a failed state, then clean up some
|
// internal accounting.
|
// TODO(mitchellh): needs tests to verify not reaped
|
if oldStatus == StatusFailed || oldStatus == StatusLeft {
|
s.failedMembers = removeOldMember(s.failedMembers, member.Name)
|
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
|
}
|
|
// Update some metrics
|
metrics.IncrCounter([]string{"serf", "member", "join"}, 1)
|
|
// Send an event along
|
s.logger.Printf("[INFO] serf: EventMemberJoin: %s %s",
|
member.Member.Name, member.Member.Addr)
|
if s.config.EventCh != nil {
|
s.config.EventCh <- MemberEvent{
|
Type: EventMemberJoin,
|
Members: []Member{member.Member},
|
}
|
}
|
}
|
|
// handleNodeLeave is called when a node leave event is received
// from memberlist. A member in StatusLeaving transitions to StatusLeft
// (graceful leave); a member in StatusAlive transitions to StatusFailed.
// Other states are ignored with a warning.
func (s *Serf) handleNodeLeave(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is supposedly
		// leaving. Just ignore it completely.
		return
	}

	switch member.Status {
	case StatusLeaving:
		member.Status = StatusLeft
		member.leaveTime = time.Now()
		s.leftMembers = append(s.leftMembers, member)
	case StatusAlive:
		member.Status = StatusFailed
		member.leaveTime = time.Now()
		s.failedMembers = append(s.failedMembers, member)
	default:
		// Unknown state that it was in? Just don't do anything
		s.logger.Printf("[WARN] serf: Bad state when leave: %d", member.Status)
		return
	}

	// Send an event along: failed unless this was a graceful leave.
	event := EventMemberLeave
	eventStr := "EventMemberLeave"
	if member.Status != StatusLeft {
		event = EventMemberFailed
		eventStr = "EventMemberFailed"
	}

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", member.Status.String()}, 1)

	s.logger.Printf("[INFO] serf: %s: %s %s",
		eventStr, member.Member.Name, member.Member.Addr)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    event,
			Members: []Member{member.Member},
		}
	}
}
|
|
// handleNodeUpdate is called when a node meta data update
// has taken place. It refreshes the member's address, tags, and protocol
// versions, then emits an EventMemberUpdate.
func (s *Serf) handleNodeUpdate(n *memberlist.Node) {
	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[n.Name]
	if !ok {
		// We've never even heard of this node that is updating.
		// Just ignore it completely.
		return
	}

	// Update the member attributes
	member.Addr = net.IP(n.Addr)
	member.Port = n.Port
	member.Tags = s.decodeTags(n.Meta)

	// Snag the latest versions. NOTE - the current memberlist code will NOT
	// fire an update event if the metadata (for Serf, tags) stays the same
	// and only the protocol versions change. If we make any Serf-level
	// protocol changes where we want to get this event under those
	// circumstances, we will need to update memberlist to do a check of
	// versions as well as the metadata.
	member.ProtocolMin = n.PMin
	member.ProtocolMax = n.PMax
	member.ProtocolCur = n.PCur
	member.DelegateMin = n.DMin
	member.DelegateMax = n.DMax
	member.DelegateCur = n.DCur

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "member", "update"}, 1)

	// Send an event along
	s.logger.Printf("[INFO] serf: EventMemberUpdate: %s", member.Member.Name)
	if s.config.EventCh != nil {
		s.config.EventCh <- MemberEvent{
			Type:    EventMemberUpdate,
			Members: []Member{member.Member},
		}
	}
}
|
|
// handleNodeLeaveIntent is called when an intent to leave is received.
// Returns true if the intent was new information and should be
// rebroadcast, false otherwise.
func (s *Serf) handleNodeLeaveIntent(leaveMsg *messageLeave) bool {

	// Witness a potentially newer time
	s.clock.Witness(leaveMsg.LTime)

	s.memberLock.Lock()
	defer s.memberLock.Unlock()

	member, ok := s.members[leaveMsg.Node]
	if !ok {
		// Intent arrived before the memberlist join event: buffer it.
		// Rebroadcast only if this was an update we hadn't seen before.
		return upsertIntent(s.recentIntents, leaveMsg.Node, messageLeaveType, leaveMsg.LTime, time.Now)
	}

	// If the message is old, then it is irrelevant and we can skip it
	if leaveMsg.LTime <= member.statusLTime {
		return false
	}

	// Refute us leaving if we are in the alive state
	// Must be done in another goroutine since we have the memberLock
	if leaveMsg.Node == s.config.NodeName && s.state == SerfAlive {
		s.logger.Printf("[DEBUG] serf: Refuting an older leave intent")
		go s.broadcastJoin(s.clock.Time())
		return false
	}

	// State transition depends on current state
	switch member.Status {
	case StatusAlive:
		member.Status = StatusLeaving
		member.statusLTime = leaveMsg.LTime

		if leaveMsg.Prune {
			s.handlePrune(member)
		}
		return true
	case StatusFailed:
		member.Status = StatusLeft
		member.statusLTime = leaveMsg.LTime

		// Remove from the failed list and add to the left list. We add
		// to the left list so that when we do a sync, other nodes will
		// remove it from their failed list.

		s.failedMembers = removeOldMember(s.failedMembers, member.Name)
		s.leftMembers = append(s.leftMembers, member)

		// We must push a message indicating the node has now
		// left to allow higher-level applications to handle the
		// graceful leave.
		s.logger.Printf("[INFO] serf: EventMemberLeave (forced): %s %s",
			member.Member.Name, member.Member.Addr)
		if s.config.EventCh != nil {
			s.config.EventCh <- MemberEvent{
				Type:    EventMemberLeave,
				Members: []Member{member.Member},
			}
		}

		if leaveMsg.Prune {
			s.handlePrune(member)
		}

		return true

	case StatusLeaving, StatusLeft:
		// Already departing/departed; only the prune request is new.
		if leaveMsg.Prune {
			s.handlePrune(member)
		}
		return true
	default:
		return false
	}
}
|
|
// handlePrune waits for nodes that are leaving and then forcibly
|
// erases a member from the list of members
|
func (s *Serf) handlePrune(member *memberState) {
|
if member.Status == StatusLeaving {
|
time.Sleep(s.config.BroadcastTimeout + s.config.LeavePropagateDelay)
|
}
|
|
s.logger.Printf("[INFO] serf: EventMemberReap (forced): %s %s", member.Name, member.Member.Addr)
|
|
//If we are leaving or left we may be in that list of members
|
if member.Status == StatusLeaving || member.Status == StatusLeft {
|
s.leftMembers = removeOldMember(s.leftMembers, member.Name)
|
}
|
s.eraseNode(member)
|
|
}
|
|
// handleNodeJoinIntent is called when a node broadcasts a
|
// join message to set the lamport time of its join
|
func (s *Serf) handleNodeJoinIntent(joinMsg *messageJoin) bool {
|
// Witness a potentially newer time
|
s.clock.Witness(joinMsg.LTime)
|
|
s.memberLock.Lock()
|
defer s.memberLock.Unlock()
|
|
member, ok := s.members[joinMsg.Node]
|
if !ok {
|
// Rebroadcast only if this was an update we hadn't seen before.
|
return upsertIntent(s.recentIntents, joinMsg.Node, messageJoinType, joinMsg.LTime, time.Now)
|
}
|
|
// Check if this time is newer than what we have
|
if joinMsg.LTime <= member.statusLTime {
|
return false
|
}
|
|
// Update the LTime
|
member.statusLTime = joinMsg.LTime
|
|
// If we are in the leaving state, we should go back to alive,
|
// since the leaving message must have been for an older time
|
if member.Status == StatusLeaving {
|
member.Status = StatusAlive
|
}
|
return true
|
}
|
|
// handleUserEvent is called when a user event broadcast is
// received. Returns if the message should be rebroadcast.
// Duplicate suppression uses eventBuffer as a ring keyed by
// LTime modulo the buffer length.
func (s *Serf) handleUserEvent(eventMsg *messageUserEvent) bool {
	// Witness a potentially newer time
	s.eventClock.Witness(eventMsg.LTime)

	s.eventLock.Lock()
	defer s.eventLock.Unlock()

	// Ignore if it is before our minimum event time
	if eventMsg.LTime < s.eventMinTime {
		s.logger.Printf("[WARN] serf: eventMsg.LTime: %d < s.eventMinTime:%d",eventMsg.LTime, s.eventMinTime)
		return false
	}

	// Check if this message is too old: anything that fell off the back
	// of the ring buffer can no longer be deduplicated, so drop it.
	curTime := s.eventClock.Time()
	if curTime > LamportTime(len(s.eventBuffer)) &&
		eventMsg.LTime < curTime-LamportTime(len(s.eventBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old event %s from time %d (current: %d)",
			eventMsg.Name,
			eventMsg.LTime,
			s.eventClock.Time())
		return false
	}

	// Check if we've already seen this exact event at this LTime.
	idx := eventMsg.LTime % LamportTime(len(s.eventBuffer))
	seen := s.eventBuffer[idx]
	userEvent := userEvent{Name: eventMsg.Name, Payload: eventMsg.Payload}
	if seen != nil && seen.LTime == eventMsg.LTime {
		for _, previous := range seen.Events {
			if previous.Equals(&userEvent) {
				return false
			}
		}
	} else {
		// Either an empty slot or a stale entry: start a fresh bucket.
		seen = &userEvents{LTime: eventMsg.LTime}
		s.eventBuffer[idx] = seen
	}

	// Add to recent events
	seen.Events = append(seen.Events, userEvent)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "events"}, 1)
	metrics.IncrCounter([]string{"serf", "events", eventMsg.Name}, 1)

	// Deliver to the application. NOTE(review): this send blocks while
	// holding eventLock if the channel is full — confirm the consumer
	// always drains EventCh.
	if s.config.EventCh != nil {
		s.logger.Printf("[WARN] serf: s.config.EventCh <- UserEvent")
		s.config.EventCh <- UserEvent{
			LTime:    eventMsg.LTime,
			Name:     eventMsg.Name,
			Payload:  eventMsg.Payload,
			Coalesce: eventMsg.CC,
		}
	}
	return true
}
|
|
// handleQuery is called when a query broadcast is
// received. Returns if the message should be rebroadcast.
// Deduplication mirrors handleUserEvent: queryBuffer is a ring keyed by
// LTime modulo the buffer length, with per-LTime query IDs.
func (s *Serf) handleQuery(query *messageQuery) bool {
	// Witness a potentially newer time
	s.queryClock.Witness(query.LTime)

	s.queryLock.Lock()
	defer s.queryLock.Unlock()

	// Ignore if it is before our minimum query time
	if query.LTime < s.queryMinTime {
		return false
	}

	// Check if this message is too old to deduplicate via the ring buffer.
	curTime := s.queryClock.Time()
	if curTime > LamportTime(len(s.queryBuffer)) &&
		query.LTime < curTime-LamportTime(len(s.queryBuffer)) {
		s.logger.Printf(
			"[WARN] serf: received old query %s from time %d (current: %d)",
			query.Name,
			query.LTime,
			s.queryClock.Time())
		return false
	}

	// Check if we've already seen this
	idx := query.LTime % LamportTime(len(s.queryBuffer))
	seen := s.queryBuffer[idx]
	if seen != nil && seen.LTime == query.LTime {
		for _, previous := range seen.QueryIDs {
			if previous == query.ID {
				// Seen this ID already
				return false
			}
		}
	} else {
		// Empty or stale slot: start a fresh bucket for this LTime.
		seen = &queries{LTime: query.LTime}
		s.queryBuffer[idx] = seen
	}

	// Add to recent queries
	seen.QueryIDs = append(seen.QueryIDs, query.ID)

	// Update some metrics
	metrics.IncrCounter([]string{"serf", "queries"}, 1)
	metrics.IncrCounter([]string{"serf", "queries", query.Name}, 1)

	// Check if we should rebroadcast, this may be disabled by a flag
	rebroadcast := true
	if query.NoBroadcast() {
		rebroadcast = false
	}

	// Filter the query
	if !s.shouldProcessQuery(query.Filters) {
		// Even if we don't process it further, we should rebroadcast,
		// since it is the first time we've seen this.
		return rebroadcast
	}

	// Send ack if requested, without waiting for client to Respond()
	if query.Ack() {
		ack := messageQueryResponse{
			LTime: query.LTime,
			ID:    query.ID,
			From:  s.config.NodeName,
			Flags: queryFlagAck,
		}
		raw, err := encodeMessage(messageQueryResponseType, &ack)
		if err != nil {
			s.logger.Printf("[ERR] serf: failed to format ack: %v", err)
		} else {
			// Address the ack directly back to the query originator.
			udpAddr := net.UDPAddr{IP: query.Addr, Port: int(query.Port)}
			addr := memberlist.Address{
				Addr: udpAddr.String(),
				Name: query.SourceNode,
			}
			if err := s.memberlist.SendToAddress(addr, raw); err != nil {
				s.logger.Printf("[ERR] serf: failed to send ack: %v", err)
			}
			// Also relay through other nodes per RelayFactor for redundancy.
			if err := s.relayResponse(query.RelayFactor, udpAddr, query.SourceNode, &ack); err != nil {
				s.logger.Printf("[ERR] serf: failed to relay ack: %v", err)
			}
		}
	}

	// Hand the query to the application so it can Respond().
	// NOTE(review): this send blocks while holding queryLock if the
	// channel is full — confirm the consumer always drains EventCh.
	if s.config.EventCh != nil {
		s.config.EventCh <- &Query{
			LTime:       query.LTime,
			Name:        query.Name,
			Payload:     query.Payload,
			serf:        s,
			id:          query.ID,
			addr:        query.Addr,
			port:        query.Port,
			sourceNode:  query.SourceNode,
			deadline:    time.Now().Add(query.Timeout),
			relayFactor: query.RelayFactor,
		}
	}
	return rebroadcast
}
|
|
// handleQueryResponse is called when a query response is
// received. It routes the response (or ack) to the in-flight
// QueryResponse tracker registered under the query's LTime,
// dropping duplicates, mismatched IDs, and replies to finished queries.
func (s *Serf) handleQueryResponse(resp *messageQueryResponse) {
	// Look for a corresponding QueryResponse
	s.queryLock.RLock()
	query, ok := s.queryResponse[resp.LTime]
	s.queryLock.RUnlock()
	if !ok {
		s.logger.Printf("[WARN] serf: reply for non-running query (LTime: %d, ID: %d) From: %s",
			resp.LTime, resp.ID, resp.From)
		return
	}

	// Verify the ID matches
	if query.id != resp.ID {
		s.logger.Printf("[WARN] serf: query reply ID mismatch (Local: %d, Response: %d)",
			query.id, resp.ID)
		return
	}

	// Check if the query is closed
	if query.Finished() {
		return
	}

	// Process each type of response
	if resp.Ack() {
		// Exit early if this is a duplicate ack
		if _, ok := query.acks[resp.From]; ok {
			metrics.IncrCounter([]string{"serf", "query_duplicate_acks"}, 1)
			return
		}

		metrics.IncrCounter([]string{"serf", "query_acks"}, 1)
		// Non-blocking send: record the ack only if it was delivered, so
		// the acks map stays consistent with what the listener observed.
		select {
		case query.ackCh <- resp.From:
			query.acks[resp.From] = struct{}{}
		default:
			s.logger.Printf("[WARN] serf: Failed to deliver query ack, dropping")
		}
	} else {
		// Exit early if this is a duplicate response
		if _, ok := query.responses[resp.From]; ok {
			metrics.IncrCounter([]string{"serf", "query_duplicate_responses"}, 1)
			return
		}

		metrics.IncrCounter([]string{"serf", "query_responses"}, 1)
		err := query.sendResponse(NodeResponse{From: resp.From, Payload: resp.Payload})
		if err != nil {
			s.logger.Printf("[WARN] %v", err)
		}
	}
}
|
|
// handleNodeConflict is invoked when a join detects a conflict over a name.
// This means two different nodes (IP/Port) are claiming the same name. Memberlist
// will reject the "new" node mapping, but we can still be notified.
func (s *Serf) handleNodeConflict(existing, other *memberlist.Node) {
	// Log a basic warning if the node is not us...
	if existing.Name != s.config.NodeName {
		s.logger.Printf("[WARN] serf: Name conflict for '%s' both %s:%d and %s:%d are claiming",
			existing.Name, existing.Addr, existing.Port, other.Addr, other.Port)
		return
	}

	// The current node is conflicting! This is an error
	s.logger.Printf("[ERR] serf: Node name conflicts with another node at %s:%d. Names must be unique! (Resolution enabled: %v)",
		other.Addr, other.Port, s.config.EnableNameConflictResolution)

	// If automatic resolution is enabled, kick off the resolution in the
	// background; it runs a cluster-wide query and may shut us down.
	if s.config.EnableNameConflictResolution {
		go s.resolveNodeConflict()
	}
}
|
|
// resolveNodeConflict is used to determine which node should remain during
|
// a name conflict. This is done by running an internal query.
|
func (s *Serf) resolveNodeConflict() {
|
// Get the local node
|
local := s.memberlist.LocalNode()
|
|
// Start a name resolution query
|
qName := internalQueryName(conflictQuery)
|
payload := []byte(s.config.NodeName)
|
resp, err := s.Query(qName, payload, nil)
|
if err != nil {
|
s.logger.Printf("[ERR] serf: Failed to start name resolution query: %v", err)
|
return
|
}
|
|
// Counter to determine winner
|
var responses, matching int
|
|
// Gather responses
|
respCh := resp.ResponseCh()
|
for r := range respCh {
|
// Decode the response
|
if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageConflictResponseType {
|
s.logger.Printf("[ERR] serf: Invalid conflict query response type: %v", r.Payload)
|
continue
|
}
|
var member Member
|
if err := decodeMessage(r.Payload[1:], &member); err != nil {
|
s.logger.Printf("[ERR] serf: Failed to decode conflict query response: %v", err)
|
continue
|
}
|
|
// Update the counters
|
responses++
|
if member.Addr.Equal(local.Addr) && member.Port == local.Port {
|
matching++
|
}
|
}
|
|
// Query over, determine if we should live
|
majority := (responses / 2) + 1
|
if matching >= majority {
|
s.logger.Printf("[INFO] serf: majority in name conflict resolution [%d / %d]",
|
matching, responses)
|
return
|
}
|
|
// Since we lost the vote, we need to exit
|
s.logger.Printf("[WARN] serf: minority in name conflict resolution, quiting [%d / %d]",
|
matching, responses)
|
if err := s.Shutdown(); err != nil {
|
s.logger.Printf("[ERR] serf: Failed to shutdown: %v", err)
|
}
|
}
|
|
//eraseNode takes a node completely out of the member list
|
func (s *Serf) eraseNode(m *memberState) {
|
// Delete from members
|
delete(s.members, m.Name)
|
|
// Tell the coordinate client the node has gone away and delete
|
// its cached coordinates.
|
if !s.config.DisableCoordinates {
|
s.coordClient.ForgetNode(m.Name)
|
|
s.coordCacheLock.Lock()
|
delete(s.coordCache, m.Name)
|
s.coordCacheLock.Unlock()
|
}
|
|
// Send an event along
|
if s.config.EventCh != nil {
|
s.config.EventCh <- MemberEvent{
|
Type: EventMemberReap,
|
Members: []Member{m.Member},
|
}
|
}
|
}
|
|
// handleReap periodically reaps the list of failed and left members, as well
|
// as old buffered intents.
|
func (s *Serf) handleReap() {
|
for {
|
select {
|
case <-time.After(s.config.ReapInterval):
|
s.memberLock.Lock()
|
now := time.Now()
|
s.failedMembers = s.reap(s.failedMembers, now, s.config.ReconnectTimeout)
|
s.leftMembers = s.reap(s.leftMembers, now, s.config.TombstoneTimeout)
|
reapIntents(s.recentIntents, now, s.config.RecentIntentTimeout)
|
s.memberLock.Unlock()
|
case <-s.shutdownCh:
|
return
|
}
|
}
|
}
|
|
// handleReconnect attempts to reconnect to recently failed nodes
|
// on configured intervals.
|
func (s *Serf) handleReconnect() {
|
for {
|
select {
|
case <-time.After(s.config.ReconnectInterval):
|
s.reconnect()
|
case <-s.shutdownCh:
|
return
|
}
|
}
|
}
|
|
// reap is called with a list of old members and a timeout, and removes
// members that have exceeded the timeout. The members are removed from
// both the old list and the members itself. Locking is left to the caller.
// Removal is swap-with-last, so the relative order of the surviving
// entries is not preserved.
func (s *Serf) reap(old []*memberState, now time.Time, timeout time.Duration) []*memberState {
	n := len(old)
	for i := 0; i < n; i++ {
		m := old[i]

		// Skip if the timeout is not yet reached
		if now.Sub(m.leaveTime) <= timeout {
			continue
		}

		// Delete from the list: move the tail entry into slot i, nil out
		// the tail so the GC can reclaim it, and shrink the slice. i is
		// decremented so the swapped-in element is examined next pass.
		old[i], old[n-1] = old[n-1], nil
		old = old[:n-1]
		n--
		i--

		// Delete from members and send out event
		s.logger.Printf("[INFO] serf: EventMemberReap: %s", m.Name)
		s.eraseNode(m)

	}

	return old
}
|
|
// reconnect attempts to reconnect to recently fail nodes.
// Each invocation picks at most one random failed member and tries a
// memberlist-level join to it, throttled probabilistically so the whole
// cluster retries each failed member roughly once per interval.
func (s *Serf) reconnect() {
	s.memberLock.RLock()

	// Nothing to do if there are no failed members
	n := len(s.failedMembers)
	if n == 0 {
		s.memberLock.RUnlock()
		return
	}

	// Probability we should attempt to reconect is given
	// by num failed / (num members - num failed - num left)
	// This means that we probabilistically expect the cluster
	// to attempt to connect to each failed member once per
	// reconnect interval
	numFailed := float32(len(s.failedMembers))
	numAlive := float32(len(s.members) - len(s.failedMembers) - len(s.leftMembers))
	if numAlive == 0 {
		numAlive = 1 // guard against zero divide
	}
	prob := numFailed / numAlive
	if rand.Float32() > prob {
		s.memberLock.RUnlock()
		s.logger.Printf("[DEBUG] serf: forgoing reconnect for random throttling")
		return
	}

	// Select a random member to try and join
	idx := rand.Int31n(int32(n))
	mem := s.failedMembers[idx]
	// Release the lock before the (potentially slow) network join below.
	s.memberLock.RUnlock()

	// Format the addr
	addr := net.UDPAddr{IP: mem.Addr, Port: int(mem.Port)}
	s.logger.Printf("[INFO] serf: attempting reconnect to %v %s", mem.Name, addr.String())

	// Prefix with the node name when known ("name/addr" join format).
	joinAddr := addr.String()
	if mem.Name != "" {
		joinAddr = mem.Name + "/" + addr.String()
	}

	// Attempt to join at the memberlist level. The error is deliberately
	// ignored: this is best-effort and will be retried next interval.
	s.memberlist.Join([]string{joinAddr})
}
|
|
// getQueueMax will get the maximum queue depth, which might be dynamic depending
|
// on how Serf is configured.
|
func (s *Serf) getQueueMax() int {
|
max := s.config.MaxQueueDepth
|
if s.config.MinQueueDepth > 0 {
|
s.memberLock.RLock()
|
max = 2 * len(s.members)
|
s.memberLock.RUnlock()
|
|
if max < s.config.MinQueueDepth {
|
max = s.config.MinQueueDepth
|
}
|
}
|
return max
|
}
|
|
// checkQueueDepth periodically checks the size of a queue to see if
|
// it is too large
|
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
|
for {
|
select {
|
case <-time.After(s.config.QueueCheckInterval):
|
numq := queue.NumQueued()
|
metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
|
if numq >= s.config.QueueDepthWarning {
|
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
|
}
|
if max := s.getQueueMax(); numq > max {
|
s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
|
name, numq, max)
|
queue.Prune(max)
|
}
|
case <-s.shutdownCh:
|
return
|
}
|
}
|
}
|
|
// removeOldMember is used to remove an old member from a list of old
|
// members.
|
func removeOldMember(old []*memberState, name string) []*memberState {
|
for i, m := range old {
|
if m.Name == name {
|
n := len(old)
|
old[i], old[n-1] = old[n-1], nil
|
return old[:n-1]
|
}
|
}
|
|
return old
|
}
|
|
// reapIntents clears out any intents that are older than the timeout. Make sure
|
// the memberLock is held when passing in the Serf instance's recentIntents
|
// member.
|
func reapIntents(intents map[string]nodeIntent, now time.Time, timeout time.Duration) {
|
for node, intent := range intents {
|
if now.Sub(intent.WallTime) > timeout {
|
delete(intents, node)
|
}
|
}
|
}
|
|
// upsertIntent will update an existing intent with the supplied Lamport time,
|
// or create a new entry. This will return true if a new entry was added. The
|
// stamper is used to capture the wall clock time for expiring these buffered
|
// intents. Make sure the memberLock is held when passing in the Serf instance's
|
// recentIntents member.
|
func upsertIntent(intents map[string]nodeIntent, node string, itype messageType,
|
ltime LamportTime, stamper func() time.Time) bool {
|
if intent, ok := intents[node]; !ok || ltime > intent.LTime {
|
intents[node] = nodeIntent{
|
Type: itype,
|
WallTime: stamper(),
|
LTime: ltime,
|
}
|
return true
|
}
|
|
return false
|
}
|
|
// recentIntent checks the recent intent buffer for a matching entry for a given
|
// node, and returns the Lamport time, if an intent is present, indicated by the
|
// returned boolean. Make sure the memberLock is held for read when passing in
|
// the Serf instance's recentIntents member.
|
func recentIntent(intents map[string]nodeIntent, node string, itype messageType) (LamportTime, bool) {
|
if intent, ok := intents[node]; ok && intent.Type == itype {
|
return intent.LTime, true
|
}
|
|
return LamportTime(0), false
|
}
|
|
// handleRejoin attempts to reconnect to previously known alive nodes
|
func (s *Serf) handleRejoin(previous []*PreviousNode) {
|
for _, prev := range previous {
|
// Do not attempt to join ourself
|
if prev.Name == s.config.NodeName {
|
continue
|
}
|
|
joinAddr := prev.Addr
|
if prev.Name != "" {
|
joinAddr = prev.Name + "/" + prev.Addr
|
}
|
|
s.logger.Printf("[INFO] serf: Attempting re-join to previously known node: %s", prev)
|
_, err := s.memberlist.Join([]string{joinAddr})
|
if err == nil {
|
s.logger.Printf("[INFO] serf: Re-joined to previously known node: %s", prev)
|
return
|
}
|
}
|
s.logger.Printf("[WARN] serf: Failed to re-join any previously known node")
|
}
|
|
// encodeTags is used to encode a tag map
|
func (s *Serf) encodeTags(tags map[string]string) []byte {
|
// Support role-only backwards compatibility
|
if s.ProtocolVersion() < 3 {
|
role := tags["role"]
|
return []byte(role)
|
}
|
|
// Use a magic byte prefix and msgpack encode the tags
|
var buf bytes.Buffer
|
buf.WriteByte(tagMagicByte)
|
enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
|
if err := enc.Encode(tags); err != nil {
|
panic(fmt.Sprintf("Failed to encode tags: %v", err))
|
}
|
return buf.Bytes()
|
}
|
|
// decodeTags is used to decode a tag map
|
func (s *Serf) decodeTags(buf []byte) map[string]string {
|
tags := make(map[string]string)
|
|
// Backwards compatibility mode
|
if len(buf) == 0 || buf[0] != tagMagicByte {
|
tags["role"] = string(buf)
|
return tags
|
}
|
|
// Decode the tags
|
r := bytes.NewReader(buf[1:])
|
dec := codec.NewDecoder(r, &codec.MsgpackHandle{})
|
if err := dec.Decode(&tags); err != nil {
|
s.logger.Printf("[ERR] serf: Failed to decode tags: %v", err)
|
}
|
return tags
|
}
|
|
// Stats is used to provide operator debugging information
|
func (s *Serf) Stats() map[string]string {
|
toString := func(v uint64) string {
|
return strconv.FormatUint(v, 10)
|
}
|
s.memberLock.RLock()
|
members := toString(uint64(len(s.members)))
|
failed := toString(uint64(len(s.failedMembers)))
|
left := toString(uint64(len(s.leftMembers)))
|
health_score := toString(uint64(s.memberlist.GetHealthScore()))
|
|
s.memberLock.RUnlock()
|
stats := map[string]string{
|
"members": members,
|
"failed": failed,
|
"left": left,
|
"health_score": health_score,
|
"member_time": toString(uint64(s.clock.Time())),
|
"event_time": toString(uint64(s.eventClock.Time())),
|
"query_time": toString(uint64(s.queryClock.Time())),
|
"intent_queue": toString(uint64(s.broadcasts.NumQueued())),
|
"event_queue": toString(uint64(s.eventBroadcasts.NumQueued())),
|
"query_queue": toString(uint64(s.queryBroadcasts.NumQueued())),
|
"encrypted": fmt.Sprintf("%v", s.EncryptionEnabled()),
|
}
|
if !s.config.DisableCoordinates {
|
stats["coordinate_resets"] = toString(uint64(s.coordClient.Stats().Resets))
|
}
|
return stats
|
}
|
|
// WriteKeyringFile will serialize the current keyring and save it to a file.
|
func (s *Serf) writeKeyringFile() error {
|
if len(s.config.KeyringFile) == 0 {
|
return nil
|
}
|
|
keyring := s.config.MemberlistConfig.Keyring
|
keysRaw := keyring.GetKeys()
|
keysEncoded := make([]string, len(keysRaw))
|
|
for i, key := range keysRaw {
|
keysEncoded[i] = base64.StdEncoding.EncodeToString(key)
|
}
|
|
encodedKeys, err := json.MarshalIndent(keysEncoded, "", " ")
|
if err != nil {
|
return fmt.Errorf("Failed to encode keys: %s", err)
|
}
|
|
// Use 0600 for permissions because key data is sensitive
|
if err = ioutil.WriteFile(s.config.KeyringFile, encodedKeys, 0600); err != nil {
|
return fmt.Errorf("Failed to write keyring file: %s", err)
|
}
|
|
// Success!
|
return nil
|
}
|
|
// GetCoordinate returns the network coordinate of the local node.
|
func (s *Serf) GetCoordinate() (*coordinate.Coordinate, error) {
|
if !s.config.DisableCoordinates {
|
return s.coordClient.GetCoordinate(), nil
|
}
|
|
return nil, fmt.Errorf("Coordinates are disabled")
|
}
|
|
// GetCachedCoordinate returns the network coordinate for the node with the given
|
// name. This will only be valid if DisableCoordinates is set to false.
|
func (s *Serf) GetCachedCoordinate(name string) (coord *coordinate.Coordinate, ok bool) {
|
if !s.config.DisableCoordinates {
|
s.coordCacheLock.RLock()
|
defer s.coordCacheLock.RUnlock()
|
if coord, ok = s.coordCache[name]; ok {
|
return coord, true
|
}
|
|
return nil, false
|
}
|
|
return nil, false
|
}
|
|
// NumNodes returns the number of nodes in the serf cluster, regardless of
|
// their health or status.
|
func (s *Serf) NumNodes() (numNodes int) {
|
s.memberLock.RLock()
|
numNodes = len(s.members)
|
s.memberLock.RUnlock()
|
|
return numNodes
|
}
|