liuxiaolong
2020-09-16 b04c3b2bf52d57550e8bfdc7f09859962b7c1396
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
package serf
 
import (
    "fmt"
    "net"
    "sync"
    "time"
 
    "github.com/hashicorp/memberlist"
)
 
// EventType are all the types of events that may occur and be sent
// along the Serf channel.
type EventType int
 
const (
    EventMemberJoin EventType = iota
    EventMemberLeave
    EventMemberFailed
    EventMemberUpdate
    EventMemberReap
    EventUser
    EventQuery
)
 
func (t EventType) String() string {
    switch t {
    case EventMemberJoin:
        return "member-join"
    case EventMemberLeave:
        return "member-leave"
    case EventMemberFailed:
        return "member-failed"
    case EventMemberUpdate:
        return "member-update"
    case EventMemberReap:
        return "member-reap"
    case EventUser:
        return "user"
    case EventQuery:
        return "query"
    default:
        panic(fmt.Sprintf("unknown event type: %d", t))
    }
}
 
// Event is a generic interface for exposing Serf events
// Clients will usually need to use a type switches to get
// to a more useful type
type Event interface {
    EventType() EventType
    String() string
}
 
// MemberEvent is the struct used for member related events
// Because Serf coalesces events, an event may contain multiple members.
type MemberEvent struct {
    Type    EventType
    Members []Member
}
 
func (m MemberEvent) EventType() EventType {
    return m.Type
}
 
func (m MemberEvent) String() string {
    switch m.Type {
    case EventMemberJoin:
        return "member-join"
    case EventMemberLeave:
        return "member-leave"
    case EventMemberFailed:
        return "member-failed"
    case EventMemberUpdate:
        return "member-update"
    case EventMemberReap:
        return "member-reap"
    default:
        panic(fmt.Sprintf("unknown event type: %d", m.Type))
    }
}
 
// UserEvent is the struct used for events that are triggered
// by the user and are not related to members
type UserEvent struct {
    LTime    LamportTime
    Name     string
    Payload  []byte
    Coalesce bool
}
 
func (u UserEvent) EventType() EventType {
    return EventUser
}
 
func (u UserEvent) String() string {
    return fmt.Sprintf("user-event: %s", u.Name)
}
 
// Query is the struct used by EventQuery type events
type Query struct {
    LTime   LamportTime
    Name    string
    Payload []byte
 
    serf        *Serf
    id          uint32    // ID is not exported, since it may change
    addr        []byte    // Address to respond to
    port        uint16    // Port to respond to
    sourceNode  string    // Node name to respond to
    deadline    time.Time // Must respond by this deadline
    relayFactor uint8     // Number of duplicate responses to relay back to sender
    respLock    sync.Mutex
}
 
func (q *Query) EventType() EventType {
    return EventQuery
}
 
func (q *Query) String() string {
    return fmt.Sprintf("query: %s", q.Name)
}
 
// Deadline returns the time by which a response must be sent
func (q *Query) Deadline() time.Time {
    return q.deadline
}
 
func (q *Query) createResponse(buf []byte) messageQueryResponse {
    // Create response
    return messageQueryResponse{
        LTime:   q.LTime,
        ID:      q.id,
        From:    q.serf.config.NodeName,
        Payload: buf,
    }
}
 
// Check response size
func (q *Query) checkResponseSize(resp []byte) error {
    if len(resp) > q.serf.config.QueryResponseSizeLimit {
        return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit)
    }
    return nil
}
 
func (q *Query) respondWithMessageAndResponse(raw []byte, resp messageQueryResponse) error {
    // Check the size limit
    if err := q.checkResponseSize(raw); err != nil {
        return err
    }
 
    q.respLock.Lock()
    defer q.respLock.Unlock()
 
    // Check if we've already responded
    if q.deadline.IsZero() {
        return fmt.Errorf("response already sent")
    }
 
    // Ensure we aren't past our response deadline
    if time.Now().After(q.deadline) {
        return fmt.Errorf("response is past the deadline")
    }
 
    // Send the response directly to the originator
    udpAddr := net.UDPAddr{IP: q.addr, Port: int(q.port)}
 
    addr := memberlist.Address{
        Addr: udpAddr.String(),
        Name: q.sourceNode,
    }
    if err := q.serf.memberlist.SendToAddress(addr, raw); err != nil {
        return err
    }
 
    // Relay the response through up to relayFactor other nodes
    if err := q.serf.relayResponse(q.relayFactor, udpAddr, q.sourceNode, &resp); err != nil {
        return err
    }
 
    // Clear the deadline, responses sent
    q.deadline = time.Time{}
 
    return nil
}
 
// Respond is used to send a response to the user query
func (q *Query) Respond(buf []byte) error {
    // Create response
    resp := q.createResponse(buf)
 
    // Encode response
    raw, err := encodeMessage(messageQueryResponseType, resp)
    if err != nil {
        return fmt.Errorf("failed to format response: %v", err)
    }
 
    if err := q.respondWithMessageAndResponse(raw, resp); err != nil {
        return fmt.Errorf("failed to respond to key query: %v", err)
    }
 
    return nil
}