package memberlist

import (
	"math"
	"sync"

	"github.com/google/btree"
)

// TransmitLimitedQueue is used to queue messages to broadcast to
// the cluster (via gossip) but limits the number of transmits per
// message. It also prioritizes messages with lower transmit counts
// (hence newer messages).
type TransmitLimitedQueue struct {
	// NumNodes returns the number of nodes in the cluster. This is
	// used to determine the retransmit count, which is calculated
	// based on the log of this.
	NumNodes func() int

	// RetransmitMult is the multiplier used to determine the maximum
	// number of retransmissions attempted.
	RetransmitMult int

	// mu is the mutex the methods of this type acquire before touching
	// tq, tm or idGen.
	mu sync.Mutex
	// tq holds every queued broadcast, ordered by limitedBroadcast.Less.
	// It is created lazily and is nil until something has been queued (and
	// again after Reset).
	tq *btree.BTree // stores *limitedBroadcast as btree.Item
	// tm indexes the queued broadcasts that carry a non-empty name, keyed by
	// that name. It is created lazily alongside tq.
	tm map[string]*limitedBroadcast
	// idGen is the most recently issued broadcast id. It is bumped for every
	// queued broadcast and set back to 0 whenever the queue becomes empty.
	idGen int64
}

// limitedBroadcast is the queue's bookkeeping record for one queued
// Broadcast. The transmits, msgLen and id fields together form the sort key
// compared by Less.
type limitedBroadcast struct {
	transmits int   // btree-key[0]: Number of transmissions attempted.
	msgLen    int64 // btree-key[1]: copied from len(b.Message())
	id        int64 // btree-key[2]: unique incrementing id stamped at submission time
	b         Broadcast

	name string // set if Broadcast is a NamedBroadcast
}

// Less tests whether the current item is less than the given argument.
//
// This must provide a strict weak ordering.
// If !a.Less(b) && !b.Less(a), we treat this to mean a == b (i.e. we can only
// hold one of either a or b in the tree).
//
// default ordering is
// - [transmits=0, ..., transmits=inf]
// - [transmits=0:len=999, ..., transmits=0:len=2, ...]
// - [transmits=0:len=999,id=999, ..., transmits=0:len=999:id=1, ...]
func (b *limitedBroadcast) Less(than btree.Item) bool { o := than.(*limitedBroadcast) if b.transmits < o.transmits { return true } else if b.transmits > o.transmits { return false } if b.msgLen > o.msgLen { return true } else if b.msgLen < o.msgLen { return false } return b.id > o.id } // for testing; emits in transmit order if reverse=false func (q *TransmitLimitedQueue) orderedView(reverse bool) []*limitedBroadcast { q.mu.Lock() defer q.mu.Unlock() out := make([]*limitedBroadcast, 0, q.lenLocked()) q.walkReadOnlyLocked(reverse, func(cur *limitedBroadcast) bool { out = append(out, cur) return true }) return out } // walkReadOnlyLocked calls f for each item in the queue traversing it in // natural order (by Less) when reverse=false and the opposite when true. You // must hold the mutex. // // This method panics if you attempt to mutate the item during traversal. The // underlying btree should also not be mutated during traversal. func (q *TransmitLimitedQueue) walkReadOnlyLocked(reverse bool, f func(*limitedBroadcast) bool) { if q.lenLocked() == 0 { return } iter := func(item btree.Item) bool { cur := item.(*limitedBroadcast) prevTransmits := cur.transmits prevMsgLen := cur.msgLen prevID := cur.id keepGoing := f(cur) if prevTransmits != cur.transmits || prevMsgLen != cur.msgLen || prevID != cur.id { panic("edited queue while walking read only") } return keepGoing } if reverse { q.tq.Descend(iter) // end with transmit 0 } else { q.tq.Ascend(iter) // start with transmit 0 } } // Broadcast is something that can be broadcasted via gossip to // the memberlist cluster. 
type Broadcast interface { // Invalidates checks if enqueuing the current broadcast // invalidates a previous broadcast Invalidates(b Broadcast) bool // Returns a byte form of the message Message() []byte // Finished is invoked when the message will no longer // be broadcast, either due to invalidation or to the // transmit limit being reached Finished() } // NamedBroadcast is an optional extension of the Broadcast interface that // gives each message a unique string name, and that is used to optimize // // You shoud ensure that Invalidates() checks the same uniqueness as the // example below: // // func (b *foo) Invalidates(other Broadcast) bool { // nb, ok := other.(NamedBroadcast) // if !ok { // return false // } // return b.Name() == nb.Name() // } // // Invalidates() isn't currently used for NamedBroadcasts, but that may change // in the future. type NamedBroadcast interface { Broadcast // The unique identity of this broadcast message. Name() string } // UniqueBroadcast is an optional interface that indicates that each message is // intrinsically unique and there is no need to scan the broadcast queue for // duplicates. // // You should ensure that Invalidates() always returns false if implementing // this interface. Invalidates() isn't currently used for UniqueBroadcasts, but // that may change in the future. type UniqueBroadcast interface { Broadcast // UniqueBroadcast is just a marker method for this interface. UniqueBroadcast() } // QueueBroadcast is used to enqueue a broadcast func (q *TransmitLimitedQueue) QueueBroadcast(b Broadcast) { q.queueBroadcast(b, 0) } // lazyInit initializes internal data structures the first time they are // needed. You must already hold the mutex. func (q *TransmitLimitedQueue) lazyInit() { if q.tq == nil { q.tq = btree.New(32) } if q.tm == nil { q.tm = make(map[string]*limitedBroadcast) } } // queueBroadcast is like QueueBroadcast but you can use a nonzero value for // the initial transmit tier assigned to the message. 
This is meant to be used // for unit testing. func (q *TransmitLimitedQueue) queueBroadcast(b Broadcast, initialTransmits int) { q.mu.Lock() defer q.mu.Unlock() q.lazyInit() if q.idGen == math.MaxInt64 { // it's super duper unlikely to wrap around within the retransmit limit q.idGen = 1 } else { q.idGen++ } id := q.idGen lb := &limitedBroadcast{ transmits: initialTransmits, msgLen: int64(len(b.Message())), id: id, b: b, } unique := false if nb, ok := b.(NamedBroadcast); ok { lb.name = nb.Name() } else if _, ok := b.(UniqueBroadcast); ok { unique = true } // Check if this message invalidates another. if lb.name != "" { if old, ok := q.tm[lb.name]; ok { old.b.Finished() q.deleteItem(old) } } else if !unique { // Slow path, hopefully nothing hot hits this. var remove []*limitedBroadcast q.tq.Ascend(func(item btree.Item) bool { cur := item.(*limitedBroadcast) // Special Broadcasts can only invalidate each other. switch cur.b.(type) { case NamedBroadcast: // noop case UniqueBroadcast: // noop default: if b.Invalidates(cur.b) { cur.b.Finished() remove = append(remove, cur) } } return true }) for _, cur := range remove { q.deleteItem(cur) } } // Append to the relevant queue. q.addItem(lb) } // deleteItem removes the given item from the overall datastructure. You // must already hold the mutex. func (q *TransmitLimitedQueue) deleteItem(cur *limitedBroadcast) { _ = q.tq.Delete(cur) if cur.name != "" { delete(q.tm, cur.name) } if q.tq.Len() == 0 { // At idle there's no reason to let the id generator keep going // indefinitely. q.idGen = 0 } } // addItem adds the given item into the overall datastructure. You must already // hold the mutex. func (q *TransmitLimitedQueue) addItem(cur *limitedBroadcast) { _ = q.tq.ReplaceOrInsert(cur) if cur.name != "" { q.tm[cur.name] = cur } } // getTransmitRange returns a pair of min/max values for transmit values // represented by the current queue contents. Both values represent actual // transmit values on the interval [0, len). 
You must already hold the mutex. func (q *TransmitLimitedQueue) getTransmitRange() (minTransmit, maxTransmit int) { if q.lenLocked() == 0 { return 0, 0 } minItem, maxItem := q.tq.Min(), q.tq.Max() if minItem == nil || maxItem == nil { return 0, 0 } min := minItem.(*limitedBroadcast).transmits max := maxItem.(*limitedBroadcast).transmits return min, max } // GetBroadcasts is used to get a number of broadcasts, up to a byte limit // and applying a per-message overhead as provided. func (q *TransmitLimitedQueue) GetBroadcasts(overhead, limit int) [][]byte { q.mu.Lock() defer q.mu.Unlock() // Fast path the default case if q.lenLocked() == 0 { return nil } transmitLimit := retransmitLimit(q.RetransmitMult, q.NumNodes()) var ( bytesUsed int toSend [][]byte reinsert []*limitedBroadcast ) // Visit fresher items first, but only look at stuff that will fit. // We'll go tier by tier, grabbing the largest items first. minTr, maxTr := q.getTransmitRange() for transmits := minTr; transmits <= maxTr; /*do not advance automatically*/ { free := int64(limit - bytesUsed - overhead) if free <= 0 { break // bail out early } // Search for the least element on a given tier (by transmit count) as // defined in the limitedBroadcast.Less function that will fit into our // remaining space. greaterOrEqual := &limitedBroadcast{ transmits: transmits, msgLen: free, id: math.MaxInt64, } lessThan := &limitedBroadcast{ transmits: transmits + 1, msgLen: math.MaxInt64, id: math.MaxInt64, } var keep *limitedBroadcast q.tq.AscendRange(greaterOrEqual, lessThan, func(item btree.Item) bool { cur := item.(*limitedBroadcast) // Check if this is within our limits if int64(len(cur.b.Message())) > free { // If this happens it's a bug in the datastructure or // surrounding use doing something like having len(Message()) // change over time. There's enough going on here that it's // probably sane to just skip it and move on for now. 
return true } keep = cur return false }) if keep == nil { // No more items of an appropriate size in the tier. transmits++ continue } msg := keep.b.Message() // Add to slice to send bytesUsed += overhead + len(msg) toSend = append(toSend, msg) // Check if we should stop transmission q.deleteItem(keep) if keep.transmits+1 >= transmitLimit { keep.b.Finished() } else { // We need to bump this item down to another transmit tier, but // because it would be in the same direction that we're walking the // tiers, we will have to delay the reinsertion until we are // finished our search. Otherwise we'll possibly re-add the message // when we ascend to the next tier. keep.transmits++ reinsert = append(reinsert, keep) } } for _, cur := range reinsert { q.addItem(cur) } return toSend } // NumQueued returns the number of queued messages func (q *TransmitLimitedQueue) NumQueued() int { q.mu.Lock() defer q.mu.Unlock() return q.lenLocked() } // lenLocked returns the length of the overall queue datastructure. You must // hold the mutex. func (q *TransmitLimitedQueue) lenLocked() int { if q.tq == nil { return 0 } return q.tq.Len() } // Reset clears all the queued messages. Should only be used for tests. func (q *TransmitLimitedQueue) Reset() { q.mu.Lock() defer q.mu.Unlock() q.walkReadOnlyLocked(false, func(cur *limitedBroadcast) bool { cur.b.Finished() return true }) q.tq = nil q.tm = nil q.idGen = 0 } // Prune will retain the maxRetain latest messages, and the rest // will be discarded. This can be used to prevent unbounded queue sizes func (q *TransmitLimitedQueue) Prune(maxRetain int) { q.mu.Lock() defer q.mu.Unlock() // Do nothing if queue size is less than the limit for q.tq.Len() > maxRetain { item := q.tq.Max() if item == nil { break } cur := item.(*limitedBroadcast) cur.b.Finished() q.deleteItem(cur) } }