package memberlist

import (
	"bytes"
	"fmt"
	"math"
	"math/rand"
	"net"
	"strings"
	"sync/atomic"
	"time"

	metrics "github.com/armon/go-metrics"
)

type nodeStateType int

const (
	stateAlive nodeStateType = iota
	stateSuspect
	stateDead
	stateLeft
)

// Node represents a node in the cluster.
type Node struct {
	Name string
	Addr net.IP
	Port uint16
	Meta []byte // Metadata from the delegate for this node.
	PMin uint8  // Minimum protocol version this understands
	PMax uint8  // Maximum protocol version this understands
	PCur uint8  // Current version node is speaking
	DMin uint8  // Min protocol version for the delegate to understand
	DMax uint8  // Max protocol version for the delegate to understand
	DCur uint8  // Current version delegate is speaking
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *Node) Address() string {
	return joinHostPort(n.Addr.String(), n.Port)
}

// FullAddress returns the node name and host:port form of a node's address,
// suitable for use with a transport.
func (n *Node) FullAddress() Address {
	return Address{
		Addr: joinHostPort(n.Addr.String(), n.Port),
		Name: n.Name,
	}
}

// String returns the node name
func (n *Node) String() string {
	return n.Name
}

// NodeState is used to manage our state view of another node
type nodeState struct {
	Node
	Incarnation uint32        // Last known incarnation number
	State       nodeStateType // Current state
	StateChange time.Time     // Time last state change happened
}

// Address returns the host:port form of a node's address, suitable for use
// with a transport.
func (n *nodeState) Address() string {
	return n.Node.Address()
}

// FullAddress returns the node name and host:port form of a node's address,
// suitable for use with a transport.
func (n *nodeState) FullAddress() Address {
	return n.Node.FullAddress()
}

func (n *nodeState) DeadOrLeft() bool {
	return n.State == stateDead || n.State == stateLeft
}

// ackHandler is used to register handlers for incoming acks and nacks.
type ackHandler struct {
	ackFn  func([]byte, time.Time)
	nackFn func()
	timer  *time.Timer
}

// NoPingResponseError is used to indicate a 'ping' packet was
// successfully issued but no response was received
type NoPingResponseError struct {
	node string
}

func (f NoPingResponseError) Error() string {
	return fmt.Sprintf("No response from node %s", f.node)
}

// Schedule is used to ensure the Tick is performed periodically. This
// function is safe to call multiple times. If the memberlist is already
// scheduled, then it won't do anything.
func (m *Memberlist) schedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we already have tickers, then don't do anything, since we're
	// scheduled
	if len(m.tickers) > 0 {
		return
	}

	// Create the stop tick channel, a blocking channel. We close this
	// when we should stop the tickers.
	stopCh := make(chan struct{})

	// Create a new probeTicker
	if m.config.ProbeInterval > 0 {
		t := time.NewTicker(m.config.ProbeInterval)
		go m.triggerFunc(m.config.ProbeInterval, t.C, stopCh, m.probe)
		m.tickers = append(m.tickers, t)
	}

	// Create a push pull ticker if needed
	if m.config.PushPullInterval > 0 {
		go m.pushPullTrigger(stopCh)
	}

	// Create a gossip ticker if needed
	if m.config.GossipInterval > 0 && m.config.GossipNodes > 0 {
		t := time.NewTicker(m.config.GossipInterval)
		go m.triggerFunc(m.config.GossipInterval, t.C, stopCh, m.gossip)
		m.tickers = append(m.tickers, t)
	}

	// If we made any tickers, then record the stopTick channel for
	// later.
	if len(m.tickers) > 0 {
		m.stopTick = stopCh
	}
}

// triggerFunc is used to trigger a function call each time a
// tick is received, until the stop channel is closed.
func (m *Memberlist) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(stagger))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}
	for {
		select {
		case <-C:
			f()
		case <-stop:
			return
		}
	}
}

// pushPullTrigger is used to periodically trigger a push/pull until
// a stop tick arrives. We don't use triggerFunc since the push/pull
// timer is dynamically scaled based on cluster size to avoid network
// saturation
func (m *Memberlist) pushPullTrigger(stop <-chan struct{}) {
	interval := m.config.PushPullInterval

	// Use a random stagger to avoid synchronizing
	randStagger := time.Duration(uint64(rand.Int63()) % uint64(interval))
	select {
	case <-time.After(randStagger):
	case <-stop:
		return
	}

	// Tick using a dynamic timer
	for {
		tickTime := pushPullScale(interval, m.estNumNodes())
		select {
		case <-time.After(tickTime):
			m.pushPull()
		case <-stop:
			return
		}
	}
}
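
// Illustrative note (not part of the upstream logic): pushPullScale is defined
// elsewhere in this package. The sketch below assumes it leaves the interval
// untouched for small clusters and stretches it roughly logarithmically with
// the estimated node count, so total push/pull traffic stays bounded as the
// cluster grows, at the cost of slower convergence:
//
//	interval := 30 * time.Second
//	_ = pushPullScale(interval, 16)   // small cluster: exchanges stay ~30s apart
//	_ = pushPullScale(interval, 1000) // large cluster: a multiple of 30s apart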

// Deschedule is used to stop the background maintenance. This is safe
// to call multiple times.
func (m *Memberlist) deschedule() {
	m.tickerLock.Lock()
	defer m.tickerLock.Unlock()

	// If we have no tickers, then we aren't scheduled.
	if len(m.tickers) == 0 {
		return
	}

	// Close the stop channel so all the ticker listeners stop.
	close(m.stopTick)

	// Explicitly stop all the tickers themselves so they don't take
	// up any more resources, and get rid of the list.
	for _, t := range m.tickers {
		t.Stop()
	}
	m.tickers = nil
}

// probe is used to perform a single round of failure detection.
func (m *Memberlist) probe() {
	// Track the number of indexes we've considered probing
	numCheck := 0
START:
	m.nodeLock.RLock()

	// Make sure we don't wrap around infinitely
	if numCheck >= len(m.nodes) {
		m.nodeLock.RUnlock()
		return
	}

	// Handle the wrap around case
	if m.probeIndex >= len(m.nodes) {
		m.nodeLock.RUnlock()
		m.resetNodes()
		m.probeIndex = 0
		numCheck++
		goto START
	}

	// Determine if we should probe this node
	skip := false
	var node nodeState

	node = *m.nodes[m.probeIndex]
	if node.Name == m.config.Name {
		skip = true
	} else if node.DeadOrLeft() {
		skip = true
	}

	// Potentially skip
	m.nodeLock.RUnlock()
	m.probeIndex++
	if skip {
		numCheck++
		goto START
	}

	// Probe the specific node
	m.probeNode(&node)
}

// probeNodeByAddr just safely calls probeNode given only the address of the node (for tests)
func (m *Memberlist) probeNodeByAddr(addr string) {
	m.nodeLock.RLock()
	n := m.nodeMap[addr]
	m.nodeLock.RUnlock()

	m.probeNode(n)
}

// failedRemote checks the error and decides if it indicates a failure on the
// other end.
func failedRemote(err error) bool {
	switch t := err.(type) {
	case *net.OpError:
		if strings.HasPrefix(t.Net, "tcp") {
			switch t.Op {
			case "dial", "read", "write":
				return true
			}
		}
	}
	return false
}
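
// Illustrative only: probeNode (below) uses failedRemote to decide whether a
// send error should still fall through to the indirect/TCP fallback path,
// roughly like:
//
//	if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
//		if failedRemote(err) {
//			// the peer looks unhealthy: continue with indirect probes
//		} else {
//			// a local or encoding problem: give up on this round
//		}
//	}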

// probeNode handles a single round of failure checking on a node.
func (m *Memberlist) probeNode(node *nodeState) {
	defer metrics.MeasureSince([]string{"memberlist", "probeNode"}, time.Now())

	// We use our health awareness to scale the overall probe interval, so we
	// slow down if we detect problems. The ticker that calls us can handle
	// us running over the base interval, and will skip missed ticks.
	probeInterval := m.awareness.ScaleTimeout(m.config.ProbeInterval)
	if probeInterval > m.config.ProbeInterval {
		metrics.IncrCounter([]string{"memberlist", "degraded", "probe"}, 1)
	}
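
	// Illustrative note (assumption, not stated in this file): ScaleTimeout is
	// defined with the awareness type elsewhere in this package and is assumed
	// to multiply the timeout by one plus the local health score. Under that
	// assumption, with ProbeInterval = 1s:
	//
	//	// health score 0 -> probeInterval ~ 1s
	//	// health score 2 -> probeInterval ~ 3s (and the "degraded" counter fires)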

	// Prepare a ping message and setup an ack handler.
	selfAddr, selfPort := m.getAdvertise()
	ping := ping{
		SeqNo:      m.nextSeqNo(),
		Node:       node.Name,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	nackCh := make(chan struct{}, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nackCh, probeInterval)

	// Mark the sent time here, which should be after any pre-processing but
	// before system calls to do the actual send. This probably over-reports
	// a bit, but it's the best we can do. We had originally put this right
	// after the I/O, but that would sometimes give negative RTT measurements
	// which was not desirable.
	sent := time.Now()

	// Send a ping to the node. If this node looks like it's suspect or dead,
	// also tack on a suspect message so that it has a chance to refute as
	// soon as possible.
	deadline := sent.Add(probeInterval)
	addr := node.Address()

	// Arrange for our self-awareness to get updated.
	var awarenessDelta int
	defer func() {
		m.awareness.ApplyDelta(awarenessDelta)
	}()
	if node.State == stateAlive {
		if err := m.encodeAndSendMsg(node.FullAddress(), pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send ping: %s", err)
			if failedRemote(err) {
				goto HANDLE_REMOTE_FAILURE
			} else {
				return
			}
		}
	} else {
		var msgs [][]byte
		if buf, err := encode(pingMsg, &ping); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode ping message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}
		s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
		if buf, err := encode(suspectMsg, &s); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to encode suspect message: %s", err)
			return
		} else {
			msgs = append(msgs, buf.Bytes())
		}

		compound := makeCompoundMessage(msgs)
		if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send compound ping and suspect message to %s: %s", addr, err)
			if failedRemote(err) {
				goto HANDLE_REMOTE_FAILURE
			} else {
				return
			}
		}
	}

	// Arrange for our self-awareness to get updated. At this point we've
	// sent the ping, so any return statement means the probe succeeded
	// which will improve our health until we get to the failure scenarios
	// at the end of this function, which will alter this delta variable
	// accordingly.
	awarenessDelta = -1

	// Wait for response or round-trip-time.
	select {
	case v := <-ackCh:
		if v.Complete {
			if m.config.Ping != nil {
				rtt := v.Timestamp.Sub(sent)
				m.config.Ping.NotifyPingComplete(&node.Node, rtt, v.Payload)
			}
			return
		}

		// As an edge case, if we get a timeout, we need to re-enqueue it
		// here to break out of the select below.
		if !v.Complete {
			ackCh <- v
		}
	case <-time.After(m.config.ProbeTimeout):
		// Note that we don't scale this timeout based on awareness and
		// the health score. That's because we don't really expect waiting
		// longer to help get UDP through. Since health does extend the
		// probe interval it will give the TCP fallback more time, which
		// is more active in dealing with lost packets, and it gives more
		// time to wait for indirect acks/nacks.
		m.logger.Printf("[DEBUG] memberlist: Failed ping: %s (timeout reached)", node.Name)
	}

HANDLE_REMOTE_FAILURE:
	// Get some random live nodes.
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.IndirectChecks, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.Name == node.Name ||
			n.State != stateAlive
	})
	m.nodeLock.RUnlock()

	// Attempt an indirect ping.
	expectedNacks := 0
	selfAddr, selfPort = m.getAdvertise()
	ind := indirectPingReq{
		SeqNo:      ping.SeqNo,
		Target:     node.Addr,
		Port:       node.Port,
		Node:       node.Name,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	for _, peer := range kNodes {
		// We only expect nack to be sent from peers who understand
		// version 4 of the protocol.
		if ind.Nack = peer.PMax >= 4; ind.Nack {
			expectedNacks++
		}

		if err := m.encodeAndSendMsg(peer.FullAddress(), indirectPingMsg, &ind); err != nil {
			m.logger.Printf("[ERR] memberlist: Failed to send indirect ping: %s", err)
		}
	}

	// Also make an attempt to contact the node directly over TCP. This
	// helps prevent confused clients who get isolated from UDP traffic
	// but can still speak TCP (which also means they can possibly report
	// misinformation to other nodes via anti-entropy), avoiding flapping in
	// the cluster.
	//
	// This is a little unusual because we will attempt a TCP ping to any
	// member who understands version 3 of the protocol, regardless of
	// which protocol version we are speaking. That's why we've included a
	// config option to turn this off if desired.
	fallbackCh := make(chan bool, 1)

	disableTcpPings := m.config.DisableTcpPings ||
		(m.config.DisableTcpPingsForNode != nil && m.config.DisableTcpPingsForNode(node.Name))
	if (!disableTcpPings) && (node.PMax >= 3) {
		go func() {
			defer close(fallbackCh)
			didContact, err := m.sendPingAndWaitForAck(node.FullAddress(), ping, deadline)
			if err != nil {
				m.logger.Printf("[ERR] memberlist: Failed fallback ping: %s", err)
			} else {
				fallbackCh <- didContact
			}
		}()
	} else {
		close(fallbackCh)
	}

	// Wait for the acks or timeout. Note that we don't check the fallback
	// channel here because we want to issue a warning below if that's the
	// *only* way we hear back from the peer, so we have to let this time
	// out first to allow the normal UDP-based acks to come in.
	select {
	case v := <-ackCh:
		if v.Complete {
			return
		}
	}

	// Finally, poll the fallback channel. The timeouts are set such that
	// the channel will have something or be closed without having to wait
	// any additional time here.
	for didContact := range fallbackCh {
		if didContact {
			m.logger.Printf("[WARN] memberlist: Was able to connect to %s but other probes failed, network may be misconfigured", node.Name)
			return
		}
	}

	// Update our self-awareness based on the results of this failed probe.
	// If we don't have peers who will send nacks then we penalize for any
	// failed probe as a simple health metric. If we do have peers to nack
	// verify, then we can use that as a more sophisticated measure of self-
	// health because we assume them to be working, and they can help us
	// decide if the probed node was really dead or if it was something wrong
	// with ourselves.
	awarenessDelta = 0
	if expectedNacks > 0 {
		if nackCount := len(nackCh); nackCount < expectedNacks {
			awarenessDelta += (expectedNacks - nackCount)
		}
	} else {
		awarenessDelta++
	}

	// No acks received from target, suspect it as failed.
	m.logger.Printf("[INFO] memberlist: Suspect %s has failed, no acks received", node.Name)
	s := suspect{Incarnation: node.Incarnation, Node: node.Name, From: m.config.Name}
	m.suspectNode(&s)
}

// Ping initiates a ping to the node with the specified name.
func (m *Memberlist) Ping(node string, addr net.Addr) (time.Duration, error) {
	// Prepare a ping message and setup an ack handler.
	selfAddr, selfPort := m.getAdvertise()
	ping := ping{
		SeqNo:      m.nextSeqNo(),
		Node:       node,
		SourceAddr: selfAddr,
		SourcePort: selfPort,
		SourceNode: m.config.Name,
	}
	ackCh := make(chan ackMessage, m.config.IndirectChecks+1)
	m.setProbeChannels(ping.SeqNo, ackCh, nil, m.config.ProbeInterval)

	a := Address{Addr: addr.String(), Name: node}

	// Send a ping to the node.
	if err := m.encodeAndSendMsg(a, pingMsg, &ping); err != nil {
		return 0, err
	}

	// Mark the sent time here, which should be after any pre-processing and
	// system calls to do the actual send. This probably under-reports a bit,
	// but it's the best we can do.
	sent := time.Now()

	// Wait for response or timeout.
	select {
	case v := <-ackCh:
		if v.Complete {
			return v.Timestamp.Sub(sent), nil
		}
	case <-time.After(m.config.ProbeTimeout):
		// Timeout, return an error below.
	}

	m.logger.Printf("[DEBUG] memberlist: Failed UDP ping: %v (timeout reached)", node)
	return 0, NoPingResponseError{ping.Node}
}
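
// Illustrative usage (not part of this file): a caller that already knows a
// member's name and UDP address can measure round-trip time directly. The
// address below is hypothetical.
//
//	addr := &net.UDPAddr{IP: net.ParseIP("10.0.0.7"), Port: 7946}
//	if rtt, err := m.Ping("node-b", addr); err == nil {
//		fmt.Printf("rtt to node-b: %s\n", rtt)
//	}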

// resetNodes is used when the tick wraps around. It will reap the
// dead nodes and shuffle the node list.
func (m *Memberlist) resetNodes() {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	// Move dead nodes, but respect gossip to the dead interval
	deadIdx := moveDeadNodes(m.nodes, m.config.GossipToTheDeadTime)

	// Deregister the dead nodes
	for i := deadIdx; i < len(m.nodes); i++ {
		delete(m.nodeMap, m.nodes[i].Name)
		m.nodes[i] = nil
	}

	// Trim the nodes to exclude the dead nodes
	m.nodes = m.nodes[0:deadIdx]

	// Update numNodes after we've trimmed the dead nodes
	atomic.StoreUint32(&m.numNodes, uint32(deadIdx))

	// Shuffle live nodes
	shuffleNodes(m.nodes)
}

// gossip is invoked every GossipInterval period to broadcast our gossip
// messages to a few random nodes.
func (m *Memberlist) gossip() {
	defer metrics.MeasureSince([]string{"memberlist", "gossip"}, time.Now())

	// Get some random live, suspect, or recently dead nodes
	m.nodeLock.RLock()
	kNodes := kRandomNodes(m.config.GossipNodes, m.nodes, func(n *nodeState) bool {
		if n.Name == m.config.Name {
			return true
		}

		switch n.State {
		case stateAlive, stateSuspect:
			return false

		case stateDead:
			return time.Since(n.StateChange) > m.config.GossipToTheDeadTime

		default:
			return true
		}
	})
	m.nodeLock.RUnlock()

	// Compute the bytes available
	bytesAvail := m.config.UDPBufferSize - compoundHeaderOverhead
	if m.config.EncryptionEnabled() {
		bytesAvail -= encryptOverhead(m.encryptionVersion())
	}

	for _, node := range kNodes {
		// Get any pending broadcasts
		msgs := m.getBroadcasts(compoundOverhead, bytesAvail)
		if len(msgs) == 0 {
			return
		}

		addr := node.Address()
		if len(msgs) == 1 {
			// Send single message as is
			if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, msgs[0]); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		} else {
			// Otherwise create and send a compound message
			compound := makeCompoundMessage(msgs)
			if err := m.rawSendMsgPacket(node.FullAddress(), &node.Node, compound.Bytes()); err != nil {
				m.logger.Printf("[ERR] memberlist: Failed to send gossip to %s: %s", addr, err)
			}
		}
	}
}
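
// Illustrative budget example (hypothetical numbers, since UDPBufferSize and
// the overhead constants are defined elsewhere): with a 1400-byte UDP buffer
// and encryption enabled, the per-packet gossip payload budget is roughly
//
//	bytesAvail := 1400 - compoundHeaderOverhead - encryptOverhead(m.encryptionVersion())
//
// Broadcasts that don't fit this budget are expected to stay queued for a
// later gossip round rather than being fragmented.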

// pushPull is invoked periodically to randomly perform a complete state
// exchange. Used to ensure a high level of convergence, but is also
// reasonably expensive as the entire state of this node is exchanged
// with the other node.
func (m *Memberlist) pushPull() {
	// Get a random live node
	m.nodeLock.RLock()
	nodes := kRandomNodes(1, m.nodes, func(n *nodeState) bool {
		return n.Name == m.config.Name ||
			n.State != stateAlive
	})
	m.nodeLock.RUnlock()

	// If no nodes, bail
	if len(nodes) == 0 {
		return
	}
	node := nodes[0]

	// Attempt a push pull
	if err := m.pushPullNode(node.FullAddress(), false); err != nil {
		m.logger.Printf("[ERR] memberlist: Push/Pull with %s failed: %s", node.Name, err)
	}
}

// pushPullNode does a complete state exchange with a specific node.
func (m *Memberlist) pushPullNode(a Address, join bool) error {
	defer metrics.MeasureSince([]string{"memberlist", "pushPullNode"}, time.Now())

	// Attempt to send and receive with the node
	remote, userState, err := m.sendAndReceiveState(a, join)
	if err != nil {
		return err
	}

	if err := m.mergeRemoteState(join, remote, userState); err != nil {
		return err
	}
	return nil
}

// verifyProtocol verifies that all the remote nodes can speak with our
// nodes and vice versa on both the core protocol as well as the
// delegate protocol level.
//
// The verification works by finding the maximum minimum and
// minimum maximum understood protocol and delegate versions. In other words,
// it finds the common denominator of protocol and delegate version ranges
// for the entire cluster.
//
// After this, it goes through the entire cluster (local and remote) and
// verifies that everyone's speaking protocol versions satisfy this range.
// If this passes, it means that every node can understand each other.
func (m *Memberlist) verifyProtocol(remote []pushNodeState) error {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	// Maximum minimum understood and minimum maximum understood for both
	// the protocol and delegate versions. We use this to verify everyone
	// can be understood.
	var maxpmin, minpmax uint8
	var maxdmin, mindmax uint8
	minpmax = math.MaxUint8
	mindmax = math.MaxUint8
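
	// Note (derived from aliveNode below): a push/pull node's Vsn slice packs
	// the six version fields as
	//
	//	Vsn[0]=PMin, Vsn[1]=PMax, Vsn[2]=PCur, Vsn[3]=DMin, Vsn[4]=DMax, Vsn[5]=DCur
	//
	// which is why the loop below reads indexes 0, 1, 3, and 4 for the ranges
	// and indexes 2 and 5 for the currently spoken versions.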
	for _, rn := range remote {
		// If the node isn't alive, then skip it
		if rn.State != stateAlive {
			continue
		}

		// Skip nodes that don't have versions set, it just means
		// their version is zero.
		if len(rn.Vsn) == 0 {
			continue
		}

		if rn.Vsn[0] > maxpmin {
			maxpmin = rn.Vsn[0]
		}

		if rn.Vsn[1] < minpmax {
			minpmax = rn.Vsn[1]
		}

		if rn.Vsn[3] > maxdmin {
			maxdmin = rn.Vsn[3]
		}

		if rn.Vsn[4] < mindmax {
			mindmax = rn.Vsn[4]
		}
	}

	for _, n := range m.nodes {
		// Ignore non-alive nodes
		if n.State != stateAlive {
			continue
		}

		if n.PMin > maxpmin {
			maxpmin = n.PMin
		}

		if n.PMax < minpmax {
			minpmax = n.PMax
		}

		if n.DMin > maxdmin {
			maxdmin = n.DMin
		}

		if n.DMax < mindmax {
			mindmax = n.DMax
		}
	}

	// Now that we definitively know the minimum and maximum understood
	// version that satisfies the whole cluster, we verify that every
	// node in the cluster satisfies this.
	for _, n := range remote {
		var nPCur, nDCur uint8
		if len(n.Vsn) > 0 {
			nPCur = n.Vsn[2]
			nDCur = n.Vsn[5]
		}

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	for _, n := range m.nodes {
		nPCur := n.PCur
		nDCur := n.DCur

		if nPCur < maxpmin || nPCur > minpmax {
			return fmt.Errorf(
				"Node '%s' protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nPCur, maxpmin, minpmax)
		}

		if nDCur < maxdmin || nDCur > mindmax {
			return fmt.Errorf(
				"Node '%s' delegate protocol version (%d) is incompatible: [%d, %d]",
				n.Name, nDCur, maxdmin, mindmax)
		}
	}

	return nil
}

// nextSeqNo returns a usable sequence number in a thread safe way
func (m *Memberlist) nextSeqNo() uint32 {
	return atomic.AddUint32(&m.sequenceNum, 1)
}

// nextIncarnation returns the next incarnation number in a thread safe way
func (m *Memberlist) nextIncarnation() uint32 {
	return atomic.AddUint32(&m.incarnation, 1)
}

// skipIncarnation adds the positive offset to the incarnation number.
func (m *Memberlist) skipIncarnation(offset uint32) uint32 {
	return atomic.AddUint32(&m.incarnation, offset)
}

// estNumNodes is used to get the current estimate of the number of nodes
func (m *Memberlist) estNumNodes() int {
	return int(atomic.LoadUint32(&m.numNodes))
}

type ackMessage struct {
	Complete  bool
	Payload   []byte
	Timestamp time.Time
}

// setProbeChannels is used to attach the ackCh to receive a message when an ack
// with a given sequence number is received. The `complete` field of the message
// will be false on timeout. Any nack messages will cause an empty struct to be
// passed to the nackCh, which can be nil if not needed.
func (m *Memberlist) setProbeChannels(seqNo uint32, ackCh chan ackMessage, nackCh chan struct{}, timeout time.Duration) {
	// Create handler functions for acks and nacks
	ackFn := func(payload []byte, timestamp time.Time) {
		select {
		case ackCh <- ackMessage{true, payload, timestamp}:
		default:
		}
	}
	nackFn := func() {
		select {
		case nackCh <- struct{}{}:
		default:
		}
	}

	// Add the handlers
	ah := &ackHandler{ackFn, nackFn, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
		select {
		case ackCh <- ackMessage{false, nil, time.Now()}:
		default:
		}
	})
}
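
// Illustrative usage (mirrors what probeNode and Ping above already do):
//
//	seqNo := m.nextSeqNo()
//	ackCh := make(chan ackMessage, 1)
//	m.setProbeChannels(seqNo, ackCh, nil, m.config.ProbeInterval)
//	// ... send a ping carrying seqNo, then wait on ackCh; an entry with
//	// Complete == false means the timeout fired before an ack arrived.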

// setAckHandler is used to attach a handler to be invoked when an ack with a
// given sequence number is received. If a timeout is reached, the handler is
// deleted. This is used for indirect pings so does not configure a function
// for nacks.
func (m *Memberlist) setAckHandler(seqNo uint32, ackFn func([]byte, time.Time), timeout time.Duration) {
	// Add the handler
	ah := &ackHandler{ackFn, nil, nil}
	m.ackLock.Lock()
	m.ackHandlers[seqNo] = ah
	m.ackLock.Unlock()

	// Set up a reaping routine
	ah.timer = time.AfterFunc(timeout, func() {
		m.ackLock.Lock()
		delete(m.ackHandlers, seqNo)
		m.ackLock.Unlock()
	})
}

// Invokes an ack handler if any is associated, and reaps the handler immediately
func (m *Memberlist) invokeAckHandler(ack ackResp, timestamp time.Time) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[ack.SeqNo]
	delete(m.ackHandlers, ack.SeqNo)
	m.ackLock.Unlock()
	if !ok {
		return
	}
	ah.timer.Stop()
	ah.ackFn(ack.Payload, timestamp)
}

// Invokes nack handler if any is associated.
func (m *Memberlist) invokeNackHandler(nack nackResp) {
	m.ackLock.Lock()
	ah, ok := m.ackHandlers[nack.SeqNo]
	m.ackLock.Unlock()
	if !ok || ah.nackFn == nil {
		return
	}
	ah.nackFn()
}

// refute gossips an alive message in response to incoming information that we
// are suspect or dead. It will make sure the incarnation number beats the given
// accusedInc value, or you can supply 0 to just get the next incarnation number.
// This alters the node state that's passed in so this MUST be called while the
// nodeLock is held.
func (m *Memberlist) refute(me *nodeState, accusedInc uint32) {
	// Make sure the incarnation number beats the accusation.
	inc := m.nextIncarnation()
	if accusedInc >= inc {
		inc = m.skipIncarnation(accusedInc - inc + 1)
	}
	me.Incarnation = inc

	// Decrease our health because we are being asked to refute a problem.
	m.awareness.ApplyDelta(1)

	// Format and broadcast an alive message.
	a := alive{
		Incarnation: inc,
		Node:        me.Name,
		Addr:        me.Addr,
		Port:        me.Port,
		Meta:        me.Meta,
		Vsn: []uint8{
			me.PMin, me.PMax, me.PCur,
			me.DMin, me.DMax, me.DCur,
		},
	}
	m.encodeAndBroadcast(me.Addr.String(), aliveMsg, a)
}
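
// Worked example of the incarnation bump above: if our last incarnation was 7
// and we are accused at incarnation 10, nextIncarnation returns 8, the guard
// sees 10 >= 8, and skipIncarnation(10-8+1) advances us to 11, which beats the
// accusation.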

// aliveNode is invoked by the network layer when we get a message about a
// live node.
func (m *Memberlist) aliveNode(a *alive, notify chan struct{}, bootstrap bool) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[a.Node]

	// It is possible that during a Leave(), there is already an aliveMsg
	// in-queue to be processed but blocked by the locks above. If we let
	// that aliveMsg process, it'll cause us to re-join the cluster. This
	// ensures that we don't.
	if m.hasLeft() && a.Node == m.config.Name {
		return
	}

	if len(a.Vsn) >= 3 {
		pMin := a.Vsn[0]
		pMax := a.Vsn[1]
		pCur := a.Vsn[2]
		if pMin == 0 || pMax == 0 || pMin > pMax {
			m.logger.Printf("[WARN] memberlist: Ignoring an alive message for '%s' (%v:%d) because protocol version(s) are wrong: %d <= %d <= %d should be >0", a.Node, net.IP(a.Addr), a.Port, pMin, pCur, pMax)
			return
		}
	}

	// Invoke the Alive delegate if any. This can be used to filter out
	// alive messages based on custom logic. For example, using a cluster name.
	// Using a merge delegate is not enough, as it is possible for passive
	// cluster merging to still occur.
	if m.config.Alive != nil {
		if len(a.Vsn) < 6 {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s' (%v:%d) because Vsn is not present",
				a.Node, net.IP(a.Addr), a.Port)
			return
		}
		node := &Node{
			Name: a.Node,
			Addr: a.Addr,
			Port: a.Port,
			Meta: a.Meta,
			PMin: a.Vsn[0],
			PMax: a.Vsn[1],
			PCur: a.Vsn[2],
			DMin: a.Vsn[3],
			DMax: a.Vsn[4],
			DCur: a.Vsn[5],
		}
		if err := m.config.Alive.NotifyAlive(node); err != nil {
			m.logger.Printf("[WARN] memberlist: ignoring alive message for '%s': %s",
				a.Node, err)
			return
		}
	}

	// Check if we've never seen this node before, and if not, then
	// store this node in our node map.
	var updatesNode bool
	if !ok {
		state = &nodeState{
			Node: Node{
				Name: a.Node,
				Addr: a.Addr,
				Port: a.Port,
				Meta: a.Meta,
			},
			State: stateDead,
		}
		if len(a.Vsn) > 5 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Add to map
		m.nodeMap[a.Node] = state

		// Get a random offset. This is important to ensure
		// the failure detection bound is low on average. If all
		// nodes did an append, failure detection bound would be
		// very high.
		n := len(m.nodes)
		offset := randomOffset(n)

		// Add at the end and swap with the node at the offset
		m.nodes = append(m.nodes, state)
		m.nodes[offset], m.nodes[n] = m.nodes[n], m.nodes[offset]

		// Update numNodes after we've added a new node
		atomic.AddUint32(&m.numNodes, 1)
	} else {
		// Check if this address is different from the existing node, unless the old node is dead.
		if !bytes.Equal([]byte(state.Addr), a.Addr) || state.Port != a.Port {
			// If DeadNodeReclaimTime is configured, check if enough time has elapsed since the node died.
			canReclaim := (m.config.DeadNodeReclaimTime > 0 &&
				time.Since(state.StateChange) > m.config.DeadNodeReclaimTime)

			// Allow the address to be updated if a dead node is being replaced.
			if state.State == stateLeft || (state.State == stateDead && canReclaim) {
				m.logger.Printf("[INFO] memberlist: Updating address for left or failed node %s from %v:%d to %v:%d",
					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port)
				updatesNode = true
			} else {
				m.logger.Printf("[ERR] memberlist: Conflicting address for %s. Mine: %v:%d Theirs: %v:%d Old state: %v",
					state.Name, state.Addr, state.Port, net.IP(a.Addr), a.Port, state.State)

				// Inform the conflict delegate if provided
				if m.config.Conflict != nil {
					other := Node{
						Name: a.Node,
						Addr: a.Addr,
						Port: a.Port,
						Meta: a.Meta,
					}
					m.config.Conflict.NotifyConflict(&state.Node, &other)
				}
				return
			}
		}
	}

	// Bail if the incarnation number is older, and this is not about us
	isLocalNode := state.Name == m.config.Name
	if a.Incarnation <= state.Incarnation && !isLocalNode && !updatesNode {
		return
	}

	// Bail if strictly less and this is about us
	if a.Incarnation < state.Incarnation && isLocalNode {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, a.Node)

	// Store the old state and meta data
	oldState := state.State
	oldMeta := state.Meta

	// If this is us we need to refute, otherwise re-broadcast
	if !bootstrap && isLocalNode {
		// Compute the version vector
		versions := []uint8{
			state.PMin, state.PMax, state.PCur,
			state.DMin, state.DMax, state.DCur,
		}

		// If the Incarnation is the same, we need special handling, since it
		// is possible for the following situation to happen:
		// 1) Start with configuration C, join cluster
		// 2) Hard fail / Kill / Shutdown
		// 3) Restart with configuration C', join cluster
		//
		// In this case, other nodes and the local node see the same incarnation,
		// but the values may not be the same. For this reason, we always
		// need to do an equality check for this Incarnation. In most cases,
		// we just ignore, but we may need to refute.
		//
		if a.Incarnation == state.Incarnation &&
			bytes.Equal(a.Meta, state.Meta) &&
			bytes.Equal(a.Vsn, versions) {
			return
		}
		m.refute(state, a.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting an alive message for '%s' (%v:%d) meta:(%v VS %v), vsn:(%v VS %v)", a.Node, net.IP(a.Addr), a.Port, a.Meta, state.Meta, a.Vsn, versions)
	} else {
		m.encodeBroadcastNotify(a.Node, aliveMsg, a, notify)

		// Update protocol versions if it arrived
		if len(a.Vsn) > 0 {
			state.PMin = a.Vsn[0]
			state.PMax = a.Vsn[1]
			state.PCur = a.Vsn[2]
			state.DMin = a.Vsn[3]
			state.DMax = a.Vsn[4]
			state.DCur = a.Vsn[5]
		}

		// Update the state and incarnation number
		state.Incarnation = a.Incarnation
		state.Meta = a.Meta
		state.Addr = a.Addr
		state.Port = a.Port
		if state.State != stateAlive {
			state.State = stateAlive
			state.StateChange = time.Now()
		}
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "alive"}, 1)

	// Notify the delegate of any relevant updates
	if m.config.Events != nil {
		if oldState == stateDead || oldState == stateLeft {
			// if Dead/Left -> Alive, notify of join
			m.config.Events.NotifyJoin(&state.Node)

		} else if !bytes.Equal(oldMeta, state.Meta) {
			// if Meta changed, trigger an update notification
			m.config.Events.NotifyUpdate(&state.Node)
		}
	}
}

// suspectNode is invoked by the network layer when we get a message
// about a suspect node
func (m *Memberlist) suspectNode(s *suspect) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[s.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if s.Incarnation < state.Incarnation {
		return
	}

	// See if there's a suspicion timer we can confirm. If the info is new
	// to us we will go ahead and re-gossip it. This allows for multiple
	// independent confirmations to flow even when a node probes a node
	// that's already suspect.
	if timer, ok := m.nodeTimers[s.Node]; ok {
		if timer.Confirm(s.From) {
			m.encodeAndBroadcast(s.Node, suspectMsg, s)
		}
		return
	}

	// Ignore non-alive nodes
	if state.State != stateAlive {
		return
	}

	// If this is us we need to refute, otherwise re-broadcast
	if state.Name == m.config.Name {
		m.refute(state, s.Incarnation)
		m.logger.Printf("[WARN] memberlist: Refuting a suspect message (from: %s)", s.From)
		return // Do not mark ourself suspect
	} else {
		m.encodeAndBroadcast(s.Node, suspectMsg, s)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "suspect"}, 1)

	// Update the state
	state.Incarnation = s.Incarnation
	state.State = stateSuspect
	changeTime := time.Now()
	state.StateChange = changeTime

	// Setup a suspicion timer. Given that we don't have any known phase
	// relationship with our peers, we set up k such that we hit the nominal
	// timeout two probe intervals short of what we expect given the suspicion
	// multiplier.
	k := m.config.SuspicionMult - 2

	// If there aren't enough nodes to give the expected confirmations, just
	// set k to 0 to say that we don't expect any. Note we subtract 2 from n
	// here to take out ourselves and the node being probed.
	n := m.estNumNodes()
	if n-2 < k {
		k = 0
	}

	// Compute the timeouts based on the size of the cluster.
	min := suspicionTimeout(m.config.SuspicionMult, n, m.config.ProbeInterval)
	max := time.Duration(m.config.SuspicionMaxTimeoutMult) * min
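
	// Illustrative numbers (assumption: suspicionTimeout, defined elsewhere,
	// scales the base timeout with roughly log10 of the cluster size). With
	// SuspicionMult = 4, ProbeInterval = 1s and ~32 nodes, min would come out
	// around 6s; with SuspicionMaxTimeoutMult = 6, max would be around 36s.
	// Confirmations from other peers then shrink the effective timeout from
	// max toward min.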

	fn := func(numConfirmations int) {
		m.nodeLock.Lock()
		state, ok := m.nodeMap[s.Node]
		timeout := ok && state.State == stateSuspect && state.StateChange == changeTime
		m.nodeLock.Unlock()

		if timeout {
			if k > 0 && numConfirmations < k {
				metrics.IncrCounter([]string{"memberlist", "degraded", "timeout"}, 1)
			}

			m.logger.Printf("[INFO] memberlist: Marking %s as failed, suspect timeout reached (%d peer confirmations)",
				state.Name, numConfirmations)
			d := dead{Incarnation: state.Incarnation, Node: state.Name, From: m.config.Name}
			m.deadNode(&d)
		}
	}
	m.nodeTimers[s.Node] = newSuspicion(s.From, k, min, max, fn)
}

// deadNode is invoked by the network layer when we get a message
// about a dead node
func (m *Memberlist) deadNode(d *dead) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	state, ok := m.nodeMap[d.Node]

	// If we've never heard about this node before, ignore it
	if !ok {
		return
	}

	// Ignore old incarnation numbers
	if d.Incarnation < state.Incarnation {
		return
	}

	// Clear out any suspicion timer that may be in effect.
	delete(m.nodeTimers, d.Node)

	// Ignore if node is already dead
	if state.DeadOrLeft() {
		return
	}

	// Check if this is us
	if state.Name == m.config.Name {
		// If we are not leaving we need to refute
		if !m.hasLeft() {
			m.refute(state, d.Incarnation)
			m.logger.Printf("[WARN] memberlist: Refuting a dead message (from: %s)", d.From)
			return // Do not mark ourself dead
		}

		// If we are leaving, we broadcast and wait
		m.encodeBroadcastNotify(d.Node, deadMsg, d, m.leaveBroadcast)
	} else {
		m.encodeAndBroadcast(d.Node, deadMsg, d)
	}

	// Update metrics
	metrics.IncrCounter([]string{"memberlist", "msg", "dead"}, 1)

	// Update the state
	state.Incarnation = d.Incarnation

	// If the dead message was sent by the node itself, mark it as left
	// instead of dead.
	if d.Node == d.From {
		state.State = stateLeft
	} else {
		state.State = stateDead
	}
	state.StateChange = time.Now()

	// Notify of death
	if m.config.Events != nil {
		m.config.Events.NotifyLeave(&state.Node)
	}
}
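
// Note: a node that shuts down via Leave() (defined elsewhere in this package)
// broadcasts a dead message about itself, so d.Node == d.From above and peers
// record it as stateLeft rather than stateDead; the hasLeft() guard above is
// what stops us from refuting our own leave.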

// mergeState is invoked by the network layer when we get a Push/Pull
// state transfer
func (m *Memberlist) mergeState(remote []pushNodeState) {
	for _, r := range remote {
		switch r.State {
		case stateAlive:
			a := alive{
				Incarnation: r.Incarnation,
				Node:        r.Name,
				Addr:        r.Addr,
				Port:        r.Port,
				Meta:        r.Meta,
				Vsn:         r.Vsn,
			}
			m.aliveNode(&a, nil, false)

		case stateLeft:
			d := dead{Incarnation: r.Incarnation, Node: r.Name, From: r.Name}
			m.deadNode(&d)
		case stateDead:
			// If the remote node believes a node is dead, we prefer to
			// suspect that node instead of declaring it dead instantly
			fallthrough
		case stateSuspect:
			s := suspect{Incarnation: r.Incarnation, Node: r.Name, From: m.config.Name}
			m.suspectNode(&s)
		}
	}
}