/*
memberlist is a library that manages cluster membership and member failure
detection using a gossip-based protocol.

The use cases for such a library are far-reaching: all distributed systems
require membership, and memberlist is a reusable solution to managing
cluster membership and node failure detection.

memberlist is eventually consistent but converges quickly on average. The
speed at which it converges can be heavily tuned via various knobs on the
protocol. Node failures are detected and network partitions are partially
tolerated by attempting to communicate with potentially dead nodes through
multiple routes.
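
A minimal usage sketch (error handling is abbreviated and the peer address
is illustrative, not part of this package):

	conf := memberlist.DefaultLocalConfig()
	list, err := memberlist.Create(conf)
	if err != nil {
		panic("failed to create memberlist: " + err.Error())
	}

	// Join an existing cluster by contacting at least one known member.
	if _, err := list.Join([]string{"10.0.0.5"}); err != nil {
		panic("failed to join cluster: " + err.Error())
	}

	for _, member := range list.Members() {
		fmt.Printf("Member: %s %s\n", member.Name, member.Addr)
	}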
*/
package memberlist

import (
	"container/list"
	"errors"
	"fmt"
	"log"
	"net"
	"os"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	multierror "github.com/hashicorp/go-multierror"
	sockaddr "github.com/hashicorp/go-sockaddr"
	"github.com/miekg/dns"
)

var errNodeNamesAreRequired = errors.New("memberlist: node names are required by configuration but one was not provided")

type Memberlist struct {
	sequenceNum uint32 // Local sequence number
	incarnation uint32 // Local incarnation number
	numNodes    uint32 // Number of known nodes (estimate)
	pushPullReq uint32 // Number of push/pull requests

	advertiseLock sync.RWMutex
	advertiseAddr net.IP
	advertisePort uint16

	config         *Config
	shutdown       int32 // Used as an atomic boolean value
	shutdownCh     chan struct{}
	leave          int32 // Used as an atomic boolean value
	leaveBroadcast chan struct{}

	shutdownLock sync.Mutex // Serializes calls to Shutdown
	leaveLock    sync.Mutex // Serializes calls to Leave

	transport NodeAwareTransport

	handoffCh            chan struct{}
	highPriorityMsgQueue *list.List
	lowPriorityMsgQueue  *list.List
	msgQueueLock         sync.Mutex

	nodeLock   sync.RWMutex
	nodes      []*nodeState          // Known nodes
	nodeMap    map[string]*nodeState // Maps Node.Name -> NodeState
	nodeTimers map[string]*suspicion // Maps Node.Name -> suspicion timer
	awareness  *awareness

	tickerLock sync.Mutex
	tickers    []*time.Ticker
	stopTick   chan struct{}
	probeIndex int

	ackLock     sync.Mutex
	ackHandlers map[uint32]*ackHandler

	broadcasts *TransmitLimitedQueue

	logger *log.Logger
}

// BuildVsnArray creates the six-element protocol version array advertised by
// this node: the supported protocol version range and current version,
// followed by the delegate protocol version range and current version.
func (conf *Config) BuildVsnArray() []uint8 {
	return []uint8{
		ProtocolVersionMin, ProtocolVersionMax, conf.ProtocolVersion,
		conf.DelegateProtocolMin, conf.DelegateProtocolMax,
		conf.DelegateProtocolVersion,
	}
}

// newMemberlist creates the network listeners.
// Does not schedule execution of background maintenance.
func newMemberlist(conf *Config) (*Memberlist, error) {
	if conf.ProtocolVersion < ProtocolVersionMin {
		return nil, fmt.Errorf("Protocol version '%d' too low. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	} else if conf.ProtocolVersion > ProtocolVersionMax {
		return nil, fmt.Errorf("Protocol version '%d' too high. Must be in range: [%d, %d]",
			conf.ProtocolVersion, ProtocolVersionMin, ProtocolVersionMax)
	}

	if len(conf.SecretKey) > 0 {
		if conf.Keyring == nil {
			keyring, err := NewKeyring(nil, conf.SecretKey)
			if err != nil {
				return nil, err
			}
			conf.Keyring = keyring
		} else {
			if err := conf.Keyring.AddKey(conf.SecretKey); err != nil {
				return nil, err
			}
			if err := conf.Keyring.UseKey(conf.SecretKey); err != nil {
				return nil, err
			}
		}
	}

	if conf.LogOutput != nil && conf.Logger != nil {
		return nil, fmt.Errorf("Cannot specify both LogOutput and Logger. Please choose a single log configuration setting.")
	}

	logDest := conf.LogOutput
	if logDest == nil {
		logDest = os.Stderr
	}

	logger := conf.Logger
	if logger == nil {
		logger = log.New(logDest, "", log.LstdFlags)
	}

	// Set up a network transport by default if a custom one wasn't given
	// by the config.
	transport := conf.Transport
	if transport == nil {
		nc := &NetTransportConfig{
			BindAddrs: []string{conf.BindAddr},
			BindPort:  conf.BindPort,
			Logger:    logger,
		}

		// See comment below for details about the retry in here.
		makeNetRetry := func(limit int) (*NetTransport, error) {
			var err error
			for try := 0; try < limit; try++ {
				var nt *NetTransport
				if nt, err = NewNetTransport(nc); err == nil {
					return nt, nil
				}
				if strings.Contains(err.Error(), "address already in use") {
					logger.Printf("[DEBUG] memberlist: Got bind error: %v", err)
					continue
				}
			}

			return nil, fmt.Errorf("failed to obtain an address: %v", err)
		}

		// The dynamic bind port operation is inherently racy because
		// even though we are using the kernel to find a port for us, we
		// are attempting to bind multiple protocols (and potentially
		// multiple addresses) with the same port number. We build in a
		// few retries here since this often gets transient errors in
		// busy unit tests.
		limit := 1
		if conf.BindPort == 0 {
			limit = 10
		}

		nt, err := makeNetRetry(limit)
		if err != nil {
			return nil, fmt.Errorf("Could not set up network transport: %v", err)
		}
		if conf.BindPort == 0 {
			port := nt.GetAutoBindPort()
			conf.BindPort = port
			conf.AdvertisePort = port
			logger.Printf("[DEBUG] memberlist: Using dynamic bind port %d", port)
		}
		transport = nt
	}

	nodeAwareTransport, ok := transport.(NodeAwareTransport)
	if !ok {
		logger.Printf("[DEBUG] memberlist: configured Transport is not a NodeAwareTransport and some features may not work as desired")
		nodeAwareTransport = &shimNodeAwareTransport{transport}
	}

	m := &Memberlist{
		config:               conf,
		shutdownCh:           make(chan struct{}),
		leaveBroadcast:       make(chan struct{}, 1),
		transport:            nodeAwareTransport,
		handoffCh:            make(chan struct{}, 1),
		highPriorityMsgQueue: list.New(),
		lowPriorityMsgQueue:  list.New(),
		nodeMap:              make(map[string]*nodeState),
		nodeTimers:           make(map[string]*suspicion),
		awareness:            newAwareness(conf.AwarenessMaxMultiplier),
		ackHandlers:          make(map[uint32]*ackHandler),
		broadcasts:           &TransmitLimitedQueue{RetransmitMult: conf.RetransmitMult},
		logger:               logger,
	}
	m.broadcasts.NumNodes = func() int {
		return m.estNumNodes()
	}

	// Get the final advertise address from the transport, which may need
	// to see which address we bound to. We'll refresh this each time we
	// send out an alive message.
	if _, _, err := m.refreshAdvertise(); err != nil {
		return nil, err
	}

	go m.streamListen()
	go m.packetListen()
	go m.packetHandler()
	return m, nil
}

// Create will create a new Memberlist using the given configuration.
// This will not connect to any other node (see Join) yet, but will start
// all the listeners to allow other nodes to join this memberlist.
// After creating a Memberlist, the given configuration should no longer be
// modified by the user.
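//
// A minimal sketch (the node name and deferred shutdown are illustrative,
// not required by this function):
//
//	conf := memberlist.DefaultLANConfig()
//	conf.Name = "node-a"
//	list, err := memberlist.Create(conf)
//	if err != nil {
//		// handle error
//	}
//	defer list.Shutdown()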
func Create(conf *Config) (*Memberlist, error) {
	m, err := newMemberlist(conf)
	if err != nil {
		return nil, err
	}
	if err := m.setAlive(); err != nil {
		m.Shutdown()
		return nil, err
	}
	m.schedule()
	return m, nil
}

// Join is used to take an existing Memberlist and attempt to join a cluster
// by contacting all the given hosts and performing a state sync. Initially,
// the Memberlist only contains our own state, so doing this will cause
// remote nodes to become aware of the existence of this node, effectively
// joining the cluster.
//
// This returns the number of hosts successfully contacted and an error if
// none could be reached. If an error is returned, the node did not successfully
// join the cluster.
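//
// Each entry may be a bare host or IP, a "host:port" pair, or carry an
// optional node name prefix as "node-name/host:port". A hedged sketch
// (the addresses are illustrative):
//
//	n, err := list.Join([]string{
//		"10.0.0.5",             // default port is assumed
//		"10.0.0.6:7946",        // explicit port
//		"node-c/10.0.0.7:7946", // explicit node name
//	})
//	if err != nil {
//		// no host could be reached
//	}
//	log.Printf("joined %d nodes", n)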
func (m *Memberlist) Join(existing []string) (int, error) {
	numSuccess := 0
	var errs error
	for _, exist := range existing {
		addrs, err := m.resolveAddr(exist)
		if err != nil {
			err = fmt.Errorf("Failed to resolve %s: %v", exist, err)
			errs = multierror.Append(errs, err)
			m.logger.Printf("[WARN] memberlist: %v", err)
			continue
		}

		for _, addr := range addrs {
			hp := joinHostPort(addr.ip.String(), addr.port)
			a := Address{Addr: hp, Name: addr.nodeName}
			if err := m.pushPullNode(a, true); err != nil {
				err = fmt.Errorf("Failed to join %s: %v", addr.ip, err)
				errs = multierror.Append(errs, err)
				m.logger.Printf("[DEBUG] memberlist: %v", err)
				continue
			}
			numSuccess++
		}
	}
	if numSuccess > 0 {
		errs = nil
	}
	return numSuccess, errs
}

// ipPort holds information about a node we want to try to join.
type ipPort struct {
	ip       net.IP
	port     uint16
	nodeName string // optional
}

// tcpLookupIP is a helper to initiate a TCP-based DNS lookup for the given host.
// The built-in Go resolver will do a UDP lookup first, and will only use TCP if
// the response has the truncate bit set, which isn't common on DNS servers like
// Consul's. By doing the TCP lookup directly, we get the best chance for the
// largest list of hosts to join. Since joins are relatively rare events, it's ok
// to do this rather expensive operation.
func (m *Memberlist) tcpLookupIP(host string, defaultPort uint16, nodeName string) ([]ipPort, error) {
	// Don't attempt any TCP lookups against non-fully qualified domain
	// names, since those will likely come from the resolv.conf file.
	if !strings.Contains(host, ".") {
		return nil, nil
	}

	// Make sure the domain name is terminated with a dot (we know there's
	// at least one character at this point).
	dn := host
	if dn[len(dn)-1] != '.' {
		dn = dn + "."
	}

	// See if we can find a server to try.
	cc, err := dns.ClientConfigFromFile(m.config.DNSConfigPath)
	if err != nil {
		return nil, err
	}
	if len(cc.Servers) > 0 {
		// We support host:port in the DNS config, but need to add the
		// default port if one is not supplied.
		server := cc.Servers[0]
		if !hasPort(server) {
			server = net.JoinHostPort(server, cc.Port)
		}

		// Do the lookup.
		c := new(dns.Client)
		c.Net = "tcp"
		msg := new(dns.Msg)
		msg.SetQuestion(dn, dns.TypeANY)
		in, _, err := c.Exchange(msg, server)
		if err != nil {
			return nil, err
		}

		// Handle any IPs we get back that we can attempt to join.
		var ips []ipPort
		for _, r := range in.Answer {
			switch rr := r.(type) {
			case *dns.A:
				ips = append(ips, ipPort{ip: rr.A, port: defaultPort, nodeName: nodeName})
			case *dns.AAAA:
				ips = append(ips, ipPort{ip: rr.AAAA, port: defaultPort, nodeName: nodeName})
			case *dns.CNAME:
				m.logger.Printf("[DEBUG] memberlist: Ignoring CNAME RR in TCP-first answer for '%s'", host)
			}
		}
		return ips, nil
	}

	return nil, nil
}

// resolveAddr resolves a host string into one or more ipPort entries. The
// string may optionally be prefixed with a node name ("name/host:port"); if
// no port is given, the configured BindPort is used as the default.
func (m *Memberlist) resolveAddr(hostStr string) ([]ipPort, error) {
	// First peel off any leading node name. This is optional.
	nodeName := ""
	if slashIdx := strings.Index(hostStr, "/"); slashIdx >= 0 {
		if slashIdx == 0 {
			return nil, fmt.Errorf("empty node name provided")
		}
		nodeName = hostStr[0:slashIdx]
		hostStr = hostStr[slashIdx+1:]
	}

	// This captures the supplied port, or the default one.
	hostStr = ensurePort(hostStr, m.config.BindPort)
	host, sport, err := net.SplitHostPort(hostStr)
	if err != nil {
		return nil, err
	}
	lport, err := strconv.ParseUint(sport, 10, 16)
	if err != nil {
		return nil, err
	}
	port := uint16(lport)

	// If it looks like an IP address we are done. The SplitHostPort() above
	// will make sure the host part is in good shape for parsing, even for
	// IPv6 addresses.
	if ip := net.ParseIP(host); ip != nil {
		return []ipPort{
			{ip: ip, port: port, nodeName: nodeName},
		}, nil
	}

	// First try TCP so we have the best chance for the largest list of
	// hosts to join. If this fails it's not fatal since this isn't a standard
	// way to query DNS, and we have a fallback below.
	ips, err := m.tcpLookupIP(host, port, nodeName)
	if err != nil {
		m.logger.Printf("[DEBUG] memberlist: TCP-first lookup failed for '%s', falling back to UDP: %s", hostStr, err)
	}
	if len(ips) > 0 {
		return ips, nil
	}

	// If TCP didn't yield anything then use the normal Go resolver which
	// will try UDP, then might possibly try TCP again if the UDP response
	// indicates it was truncated.
	ans, err := net.LookupIP(host)
	if err != nil {
		return nil, err
	}
	ips = make([]ipPort, 0, len(ans))
	for _, ip := range ans {
		ips = append(ips, ipPort{ip: ip, port: port, nodeName: nodeName})
	}
	return ips, nil
}

// setAlive is used to mark this node as being alive. This is the same
// as if we received an alive notification on our own network channel for
// ourselves.
func (m *Memberlist) setAlive() error {
	// Get the final advertise address from the transport, which may need
	// to see which address we bound to.
	addr, port, err := m.refreshAdvertise()
	if err != nil {
		return err
	}

	// Check if this is a public address without encryption
	ipAddr, err := sockaddr.NewIPAddr(addr.String())
	if err != nil {
		return fmt.Errorf("Failed to parse interface addresses: %v", err)
	}
	ifAddrs := []sockaddr.IfAddr{
		{
			SockAddr: ipAddr,
		},
	}
	_, publicIfs, err := sockaddr.IfByRFC("6890", ifAddrs)
	if len(publicIfs) > 0 && !m.config.EncryptionEnabled() {
		m.logger.Printf("[WARN] memberlist: Binding to public address without encryption!")
	}

	// Set any metadata from the delegate.
	var meta []byte
	if m.config.Delegate != nil {
		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
		if len(meta) > MetaMaxSize {
			panic("Node meta data provided is longer than the limit")
		}
	}

	a := alive{
		Incarnation: m.nextIncarnation(),
		Node:        m.config.Name,
		Addr:        addr,
		Port:        uint16(port),
		Meta:        meta,
		Vsn:         m.config.BuildVsnArray(),
	}
	m.aliveNode(&a, nil, true)

	return nil
}

// getAdvertise returns the currently advertised address and port.
func (m *Memberlist) getAdvertise() (net.IP, uint16) {
	m.advertiseLock.RLock()
	defer m.advertiseLock.RUnlock()
	return m.advertiseAddr, m.advertisePort
}

// setAdvertise records the address and port to advertise to other nodes.
func (m *Memberlist) setAdvertise(addr net.IP, port int) {
	m.advertiseLock.Lock()
	defer m.advertiseLock.Unlock()
	m.advertiseAddr = addr
	m.advertisePort = uint16(port)
}

// refreshAdvertise asks the transport for the final advertise address,
// stores it, and returns the resolved address and port.
func (m *Memberlist) refreshAdvertise() (net.IP, int, error) {
	addr, port, err := m.transport.FinalAdvertiseAddr(
		m.config.AdvertiseAddr, m.config.AdvertisePort)
	if err != nil {
		return nil, 0, fmt.Errorf("Failed to get final advertise address: %v", err)
	}
	m.setAdvertise(addr, port)
	return addr, port, nil
}

// LocalNode returns the Node for the local node.
func (m *Memberlist) LocalNode() *Node {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()
	state := m.nodeMap[m.config.Name]
	return &state.Node
}

// UpdateNode is used to trigger re-advertising the local node. This is
// primarily used with a Delegate to support dynamic updates to the local
// meta data. This will block until the update message is successfully
// broadcast to a member of the cluster, if any exist, or until the specified
// timeout is reached.
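//
// A hedged sketch (the timeout is an arbitrary example value); the new meta
// data is picked up from the configured Delegate's NodeMeta method:
//
//	if err := list.UpdateNode(10 * time.Second); err != nil {
//		log.Printf("update was not broadcast: %v", err)
//	}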
func (m *Memberlist) UpdateNode(timeout time.Duration) error {
	// Get the node meta data
	var meta []byte
	if m.config.Delegate != nil {
		meta = m.config.Delegate.NodeMeta(MetaMaxSize)
		if len(meta) > MetaMaxSize {
			panic("Node meta data provided is longer than the limit")
		}
	}

	// Get the existing node
	m.nodeLock.RLock()
	state := m.nodeMap[m.config.Name]
	m.nodeLock.RUnlock()

	// Format a new alive message
	a := alive{
		Incarnation: m.nextIncarnation(),
		Node:        m.config.Name,
		Addr:        state.Addr,
		Port:        state.Port,
		Meta:        meta,
		Vsn:         m.config.BuildVsnArray(),
	}
	notifyCh := make(chan struct{})
	m.aliveNode(&a, notifyCh, true)

	// Wait for the broadcast or a timeout
	if m.anyAlive() {
		var timeoutCh <-chan time.Time
		if timeout > 0 {
			timeoutCh = time.After(timeout)
		}
		select {
		case <-notifyCh:
		case <-timeoutCh:
			return fmt.Errorf("timeout waiting for update broadcast")
		}
	}
	return nil
}

// Deprecated: SendTo is deprecated in favor of SendBestEffort, which requires
// a target node. If you don't have a node, use SendToAddress instead.
func (m *Memberlist) SendTo(to net.Addr, msg []byte) error {
	a := Address{Addr: to.String(), Name: ""}
	return m.SendToAddress(a, msg)
}

// SendToAddress uses the unreliable packet-oriented interface of the transport
// to target a user message at the given address (this does not use the gossip
// mechanism).
func (m *Memberlist) SendToAddress(a Address, msg []byte) error {
	// Encode as a user message
	buf := make([]byte, 1, len(msg)+1)
	buf[0] = byte(userMsg)
	buf = append(buf, msg...)

	// Send the message
	return m.rawSendMsgPacket(a, nil, buf)
}

// Deprecated: SendToUDP is deprecated in favor of SendBestEffort.
func (m *Memberlist) SendToUDP(to *Node, msg []byte) error {
	return m.SendBestEffort(to, msg)
}

// Deprecated: SendToTCP is deprecated in favor of SendReliable.
func (m *Memberlist) SendToTCP(to *Node, msg []byte) error {
	return m.SendReliable(to, msg)
}

// SendBestEffort uses the unreliable packet-oriented interface of the transport
// to target a user message at the given node (this does not use the gossip
// mechanism). The maximum size of the message depends on the configured
// UDPBufferSize for this memberlist instance.
func (m *Memberlist) SendBestEffort(to *Node, msg []byte) error {
	// Encode as a user message
	buf := make([]byte, 1, len(msg)+1)
	buf[0] = byte(userMsg)
	buf = append(buf, msg...)

	// Send the message
	a := Address{Addr: to.String(), Name: to.Name}
	return m.rawSendMsgPacket(a, to, buf)
}

// SendReliable uses the reliable stream-oriented interface of the transport to
// target a user message at the given node (this does not use the gossip
// mechanism). Delivery is guaranteed if no error is returned, and there is no
// limit on the size of the message.
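//
// A hedged sketch: send an application-defined payload to the first other
// member (on the receiving side it is handed to that node's Delegate via
// NotifyMsg):
//
//	for _, n := range list.Members() {
//		if n.Name != list.LocalNode().Name {
//			if err := list.SendReliable(n, []byte("hello")); err != nil {
//				log.Printf("send failed: %v", err)
//			}
//			break
//		}
//	}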
func (m *Memberlist) SendReliable(to *Node, msg []byte) error {
	return m.sendUserMsg(to.FullAddress(), msg)
}

// Members returns a list of all known live nodes. The node structures
// returned must not be modified. If you wish to modify a Node, make a
// copy first.
func (m *Memberlist) Members() []*Node {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	nodes := make([]*Node, 0, len(m.nodes))
	for _, n := range m.nodes {
		if !n.DeadOrLeft() {
			nodes = append(nodes, &n.Node)
		}
	}

	return nodes
}

// NumMembers returns the number of alive nodes currently known. Between
// the time of calling this and calling Members, the number of alive nodes
// may have changed, so this shouldn't be used to determine how many
// members will be returned by Members.
func (m *Memberlist) NumMembers() (alive int) {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	for _, n := range m.nodes {
		if !n.DeadOrLeft() {
			alive++
		}
	}

	return
}

// Leave will broadcast a leave message but will not shut down the background
// listeners, meaning the node will continue participating in gossip and state
// updates.
//
// This will block until the leave message is successfully broadcast to
// a member of the cluster, if any exist, or until the specified timeout
// is reached.
//
// This method is safe to call multiple times, but must not be called
// after the cluster is already shut down.
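//
// A hedged sketch of a graceful exit (the timeout is an example value):
//
//	if err := list.Leave(5 * time.Second); err != nil {
//		log.Printf("leave: %v", err)
//	}
//	if err := list.Shutdown(); err != nil {
//		log.Printf("shutdown: %v", err)
//	}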
func (m *Memberlist) Leave(timeout time.Duration) error {
	m.leaveLock.Lock()
	defer m.leaveLock.Unlock()

	if m.hasShutdown() {
		panic("leave after shutdown")
	}

	if !m.hasLeft() {
		atomic.StoreInt32(&m.leave, 1)

		m.nodeLock.Lock()
		state, ok := m.nodeMap[m.config.Name]
		m.nodeLock.Unlock()
		if !ok {
			m.logger.Printf("[WARN] memberlist: Leave but we're not in the node map.")
			return nil
		}

		// This dead message is special, because Node and From are the
		// same. This helps other nodes figure out that a node left
		// intentionally. When Node equals From, other nodes know for
		// sure this node is gone.
		d := dead{
			Incarnation: state.Incarnation,
			Node:        state.Name,
			From:        state.Name,
		}
		m.deadNode(&d)

		// Block until the broadcast goes out
		if m.anyAlive() {
			var timeoutCh <-chan time.Time
			if timeout > 0 {
				timeoutCh = time.After(timeout)
			}
			select {
			case <-m.leaveBroadcast:
			case <-timeoutCh:
				return fmt.Errorf("timeout waiting for leave broadcast")
			}
		}
	}

	return nil
}

// anyAlive checks for any other alive node.
func (m *Memberlist) anyAlive() bool {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()
	for _, n := range m.nodes {
		if !n.DeadOrLeft() && n.Name != m.config.Name {
			return true
		}
	}
	return false
}

// GetHealthScore gives this instance's idea of how well it is meeting the soft
// real-time requirements of the protocol. Lower numbers are better, and zero
// means "totally healthy".
func (m *Memberlist) GetHealthScore() int {
	return m.awareness.GetHealthScore()
}

// ProtocolVersion returns the protocol version currently in use by
// this memberlist.
func (m *Memberlist) ProtocolVersion() uint8 {
	// NOTE: This method exists so that in the future we can control
	// any locking if necessary, if we change the protocol version at
	// runtime, etc.
	return m.config.ProtocolVersion
}

// Shutdown will stop any background maintenance of network activity
// for this memberlist, causing it to appear "dead". A leave message
// will not be broadcast beforehand, so the cluster being left will have
// to detect this node's shutdown using probing. If you wish to exit the
// cluster more gracefully, call Leave prior to shutting down.
//
// This method is safe to call multiple times.
func (m *Memberlist) Shutdown() error {
	m.shutdownLock.Lock()
	defer m.shutdownLock.Unlock()

	if m.hasShutdown() {
		return nil
	}

	// Shut down the transport first, which should block until it's
	// completely torn down. If we kill the memberlist-side handlers
	// those I/O handlers might get stuck.
	if err := m.transport.Shutdown(); err != nil {
		m.logger.Printf("[ERR] Failed to shutdown transport: %v", err)
	}

	// Now tear down everything else.
	atomic.StoreInt32(&m.shutdown, 1)
	close(m.shutdownCh)
	m.deschedule()
	return nil
}

// hasShutdown reports whether Shutdown has been called.
func (m *Memberlist) hasShutdown() bool {
	return atomic.LoadInt32(&m.shutdown) == 1
}

// hasLeft reports whether Leave has been called.
func (m *Memberlist) hasLeft() bool {
	return atomic.LoadInt32(&m.leave) == 1
}

// getNodeState returns the current state of the node keyed by name in nodeMap.
func (m *Memberlist) getNodeState(addr string) nodeStateType {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	n := m.nodeMap[addr]
	return n.State
}

// getNodeStateChange returns the time of the last state change for the node
// keyed by name in nodeMap.
func (m *Memberlist) getNodeStateChange(addr string) time.Time {
	m.nodeLock.RLock()
	defer m.nodeLock.RUnlock()

	n := m.nodeMap[addr]
	return n.StateChange
}

// changeNode runs f against the node keyed by name in nodeMap while holding
// the node lock.
func (m *Memberlist) changeNode(addr string, f func(*nodeState)) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()

	n := m.nodeMap[addr]
	f(n)
}