liuxiaolong
2022-06-28 37714b1093c04061e636e5b1d27179652e671c0a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package memberlist
 
/*
The broadcast mechanism works by maintaining a sorted list of messages to be
sent out. When a message is to be broadcast, the retransmit count
is set to zero and appended to the queue. The retransmit count serves
as the "priority", ensuring that newer messages get sent first. Once
a message hits the retransmit limit, it is removed from the queue.
 
Additionally, older entries can be invalidated by new messages that
are contradictory. For example, if we send "{suspect M1 inc: 1},
then a following {alive M1 inc: 2} will invalidate that message
*/
 
type memberlistBroadcast struct {
    node   string
    msg    []byte
    notify chan struct{}
}
 
func (b *memberlistBroadcast) Invalidates(other Broadcast) bool {
    // Check if that broadcast is a memberlist type
    mb, ok := other.(*memberlistBroadcast)
    if !ok {
        return false
    }
 
    // Invalidates any message about the same node
    return b.node == mb.node
}
 
// memberlist.NamedBroadcast optional interface
func (b *memberlistBroadcast) Name() string {
    return b.node
}
 
func (b *memberlistBroadcast) Message() []byte {
    return b.msg
}
 
func (b *memberlistBroadcast) Finished() {
    select {
    case b.notify <- struct{}{}:
    default:
    }
}
 
// encodeAndBroadcast encodes a message and enqueues it for broadcast. Fails
// silently if there is an encoding error.
func (m *Memberlist) encodeAndBroadcast(node string, msgType messageType, msg interface{}) {
    m.encodeBroadcastNotify(node, msgType, msg, nil)
}
 
// encodeBroadcastNotify encodes a message and enqueues it for broadcast
// and notifies the given channel when transmission is finished. Fails
// silently if there is an encoding error.
func (m *Memberlist) encodeBroadcastNotify(node string, msgType messageType, msg interface{}, notify chan struct{}) {
    buf, err := encode(msgType, msg)
    if err != nil {
        m.logger.Printf("[ERR] memberlist: Failed to encode message for broadcast: %s", err)
    } else {
        m.queueBroadcast(node, buf.Bytes(), notify)
    }
}
 
// queueBroadcast is used to start dissemination of a message. It will be
// sent up to a configured number of times. The message could potentially
// be invalidated by a future message about the same node
func (m *Memberlist) queueBroadcast(node string, msg []byte, notify chan struct{}) {
    b := &memberlistBroadcast{node, msg, notify}
    m.broadcasts.QueueBroadcast(b)
}
 
// getBroadcasts is used to return a slice of broadcasts to send up to
// a maximum byte size, while imposing a per-broadcast overhead. This is used
// to fill a UDP packet with piggybacked data
func (m *Memberlist) getBroadcasts(overhead, limit int) [][]byte {
    // Get memberlist messages first
    toSend := m.broadcasts.GetBroadcasts(overhead, limit)
 
    // Check if the user has anything to broadcast
    d := m.config.Delegate
    if d != nil {
        // Determine the bytes used already
        bytesUsed := 0
        for _, msg := range toSend {
            bytesUsed += len(msg) + overhead
        }
 
        // Check space remaining for user messages
        avail := limit - bytesUsed
        if avail > overhead+userMsgOverhead {
            userMsgs := d.GetBroadcasts(overhead+userMsgOverhead, avail)
 
            // Frame each user message
            for _, msg := range userMsgs {
                buf := make([]byte, 1, len(msg)+1)
                buf[0] = byte(userMsg)
                buf = append(buf, msg...)
                toSend = append(toSend, buf)
            }
        }
    }
    return toSend
}