package serf
|
|
import (
|
"bytes"
|
"fmt"
|
|
"github.com/armon/go-metrics"
|
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/memberlist"
|
)
|
|
// delegate is the memberlist.Delegate implementation that Serf uses.
|
type delegate struct {
|
serf *Serf
|
}
|
|
var _ memberlist.Delegate = &delegate{}
|
|
func (d *delegate) NodeMeta(limit int) []byte {
|
roleBytes := d.serf.encodeTags(d.serf.config.Tags)
|
if len(roleBytes) > limit {
|
panic(fmt.Errorf("Node tags '%v' exceeds length limit of %d bytes", d.serf.config.Tags, limit))
|
}
|
|
return roleBytes
|
}
|
|
func (d *delegate) NotifyMsg(buf []byte) {
|
// If we didn't actually receive any data, then ignore it.
|
if len(buf) == 0 {
|
return
|
}
|
metrics.AddSample([]string{"serf", "msgs", "received"}, float32(len(buf)))
|
|
rebroadcast := false
|
rebroadcastQueue := d.serf.broadcasts
|
t := messageType(buf[0])
|
switch t {
|
case messageLeaveType:
|
var leave messageLeave
|
if err := decodeMessage(buf[1:], &leave); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding leave message: %s", err)
|
break
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: messageLeaveType: %s", leave.Node)
|
rebroadcast = d.serf.handleNodeLeaveIntent(&leave)
|
|
case messageJoinType:
|
var join messageJoin
|
if err := decodeMessage(buf[1:], &join); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding join message: %s", err)
|
break
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: messageJoinType: %s", join.Node)
|
rebroadcast = d.serf.handleNodeJoinIntent(&join)
|
|
case messageUserEventType:
|
var event messageUserEvent
|
if err := decodeMessage(buf[1:], &event); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding user event message: %s", err)
|
break
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: messageUserEventType: %s", event.Name)
|
rebroadcast = d.serf.handleUserEvent(&event)
|
rebroadcastQueue = d.serf.eventBroadcasts
|
|
case messageQueryType:
|
var query messageQuery
|
if err := decodeMessage(buf[1:], &query); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding query message: %s", err)
|
break
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: messageQueryType: %s", query.Name)
|
rebroadcast = d.serf.handleQuery(&query)
|
rebroadcastQueue = d.serf.queryBroadcasts
|
|
case messageQueryResponseType:
|
var resp messageQueryResponse
|
if err := decodeMessage(buf[1:], &resp); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding query response message: %s", err)
|
break
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: messageQueryResponseType: %v", resp.From)
|
d.serf.handleQueryResponse(&resp)
|
|
case messageRelayType:
|
var header relayHeader
|
var handle codec.MsgpackHandle
|
reader := bytes.NewReader(buf[1:])
|
decoder := codec.NewDecoder(reader, &handle)
|
if err := decoder.Decode(&header); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error decoding relay header: %s", err)
|
break
|
}
|
|
// The remaining contents are the message itself, so forward that
|
raw := make([]byte, reader.Len())
|
reader.Read(raw)
|
|
addr := memberlist.Address{
|
Addr: header.DestAddr.String(),
|
Name: header.DestName,
|
}
|
|
d.serf.logger.Printf("[DEBUG] serf: Relaying response to addr: %s", header.DestAddr.String())
|
if err := d.serf.memberlist.SendToAddress(addr, raw); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Error forwarding message to %s: %s", header.DestAddr.String(), err)
|
break
|
}
|
|
default:
|
d.serf.logger.Printf("[WARN] serf: Received message of unknown type: %d", t)
|
}
|
|
if rebroadcast {
|
// Copy the buffer since it we cannot rely on the slice not changing
|
newBuf := make([]byte, len(buf))
|
copy(newBuf, buf)
|
|
rebroadcastQueue.QueueBroadcast(&broadcast{
|
msg: newBuf,
|
notify: nil,
|
})
|
}
|
}
|
|
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
|
msgs := d.serf.broadcasts.GetBroadcasts(overhead, limit)
|
|
// Determine the bytes used already
|
bytesUsed := 0
|
for _, msg := range msgs {
|
lm := len(msg)
|
bytesUsed += lm + overhead
|
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
|
}
|
|
// Get any additional query broadcasts
|
queryMsgs := d.serf.queryBroadcasts.GetBroadcasts(overhead, limit-bytesUsed)
|
if queryMsgs != nil {
|
for _, m := range queryMsgs {
|
lm := len(m)
|
bytesUsed += lm + overhead
|
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
|
}
|
msgs = append(msgs, queryMsgs...)
|
}
|
|
// Get any additional event broadcasts
|
eventMsgs := d.serf.eventBroadcasts.GetBroadcasts(overhead, limit-bytesUsed)
|
if eventMsgs != nil {
|
for _, m := range eventMsgs {
|
lm := len(m)
|
bytesUsed += lm + overhead
|
metrics.AddSample([]string{"serf", "msgs", "sent"}, float32(lm))
|
}
|
msgs = append(msgs, eventMsgs...)
|
}
|
|
return msgs
|
}
|
|
func (d *delegate) LocalState(join bool) []byte {
|
d.serf.memberLock.RLock()
|
defer d.serf.memberLock.RUnlock()
|
d.serf.eventLock.RLock()
|
defer d.serf.eventLock.RUnlock()
|
|
// Create the message to send
|
pp := messagePushPull{
|
LTime: d.serf.clock.Time(),
|
StatusLTimes: make(map[string]LamportTime, len(d.serf.members)),
|
LeftMembers: make([]string, 0, len(d.serf.leftMembers)),
|
EventLTime: d.serf.eventClock.Time(),
|
Events: d.serf.eventBuffer,
|
QueryLTime: d.serf.queryClock.Time(),
|
}
|
|
// Add all the join LTimes
|
for name, member := range d.serf.members {
|
pp.StatusLTimes[name] = member.statusLTime
|
}
|
|
// Add all the left nodes
|
for _, member := range d.serf.leftMembers {
|
pp.LeftMembers = append(pp.LeftMembers, member.Name)
|
}
|
|
// Encode the push pull state
|
buf, err := encodeMessage(messagePushPullType, &pp)
|
if err != nil {
|
d.serf.logger.Printf("[ERR] serf: Failed to encode local state: %v", err)
|
return nil
|
}
|
return buf
|
}
|
|
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
|
// Ensure we have a message
|
if len(buf) == 0 {
|
d.serf.logger.Printf("[ERR] serf: Remote state is zero bytes")
|
return
|
}
|
|
// Check the message type
|
if messageType(buf[0]) != messagePushPullType {
|
d.serf.logger.Printf("[ERR] serf: Remote state has bad type prefix: %v", buf[0])
|
return
|
}
|
|
// Attempt a decode
|
pp := messagePushPull{}
|
if err := decodeMessage(buf[1:], &pp); err != nil {
|
d.serf.logger.Printf("[ERR] serf: Failed to decode remote state: %v", err)
|
return
|
}
|
|
// Witness the Lamport clocks first.
|
// We subtract 1 since no message with that clock has been sent yet
|
if pp.LTime > 0 {
|
d.serf.clock.Witness(pp.LTime - 1)
|
}
|
if pp.EventLTime > 0 {
|
d.serf.eventClock.Witness(pp.EventLTime - 1)
|
}
|
if pp.QueryLTime > 0 {
|
d.serf.queryClock.Witness(pp.QueryLTime - 1)
|
}
|
|
// Process the left nodes first to avoid the LTimes from incrementing
|
// in the wrong order. Note that we don't have the actual Lamport time
|
// for the leave message, so we go one past the join time, since the
|
// leave must have been accepted after that to get onto the left members
|
// list. If we didn't do this then the message would not get processed.
|
leftMap := make(map[string]struct{}, len(pp.LeftMembers))
|
leave := messageLeave{}
|
for _, name := range pp.LeftMembers {
|
leftMap[name] = struct{}{}
|
leave.LTime = pp.StatusLTimes[name] + 1
|
leave.Node = name
|
d.serf.handleNodeLeaveIntent(&leave)
|
}
|
|
// Update any other LTimes
|
join := messageJoin{}
|
for name, statusLTime := range pp.StatusLTimes {
|
// Skip the left nodes
|
if _, ok := leftMap[name]; ok {
|
continue
|
}
|
|
// Create an artificial join message
|
join.LTime = statusLTime
|
join.Node = name
|
d.serf.handleNodeJoinIntent(&join)
|
}
|
|
// If we are doing a join, and eventJoinIgnore is set
|
// then we set the eventMinTime to the EventLTime. This
|
// prevents any of the incoming events from being processed
|
eventJoinIgnore := d.serf.eventJoinIgnore.Load().(bool)
|
if isJoin && eventJoinIgnore {
|
d.serf.eventLock.Lock()
|
if pp.EventLTime > d.serf.eventMinTime {
|
d.serf.eventMinTime = pp.EventLTime
|
}
|
d.serf.eventLock.Unlock()
|
}
|
|
// Process all the events
|
userEvent := messageUserEvent{}
|
for _, events := range pp.Events {
|
if events == nil {
|
continue
|
}
|
userEvent.LTime = events.LTime
|
for _, e := range events.Events {
|
userEvent.Name = e.Name
|
userEvent.Payload = e.Payload
|
d.serf.handleUserEvent(&userEvent)
|
}
|
}
|
}
|