package serf

import (
	"bufio"
	"fmt"
	"log"
	"math/rand"
	"net"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/armon/go-metrics"
)

/*
Serf supports using a "snapshot" file that contains various
transactional data that is used to help Serf recover quickly
and gracefully from a failure. We append member events, as well
as the latest clock values to the file during normal operation,
and periodically checkpoint and roll over the file. During a restore,
we can replay the various member events to recall a list of known
nodes to re-join, as well as restore our clock values to avoid replaying
old events.
*/

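// For illustration only: the snapshot file is a plain, line-oriented log, and
// during normal operation it contains records such as the following (the node
// names and addresses here are hypothetical):
//
//	alive: node-a 10.0.1.10:7946
//	alive: node-b 10.0.1.11:7946
//	clock: 123
//	event-clock: 45
//	query-clock: 12
//	not-alive: node-b
//	leave
//
// Lines beginning with "#" are treated as comments and skipped on replay.
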
const (
	// flushInterval is how often we force a flush of the snapshot file
	flushInterval = 500 * time.Millisecond

	// clockUpdateInterval is how often we fetch the current lamport time
	// of the cluster and write it to the snapshot file
	clockUpdateInterval = 500 * time.Millisecond

	// tmpExt is the extension we use for the temporary file during compaction
	tmpExt = ".compact"

	// snapshotErrorRecoveryInterval is how often we attempt to recover from
	// errors writing to the snapshot file.
	snapshotErrorRecoveryInterval = 30 * time.Second

	// eventChSize is the size of the event buffers between Serf and the
	// consuming application. If this is exhausted we will block Serf and Memberlist.
	eventChSize = 2048

	// shutdownFlushTimeout is the time limit for writing pending events to
	// the snapshot during a shutdown
	shutdownFlushTimeout = 250 * time.Millisecond

	// snapshotBytesPerNode is the estimated number of bytes per node in the snapshot
	snapshotBytesPerNode = 128

	// snapshotCompactionThreshold is the threshold we apply to
	// the snapshot size estimate (nodes * bytes per node) before compacting.
	snapshotCompactionThreshold = 2
)

// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
	aliveNodes              map[string]string
	clock                   *LamportClock
	fh                      *os.File
	buffered                *bufio.Writer
	inCh                    <-chan Event
	streamCh                chan Event
	lastFlush               time.Time
	lastClock               LamportTime
	lastEventClock          LamportTime
	lastQueryClock          LamportTime
	leaveCh                 chan struct{}
	leaving                 bool
	logger                  *log.Logger
	minCompactSize          int64
	path                    string
	offset                  int64
	outCh                   chan<- Event
	rejoinAfterLeave        bool
	shutdownCh              <-chan struct{}
	waitCh                  chan struct{}
	lastAttemptedCompaction time.Time
}

// PreviousNode is used to represent the previously known alive nodes
type PreviousNode struct {
	Name string
	Addr string
}

func (p PreviousNode) String() string {
	return fmt.Sprintf("%s: %s", p.Name, p.Addr)
}

// NewSnapshotter creates a new Snapshotter that records events up to a
// max byte size before rotating the file. It can also be used to
// recover old state. Snapshotter works by reading an event channel it returns,
// passing through to an output channel, and persisting relevant events to disk.
// Setting rejoinAfterLeave makes leave not clear the state, and can be used
// if you intend to rejoin the same cluster after a leave.
func NewSnapshotter(path string,
	minCompactSize int,
	rejoinAfterLeave bool,
	logger *log.Logger,
	clock *LamportClock,
	outCh chan<- Event,
	shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
	inCh := make(chan Event, eventChSize)
	streamCh := make(chan Event, eventChSize)

	// Try to open the file
	fh, err := os.OpenFile(path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0644)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to open snapshot: %v", err)
	}

	// Determine the offset
	info, err := fh.Stat()
	if err != nil {
		fh.Close()
		return nil, nil, fmt.Errorf("failed to stat snapshot: %v", err)
	}
	offset := info.Size()

	// Create the snapshotter
	snap := &Snapshotter{
		aliveNodes:       make(map[string]string),
		clock:            clock,
		fh:               fh,
		buffered:         bufio.NewWriter(fh),
		inCh:             inCh,
		streamCh:         streamCh,
		lastClock:        0,
		lastEventClock:   0,
		lastQueryClock:   0,
		leaveCh:          make(chan struct{}),
		logger:           logger,
		minCompactSize:   int64(minCompactSize),
		path:             path,
		offset:           offset,
		outCh:            outCh,
		rejoinAfterLeave: rejoinAfterLeave,
		shutdownCh:       shutdownCh,
		waitCh:           make(chan struct{}),
	}

	// Recover the last known state
	if err := snap.replay(); err != nil {
		fh.Close()
		return nil, nil, err
	}

	// Start handling new commands
	go snap.teeStream()
	go snap.stream()
	return inCh, snap, nil
}

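// exampleSnapshotterUsage is a minimal, hypothetical sketch of how a caller
// could wire up NewSnapshotter. The path and minimum compaction size below are
// illustrative assumptions only; in practice Serf constructs the snapshotter
// internally from its configuration.
func exampleSnapshotterUsage(logger *log.Logger, clock *LamportClock) error {
	outCh := make(chan Event, eventChSize)
	shutdownCh := make(chan struct{})

	// Record events to a hypothetical path with a 128 KiB minimum
	// compaction size, without rejoining after a leave.
	inCh, snap, err := NewSnapshotter("/tmp/serf.snapshot", 128*1024, false,
		logger, clock, outCh, shutdownCh)
	if err != nil {
		return err
	}

	// Nodes recovered from a previous run can be used to attempt a re-join.
	for _, prev := range snap.AliveNodes() {
		logger.Printf("[INFO] serf: previously known node: %s", prev)
	}

	// Serf delivers its events into inCh; the application consumes the same
	// events from outCh.
	_ = inCh
	return nil
}
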
// LastClock returns the last known clock time
func (s *Snapshotter) LastClock() LamportTime {
	return s.lastClock
}

// LastEventClock returns the last known event clock time
func (s *Snapshotter) LastEventClock() LamportTime {
	return s.lastEventClock
}

// LastQueryClock returns the last known query clock time
func (s *Snapshotter) LastQueryClock() LamportTime {
	return s.lastQueryClock
}

// AliveNodes returns the last known alive nodes
func (s *Snapshotter) AliveNodes() []*PreviousNode {
	// Copy the previously known
	previous := make([]*PreviousNode, 0, len(s.aliveNodes))
	for name, addr := range s.aliveNodes {
		previous = append(previous, &PreviousNode{name, addr})
	}

	// Randomize the order, prevents hot shards
	for i := range previous {
		j := rand.Intn(i + 1)
		previous[i], previous[j] = previous[j], previous[i]
	}
	return previous
}

// Wait is used to wait until the snapshotter finishes shutting down
func (s *Snapshotter) Wait() {
	<-s.waitCh
}

// Leave is used to remove known nodes to prevent a restart from
// causing a join. Otherwise nodes will re-join after leaving!
func (s *Snapshotter) Leave() {
	select {
	case s.leaveCh <- struct{}{}:
	case <-s.shutdownCh:
	}
}

// teeStream is a long running routine that is used to copy events
// to the output channel and the internal event handler.
func (s *Snapshotter) teeStream() {
	flushEvent := func(e Event) {
		// Forward to the internal stream, do not block
		select {
		case s.streamCh <- e:
		default:
		}

		// Forward the event immediately, do not block
		if s.outCh != nil {
			select {
			case s.outCh <- e:
			default:
			}
		}
	}

OUTER:
	for {
		select {
		case e := <-s.inCh:
			flushEvent(e)
		case <-s.shutdownCh:
			break OUTER
		}
	}

	// Drain any remaining events before exiting
	for {
		select {
		case e := <-s.inCh:
			flushEvent(e)
		default:
			return
		}
	}
}

// stream is a long running routine that is used to handle events
func (s *Snapshotter) stream() {
	clockTicker := time.NewTicker(clockUpdateInterval)
	defer clockTicker.Stop()

	// flushEvent is used to handle writing out an event
	flushEvent := func(e Event) {
		// Stop recording events after a leave is issued
		if s.leaving {
			return
		}
		switch typed := e.(type) {
		case MemberEvent:
			s.processMemberEvent(typed)
		case UserEvent:
			s.processUserEvent(typed)
		case *Query:
			s.processQuery(typed)
		default:
			s.logger.Printf("[ERR] serf: Unknown event to snapshot: %#v", e)
		}
	}

	for {
		select {
		case <-s.leaveCh:
			s.leaving = true

			// If we plan to re-join, keep our state
			if !s.rejoinAfterLeave {
				s.aliveNodes = make(map[string]string)
			}
			s.tryAppend("leave\n")
			if err := s.buffered.Flush(); err != nil {
				s.logger.Printf("[ERR] serf: failed to flush leave to snapshot: %v", err)
			}
			if err := s.fh.Sync(); err != nil {
				s.logger.Printf("[ERR] serf: failed to sync leave to snapshot: %v", err)
			}

		case e := <-s.streamCh:
			flushEvent(e)

		case <-clockTicker.C:
			s.updateClock()

		case <-s.shutdownCh:
			// Setup a timeout
			flushTimeout := time.After(shutdownFlushTimeout)

			// Snapshot the clock
			s.updateClock()

			// Clear out the buffers
		FLUSH:
			for {
				select {
				case e := <-s.streamCh:
					flushEvent(e)
				case <-flushTimeout:
					break FLUSH
				default:
					break FLUSH
				}
			}

			if err := s.buffered.Flush(); err != nil {
				s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
			}
			if err := s.fh.Sync(); err != nil {
				s.logger.Printf("[ERR] serf: failed to sync snapshot: %v", err)
			}
			s.fh.Close()
			close(s.waitCh)
			return
		}
	}
}

// processMemberEvent is used to handle a single member event
func (s *Snapshotter) processMemberEvent(e MemberEvent) {
	switch e.Type {
	case EventMemberJoin:
		for _, mem := range e.Members {
			addr := net.TCPAddr{IP: mem.Addr, Port: int(mem.Port)}
			s.aliveNodes[mem.Name] = addr.String()
			s.tryAppend(fmt.Sprintf("alive: %s %s\n", mem.Name, addr.String()))
		}

	case EventMemberLeave:
		fallthrough
	case EventMemberFailed:
		for _, mem := range e.Members {
			delete(s.aliveNodes, mem.Name)
			s.tryAppend(fmt.Sprintf("not-alive: %s\n", mem.Name))
		}
	}
	s.updateClock()
}

// updateClock is called periodically to check if we should update our
// clock value. This is done after member events but should also be done
// periodically due to race conditions with join and leave intents
func (s *Snapshotter) updateClock() {
	lastSeen := s.clock.Time() - 1
	if lastSeen > s.lastClock {
		s.lastClock = lastSeen
		s.tryAppend(fmt.Sprintf("clock: %d\n", s.lastClock))
	}
}

// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
	// Ignore old clocks
	if e.LTime <= s.lastEventClock {
		return
	}
	s.lastEventClock = e.LTime
	s.tryAppend(fmt.Sprintf("event-clock: %d\n", e.LTime))
}

// processQuery is used to handle a single query event
func (s *Snapshotter) processQuery(q *Query) {
	// Ignore old clocks
	if q.LTime <= s.lastQueryClock {
		return
	}
	s.lastQueryClock = q.LTime
	s.tryAppend(fmt.Sprintf("query-clock: %d\n", q.LTime))
}

// tryAppend will invoke appendLine but will not return an error
func (s *Snapshotter) tryAppend(l string) {
	if err := s.appendLine(l); err != nil {
		s.logger.Printf("[ERR] serf: Failed to update snapshot: %v", err)
		now := time.Now()
		if now.Sub(s.lastAttemptedCompaction) > snapshotErrorRecoveryInterval {
			s.lastAttemptedCompaction = now
			s.logger.Printf("[INFO] serf: Attempting compaction to recover from error...")
			err = s.compact()
			if err != nil {
				s.logger.Printf("[ERR] serf: Compaction failed, will reattempt after %v: %v", snapshotErrorRecoveryInterval, err)
			} else {
				s.logger.Printf("[INFO] serf: Finished compaction, successfully recovered from error state")
			}
		}
	}
}

// appendLine is used to append a line to the existing log
func (s *Snapshotter) appendLine(l string) error {
	defer metrics.MeasureSince([]string{"serf", "snapshot", "appendLine"}, time.Now())

	n, err := s.buffered.WriteString(l)
	if err != nil {
		return err
	}

	// Check if we should flush
	now := time.Now()
	if now.Sub(s.lastFlush) > flushInterval {
		s.lastFlush = now
		if err := s.buffered.Flush(); err != nil {
			return err
		}
	}

	// Check if a compaction is necessary
	s.offset += int64(n)
	if s.offset > s.snapshotMaxSize() {
		return s.compact()
	}
	return nil
}

// snapshotMaxSize computes the maximum size and is used to force periodic compaction.
func (s *Snapshotter) snapshotMaxSize() int64 {
	nodes := int64(len(s.aliveNodes))
	estSize := nodes * snapshotBytesPerNode
	threshold := estSize * snapshotCompactionThreshold

	// Apply a minimum threshold to avoid frequent compaction
	if threshold < s.minCompactSize {
		threshold = s.minCompactSize
	}
	return threshold
}

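// As a rough illustration of the sizing above: with 1,024 alive nodes the
// estimate is 1,024 * snapshotBytesPerNode = 128 KiB, so compaction triggers
// once the file grows past roughly 256 KiB, unless minCompactSize is larger.
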
// Compact is used to compact the snapshot once it is too large
func (s *Snapshotter) compact() error {
	defer metrics.MeasureSince([]string{"serf", "snapshot", "compact"}, time.Now())

	// Try to open a new temporary file for the compacted snapshot
	newPath := s.path + tmpExt
	fh, err := os.OpenFile(newPath, os.O_RDWR|os.O_TRUNC|os.O_CREATE, 0755)
	if err != nil {
		return fmt.Errorf("failed to open new snapshot: %v", err)
	}

	// Create a buffered writer
	buf := bufio.NewWriter(fh)

	// Write out the live nodes
	var offset int64
	for name, addr := range s.aliveNodes {
		line := fmt.Sprintf("alive: %s %s\n", name, addr)
		n, err := buf.WriteString(line)
		if err != nil {
			fh.Close()
			return err
		}
		offset += int64(n)
	}

	// Write out the clocks
	line := fmt.Sprintf("clock: %d\n", s.lastClock)
	n, err := buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	line = fmt.Sprintf("event-clock: %d\n", s.lastEventClock)
	n, err = buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	line = fmt.Sprintf("query-clock: %d\n", s.lastQueryClock)
	n, err = buf.WriteString(line)
	if err != nil {
		fh.Close()
		return err
	}
	offset += int64(n)

	// Flush the new snapshot
	err = buf.Flush()
	if err != nil {
		return fmt.Errorf("failed to flush new snapshot: %v", err)
	}

	err = fh.Sync()
	if err != nil {
		fh.Close()
		return fmt.Errorf("failed to fsync new snapshot: %v", err)
	}
	fh.Close()

	// We now need to swap the old snapshot file with the new snapshot.
	// Turns out, Windows won't let us rename the files if we have
	// open handles to them or if the destination already exists. This
	// means we are forced to close the existing handles, delete the
	// old file, move the new one in place, and then re-open the file
	// handles.

	// Flush the existing snapshot, ignoring errors since we will
	// delete it momentarily.
	s.buffered.Flush()
	s.buffered = nil

	// Close the file handle to the old snapshot
	s.fh.Close()
	s.fh = nil

	// Delete the old file
	if err := os.Remove(s.path); err != nil {
		return fmt.Errorf("failed to remove old snapshot: %v", err)
	}

	// Move the new file into place
	if err := os.Rename(newPath, s.path); err != nil {
		return fmt.Errorf("failed to install new snapshot: %v", err)
	}

	// Open the new snapshot
	fh, err = os.OpenFile(s.path, os.O_RDWR|os.O_APPEND|os.O_CREATE, 0755)
	if err != nil {
		return fmt.Errorf("failed to open snapshot: %v", err)
	}
	buf = bufio.NewWriter(fh)

	// Rotate our handles
	s.fh = fh
	s.buffered = buf
	s.offset = offset
	s.lastFlush = time.Now()
	return nil
}

// replay is used to reset our internal state by replaying
// the snapshot file. It is used at initialization time to read old
// state
func (s *Snapshotter) replay() error {
	// Seek to the beginning
	if _, err := s.fh.Seek(0, os.SEEK_SET); err != nil {
		return err
	}

	// Read each line
	reader := bufio.NewReader(s.fh)
	for {
		line, err := reader.ReadString('\n')
		if err != nil {
			break
		}

		// Skip the newline
		line = line[:len(line)-1]

		// Switch on the prefix
		if strings.HasPrefix(line, "alive: ") {
			info := strings.TrimPrefix(line, "alive: ")
			addrIdx := strings.LastIndex(info, " ")
			if addrIdx == -1 {
				s.logger.Printf("[WARN] serf: Failed to parse address: %v", line)
				continue
			}
			addr := info[addrIdx+1:]
			name := info[:addrIdx]
			s.aliveNodes[name] = addr

		} else if strings.HasPrefix(line, "not-alive: ") {
			name := strings.TrimPrefix(line, "not-alive: ")
			delete(s.aliveNodes, name)

		} else if strings.HasPrefix(line, "clock: ") {
			timeStr := strings.TrimPrefix(line, "clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert clock time: %v", err)
				continue
			}
			s.lastClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "event-clock: ") {
			timeStr := strings.TrimPrefix(line, "event-clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert event clock time: %v", err)
				continue
			}
			s.lastEventClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "query-clock: ") {
			timeStr := strings.TrimPrefix(line, "query-clock: ")
			timeInt, err := strconv.ParseUint(timeStr, 10, 64)
			if err != nil {
				s.logger.Printf("[WARN] serf: Failed to convert query clock time: %v", err)
				continue
			}
			s.lastQueryClock = LamportTime(timeInt)

		} else if strings.HasPrefix(line, "coordinate: ") {
			// Ignore any coordinate persistence from old snapshots; Serf should re-converge
			continue
		} else if line == "leave" {
			// Ignore a leave if we plan on re-joining
			if s.rejoinAfterLeave {
				s.logger.Printf("[INFO] serf: Ignoring previous leave in snapshot")
				continue
			}
			s.aliveNodes = make(map[string]string)
			s.lastClock = 0
			s.lastEventClock = 0
			s.lastQueryClock = 0

		} else if strings.HasPrefix(line, "#") {
			// Skip comment lines

		} else {
			s.logger.Printf("[WARN] serf: Unrecognized snapshot line: %v", line)
		}
	}

	// Seek to the end
	if _, err := s.fh.Seek(0, os.SEEK_END); err != nil {
		return err
	}
	return nil
}