package memberlist

import (
	"math"
	"sync"

	"github.com/google/btree"
)

// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which scales with the
	// log of the cluster size.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	mu    sync.Mutex
	tq    *btree.BTree // stores *limitedBroadcast as btree.Item
	tm    map[string]*limitedBroadcast
	idGen int64
}
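
// Illustrative usage of TransmitLimitedQueue (a sketch only; the node count,
// multiplier, overhead, and size limit below are arbitrary assumptions, and
// someBroadcast stands in for any Broadcast implementation):
//
//	q := &TransmitLimitedQueue{
//		NumNodes:       func() int { return 32 },
//		RetransmitMult: 4,
//	}
//	q.QueueBroadcast(someBroadcast)
//
//	// Periodically drain up to 1400 bytes, charging 2 bytes of
//	// per-message overhead.
//	for _, msg := range q.GetBroadcasts(2, 1400) {
//		_ = msg // send msg via gossip
//	}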

type limitedBroadcast struct {
	transmits int   // btree-key[0]: number of transmissions attempted
	msgLen    int64 // btree-key[1]: copied from len(b.Message())
	id        int64 // btree-key[2]: unique incrementing id stamped at submission time
	b         Broadcast

	name string // set if Broadcast is a NamedBroadcast
}

// Less tests whether the current item is less than the given argument.
//
// This must provide a strict weak ordering.
// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
// hold one of either a or b in the tree).
//
// The default ordering is:
// - [transmits=0, ..., transmits=inf]
// - [transmits=0:len=999, ..., transmits=0:len=2, ...]
// - [transmits=0:len=999:id=999, ..., transmits=0:len=999:id=1, ...]
func (b *limitedBroadcast) Less(than btree.Item) bool {
	o := than.(*limitedBroadcast)
	if b.transmits < o.transmits {
		return true
	} else if b.transmits > o.transmits {
		return false
	}
	if b.msgLen > o.msgLen {
		return true
	} else if b.msgLen < o.msgLen {
		return false
	}
	return b.id > o.id
}
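
// For example (values are arbitrary), given three queued items
//
//	{transmits: 0, msgLen: 20, id: 3}
//	{transmits: 0, msgLen: 10, id: 5}
//	{transmits: 1, msgLen: 50, id: 1}
//
// Ascend visits them in exactly that order: the untransmitted tier comes
// first, and within a tier larger messages (then higher ids) sort earlier.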

// orderedView is for testing; it returns the queue contents in transmit order
// when reverse=false, and in the opposite order when reverse=true.
func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast {
	q.mu.Lock()
	defer q.mu.Unlock()

	out := make([]*limitedBroadcast, 0, q.lenLocked())
	q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool {
		out = append(out, cur)
		return true
	})

	return out
}

// walkReadOnlyLocked calls f for each item in the queue, traversing it in
// natural order (by Less) when reverse=false and the opposite when true. You
// must hold the mutex.
//
// This method panics if you attempt to mutate the item during traversal. The
// underlying btree should also not be mutated during traversal.
func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) {
	if q.lenLocked() == 0 {
		return
	}

	iter := func(item btree.Item) bool {
		cur := item.(*limitedBroadcast)

		prevTransmits := cur.transmits
		prevMsgLen := cur.msgLen
		prevID := cur.id

		keepGoing := f(cur)

		if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id {
			panic("edited queue while walking read only")
		}

		return keepGoing
	}

	if reverse {
		q.tq.Descend(iter) // end with transmit 0
	} else {
		q.tq.Ascend(iter) // start with transmit 0
	}
}

// Broadcast is something that can be broadcasted via gossip to
// the memberlist cluster.
type Broadcast interface {
	// Invalidates checks if enqueuing the current broadcast
	// invalidates a previous broadcast.
	Invalidates(b Broadcast) bool

	// Message returns a byte form of the message.
	Message() []byte

	// Finished is invoked when the message will no longer
	// be broadcast, either due to invalidation or because the
	// transmit limit has been reached.
	Finished()
}
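
// A minimal sketch of a Broadcast implementation (illustrative only; the
// simpleBroadcast type and its fields are hypothetical and not part of this
// package):
//
//	type simpleBroadcast struct {
//		msg    []byte
//		notify chan struct{}
//	}
//
//	func (b *simpleBroadcast) Invalidates(other Broadcast) bool { return false }
//	func (b *simpleBroadcast) Message() []byte                  { return b.msg }
//	func (b *simpleBroadcast) Finished() {
//		if b.notify != nil {
//			close(b.notify)
//		}
//	}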

// NamedBroadcast is an optional extension of the Broadcast interface that
// gives each message a unique string name, which is used to optimize
// invalidation: a queued broadcast with the same name is replaced directly
// via a map lookup instead of scanning the whole queue.
//
// You should ensure that Invalidates() checks the same uniqueness as the
// example below:
//
//	func (b *foo) Invalidates(other Broadcast) bool {
//		nb, ok := other.(NamedBroadcast)
//		if !ok {
//			return false
//		}
//		return b.Name() == nb.Name()
//	}
//
// Invalidates() isn't currently used for NamedBroadcasts, but that may change
// in the future.
type NamedBroadcast interface {
	Broadcast
	// Name is the unique identity of this broadcast message.
	Name() string
}

// UniqueBroadcast is an optional interface that indicates that each message is
// intrinsically unique and there is no need to scan the broadcast queue for
// duplicates.
//
// You should ensure that Invalidates() always returns false if implementing
// this interface. Invalidates() isn't currently used for UniqueBroadcasts, but
// that may change in the future.
type UniqueBroadcast interface {
	Broadcast
	// UniqueBroadcast is just a marker method for this interface.
	UniqueBroadcast()
}
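
// A sketch of opting in to UniqueBroadcast (continuing the hypothetical
// simpleBroadcast example above, whose Invalidates already returns false):
//
//	func (b *simpleBroadcast) UniqueBroadcast() {}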

// QueueBroadcast is used to enqueue a broadcast.
func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) {
	q.queueBroadcast(b, 0)
}

// lazyInit initializes internal data structures the first time they are
// needed. You must already hold the mutex.
func (q *TransmitLimitedQueue) lazyInit() {
	if q.tq == nil {
		q.tq = btree.New(32)
	}
	if q.tm == nil {
		q.tm = make(map[string]*limitedBroadcast)
	}
}

// queueBroadcast is like QueueBroadcast but you can use a nonzero value for
// the initial transmit tier assigned to the message. This is meant to be used
// for unit testing.
func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.lazyInit()

	if q.idGen == math.MaxInt64 {
		// Wrap-around is exceedingly unlikely while older messages are
		// still within their retransmit limit, so restarting at 1 is safe.
		q.idGen = 1
	} else {
		q.idGen++
	}
	id := q.idGen

	lb := &limitedBroadcast{
		transmits: initialTransmits,
		msgLen:    int64(len(b.Message())),
		id:        id,
		b:         b,
	}
	unique := false
	if nb, ok := b.(NamedBroadcast); ok {
		lb.name = nb.Name()
	} else if _, ok := b.(UniqueBroadcast); ok {
		unique = true
	}

	// Check if this message invalidates another.
	if lb.name != "" {
		if old, ok := q.tm[lb.name]; ok {
			old.b.Finished()
			q.deleteItem(old)
		}
	} else if !unique {
		// Slow path, hopefully nothing hot hits this.
		var remove []*limitedBroadcast
		q.tq.Ascend(func(item btree.Item) bool {
			cur := item.(*limitedBroadcast)

			// Special Broadcasts can only invalidate each other.
			switch cur.b.(type) {
			case NamedBroadcast:
				// noop
			case UniqueBroadcast:
				// noop
			default:
				if b.Invalidates(cur.b) {
					cur.b.Finished()
					remove = append(remove, cur)
				}
			}
			return true
		})
		for _, cur := range remove {
			q.deleteItem(cur)
		}
	}

	// Append to the relevant queue.
	q.addItem(lb)
}

// deleteItem removes the given item from the overall datastructure. You
// must already hold the mutex.
func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) {
	_ = q.tq.Delete(cur)
	if cur.name != "" {
		delete(q.tm, cur.name)
	}

	if q.tq.Len() == 0 {
		// At idle there's no reason to let the id generator keep going
		// indefinitely.
		q.idGen = 0
	}
}

// addItem adds the given item into the overall datastructure. You must already
// hold the mutex.
func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) {
	_ = q.tq.ReplaceOrInsert(cur)
	if cur.name != "" {
		q.tm[cur.name] = cur
	}
}

// getTransmitRange returns the minimum and maximum transmit counts present in
// the current queue contents. Both are actual transmit values taken from
// queued items; if the queue is empty both are zero. You must already hold
// the mutex.
func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) {
	if q.lenLocked() == 0 {
		return 0, 0
	}
	minItem, maxItem := q.tq.Min(), q.tq.Max()
	if minItem == nil || maxItem == nil {
		return 0, 0
	}

	min := minItem.(*limitedBroadcast).transmits
	max := maxItem.(*limitedBroadcast).transmits

	return min, max
}

// GetBroadcasts is used to get a number of broadcasts, up to a byte limit
// and applying a per-message overhead as provided.
func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Fast path the default case
	if q.lenLocked() == 0 {
		return nil
	}

	transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes())

	var (
		bytesUsed int
		toSend    [][]byte
		reinsert  []*limitedBroadcast
	)

	// Visit fresher items first, but only look at stuff that will fit.
	// We'll go tier by tier, grabbing the largest items first.
	minTr, maxTr := q.getTransmitRange()
	for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ {
		free := int64(limit - bytesUsed - overhead)
		if free <= 0 {
			break // bail out early
		}

		// Search for the least element on a given tier (by transmit count) as
		// defined in the limitedBroadcast.Less function that will fit into our
		// remaining space.
		//
		// Within a tier, Less sorts larger messages first, so this pivot
		// starts the scan at the largest message on the tier that can still
		// fit into the free space.
		greaterOrEqual := &limitedBroadcast{
			transmits: transmits,
			msgLen:    free,
			id:        math.MaxInt64,
		}
		// This pivot sorts before everything on the next tier, keeping the
		// range within the current tier.
		lessThan := &limitedBroadcast{
			transmits: transmits + 1,
			msgLen:    math.MaxInt64,
			id:        math.MaxInt64,
		}
		var keep *limitedBroadcast
		q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool {
			cur := item.(*limitedBroadcast)
			// Check if this is within our limits
			if int64(len(cur.b.Message())) > free {
				// If this happens it's a bug in the datastructure or
				// surrounding use doing something like having len(Message())
				// change over time. There's enough going on here that it's
				// probably sane to just skip it and move on for now.
				return true
			}
			keep = cur
			return false
		})
		if keep == nil {
			// No more items of an appropriate size in the tier.
			transmits++
			continue
		}

		msg := keep.b.Message()

		// Add to slice to send
		bytesUsed += overhead + len(msg)
		toSend = append(toSend, msg)

		// Check if we should stop transmission
		q.deleteItem(keep)
		if keep.transmits+1 >= transmitLimit {
			keep.b.Finished()
		} else {
			// We need to bump this item down to another transmit tier, but
			// because it would be in the same direction that we're walking the
			// tiers, we will have to delay the reinsertion until we have
			// finished our search. Otherwise we'll possibly re-add the message
			// when we ascend to the next tier.
			keep.transmits++
			reinsert = append(reinsert, keep)
		}
	}

	for _, cur := range reinsert {
		q.addItem(cur)
	}

	return toSend
}

// NumQueued returns the number of queued messages.
func (q *TransmitLimitedQueue) NumQueued() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return q.lenLocked()
}

// lenLocked returns the length of the overall queue datastructure. You must
// hold the mutex.
func (q *TransmitLimitedQueue) lenLocked() int {
	if q.tq == nil {
		return 0
	}
	return q.tq.Len()
}

// Reset clears all the queued messages. Should only be used for tests.
func (q *TransmitLimitedQueue) Reset() {
	q.mu.Lock()
	defer q.mu.Unlock()

	q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool {
		cur.b.Finished()
		return true
	})

	q.tq = nil
	q.tm = nil
	q.idGen = 0
}

// Prune will retain the maxRetain latest messages, and the rest
// will be discarded. This can be used to prevent unbounded queue sizes.
func (q *TransmitLimitedQueue) Prune(maxRetain int) {
	q.mu.Lock()
	defer q.mu.Unlock()

	// Do nothing if the queue is already at or below the limit. Using
	// lenLocked also keeps this safe to call before anything was queued.
	for q.lenLocked() > maxRetain {
		item := q.tq.Max()
		if item == nil {
			break
		}
		cur := item.(*limitedBroadcast)
		cur.b.Finished()
		q.deleteItem(cur)
	}
}