liuxiaolong
2020-09-16 83055166cfa596ef6a91cbb1fdbde40c1c7298a0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package serf
 
import (
    "bytes"
    "net"
    "time"
 
    "github.com/hashicorp/go-msgpack/codec"
)
 
// messageType are the types of gossip messages Serf will send along
// memberlist.
type messageType uint8
 
const (
    messageLeaveType messageType = iota
    messageJoinType
    messagePushPullType
    messageUserEventType
    messageQueryType
    messageQueryResponseType
    messageConflictResponseType
    messageKeyRequestType
    messageKeyResponseType
    messageRelayType
)
 
const (
    // Ack flag is used to force receiver to send an ack back
    queryFlagAck uint32 = 1 << iota
 
    // NoBroadcast is used to prevent re-broadcast of a query.
    // this can be used to selectively send queries to individual members
    queryFlagNoBroadcast
)
 
// filterType is used with a queryFilter to specify the type of
// filter we are sending
type filterType uint8
 
const (
    filterNodeType filterType = iota
    filterTagType
)
 
// messageJoin is the message broadcasted after we join to
// associated the node with a lamport clock
type messageJoin struct {
    LTime LamportTime
    Node  string
}
 
// messageLeave is the message broadcasted to signal the intentional to
// leave.
type messageLeave struct {
    LTime LamportTime
    Node  string
    Prune bool
}
 
// messagePushPullType is used when doing a state exchange. This
// is a relatively large message, but is sent infrequently
type messagePushPull struct {
    LTime        LamportTime            // Current node lamport time
    StatusLTimes map[string]LamportTime // Maps the node to its status time
    LeftMembers  []string               // List of left nodes
    EventLTime   LamportTime            // Lamport time for event clock
    Events       []*userEvents          // Recent events
    QueryLTime   LamportTime            // Lamport time for query clock
}
 
// messageUserEvent is used for user-generated events
type messageUserEvent struct {
    LTime   LamportTime
    Name    string
    Payload []byte
    CC      bool // "Can Coalesce". Zero value is compatible with Serf 0.1
}
 
// messageQuery is used for query events
type messageQuery struct {
    LTime       LamportTime   // Event lamport time
    ID          uint32        // Query ID, randomly generated
    Addr        []byte        // Source address, used for a direct reply
    Port        uint16        // Source port, used for a direct reply
    SourceNode  string        // Source name, used for a direct reply
    Filters     [][]byte      // Potential query filters
    Flags       uint32        // Used to provide various flags
    RelayFactor uint8         // Used to set the number of duplicate relayed responses
    Timeout     time.Duration // Maximum time between delivery and response
    Name        string        // Query name
    Payload     []byte        // Query payload
}
 
// Ack checks if the ack flag is set
func (m *messageQuery) Ack() bool {
    return (m.Flags & queryFlagAck) != 0
}
 
// NoBroadcast checks if the no broadcast flag is set
func (m *messageQuery) NoBroadcast() bool {
    return (m.Flags & queryFlagNoBroadcast) != 0
}
 
// filterNode is used with the filterNodeType, and is a list
// of node names
type filterNode []string
 
// filterTag is used with the filterTagType and is a regular
// expression to apply to a tag
type filterTag struct {
    Tag  string
    Expr string
}
 
// messageQueryResponse is used to respond to a query
type messageQueryResponse struct {
    LTime   LamportTime // Event lamport time
    ID      uint32      // Query ID
    From    string      // Node name
    Flags   uint32      // Used to provide various flags
    Payload []byte      // Optional response payload
}
 
// Ack checks if the ack flag is set
func (m *messageQueryResponse) Ack() bool {
    return (m.Flags & queryFlagAck) != 0
}
 
func decodeMessage(buf []byte, out interface{}) error {
    var handle codec.MsgpackHandle
    return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
}
 
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
    buf := bytes.NewBuffer(nil)
    buf.WriteByte(uint8(t))
 
    handle := codec.MsgpackHandle{}
    encoder := codec.NewEncoder(buf, &handle)
    err := encoder.Encode(msg)
    return buf.Bytes(), err
}
 
// relayHeader is used to store the end destination of a relayed message
type relayHeader struct {
    DestAddr net.UDPAddr
    DestName string
}
 
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
// address of the end recipient to the front of the message
func encodeRelayMessage(
    t messageType,
    addr net.UDPAddr,
    nodeName string,
    msg interface{},
) ([]byte, error) {
    buf := bytes.NewBuffer(nil)
    handle := codec.MsgpackHandle{}
    encoder := codec.NewEncoder(buf, &handle)
 
    buf.WriteByte(uint8(messageRelayType))
 
    err := encoder.Encode(relayHeader{
        DestAddr: addr,
        DestName: nodeName,
    })
    if err != nil {
        return nil, err
    }
 
    buf.WriteByte(uint8(t))
    err = encoder.Encode(msg)
    return buf.Bytes(), err
}
 
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
    buf := bytes.NewBuffer(nil)
    buf.WriteByte(uint8(f))
 
    handle := codec.MsgpackHandle{}
    encoder := codec.NewEncoder(buf, &handle)
    err := encoder.Encode(filt)
    return buf.Bytes(), err
}