package serf
|
|
import (
|
"bytes"
|
"net"
|
"time"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
)
|
|
// messageType are the types of gossip messages Serf will send along
|
// memberlist.
|
type messageType uint8
|
|
const (
|
messageLeaveType messageType = iota
|
messageJoinType
|
messagePushPullType
|
messageUserEventType
|
messageQueryType
|
messageQueryResponseType
|
messageConflictResponseType
|
messageKeyRequestType
|
messageKeyResponseType
|
messageRelayType
|
)
|
|
const (
|
// Ack flag is used to force receiver to send an ack back
|
queryFlagAck uint32 = 1 << iota
|
|
// NoBroadcast is used to prevent re-broadcast of a query.
|
// this can be used to selectively send queries to individual members
|
queryFlagNoBroadcast
|
)
|
|
// filterType is used with a queryFilter to specify the type of
|
// filter we are sending
|
type filterType uint8
|
|
const (
|
filterNodeType filterType = iota
|
filterTagType
|
)
|
|
// messageJoin is the message broadcasted after we join to
|
// associated the node with a lamport clock
|
type messageJoin struct {
|
LTime LamportTime
|
Node string
|
}
|
|
// messageLeave is the message broadcasted to signal the intentional to
|
// leave.
|
type messageLeave struct {
|
LTime LamportTime
|
Node string
|
Prune bool
|
}
|
|
// messagePushPullType is used when doing a state exchange. This
|
// is a relatively large message, but is sent infrequently
|
type messagePushPull struct {
|
LTime LamportTime // Current node lamport time
|
StatusLTimes map[string]LamportTime // Maps the node to its status time
|
LeftMembers []string // List of left nodes
|
EventLTime LamportTime // Lamport time for event clock
|
Events []*userEvents // Recent events
|
QueryLTime LamportTime // Lamport time for query clock
|
}
|
|
// messageUserEvent is used for user-generated events
|
type messageUserEvent struct {
|
LTime LamportTime
|
Name string
|
Payload []byte
|
CC bool // "Can Coalesce". Zero value is compatible with Serf 0.1
|
}
|
|
// messageQuery is used for query events
|
type messageQuery struct {
|
LTime LamportTime // Event lamport time
|
ID uint32 // Query ID, randomly generated
|
Addr []byte // Source address, used for a direct reply
|
Port uint16 // Source port, used for a direct reply
|
SourceNode string // Source name, used for a direct reply
|
Filters [][]byte // Potential query filters
|
Flags uint32 // Used to provide various flags
|
RelayFactor uint8 // Used to set the number of duplicate relayed responses
|
Timeout time.Duration // Maximum time between delivery and response
|
Name string // Query name
|
Payload []byte // Query payload
|
}
|
|
// Ack checks if the ack flag is set
|
func (m *messageQuery) Ack() bool {
|
return (m.Flags & queryFlagAck) != 0
|
}
|
|
// NoBroadcast checks if the no broadcast flag is set
|
func (m *messageQuery) NoBroadcast() bool {
|
return (m.Flags & queryFlagNoBroadcast) != 0
|
}
|
|
// filterNode is used with the filterNodeType, and is a list
|
// of node names
|
type filterNode []string
|
|
// filterTag is used with the filterTagType and is a regular
|
// expression to apply to a tag
|
type filterTag struct {
|
Tag string
|
Expr string
|
}
|
|
// messageQueryResponse is used to respond to a query
|
type messageQueryResponse struct {
|
LTime LamportTime // Event lamport time
|
ID uint32 // Query ID
|
From string // Node name
|
Flags uint32 // Used to provide various flags
|
Payload []byte // Optional response payload
|
}
|
|
// Ack checks if the ack flag is set
|
func (m *messageQueryResponse) Ack() bool {
|
return (m.Flags & queryFlagAck) != 0
|
}
|
|
func decodeMessage(buf []byte, out interface{}) error {
|
var handle codec.MsgpackHandle
|
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
|
}
|
|
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
|
buf := bytes.NewBuffer(nil)
|
buf.WriteByte(uint8(t))
|
|
handle := codec.MsgpackHandle{}
|
encoder := codec.NewEncoder(buf, &handle)
|
err := encoder.Encode(msg)
|
return buf.Bytes(), err
|
}
|
|
// relayHeader is used to store the end destination of a relayed message
|
type relayHeader struct {
|
DestAddr net.UDPAddr
|
DestName string
|
}
|
|
// encodeRelayMessage wraps a message in the messageRelayType, adding the length and
|
// address of the end recipient to the front of the message
|
func encodeRelayMessage(
|
t messageType,
|
addr net.UDPAddr,
|
nodeName string,
|
msg interface{},
|
) ([]byte, error) {
|
buf := bytes.NewBuffer(nil)
|
handle := codec.MsgpackHandle{}
|
encoder := codec.NewEncoder(buf, &handle)
|
|
buf.WriteByte(uint8(messageRelayType))
|
|
err := encoder.Encode(relayHeader{
|
DestAddr: addr,
|
DestName: nodeName,
|
})
|
if err != nil {
|
return nil, err
|
}
|
|
buf.WriteByte(uint8(t))
|
err = encoder.Encode(msg)
|
return buf.Bytes(), err
|
}
|
|
func encodeFilter(f filterType, filt interface{}) ([]byte, error) {
|
buf := bytes.NewBuffer(nil)
|
buf.WriteByte(uint8(f))
|
|
handle := codec.MsgpackHandle{}
|
encoder := codec.NewEncoder(buf, &handle)
|
err := encoder.Encode(filt)
|
return buf.Bytes(), err
|
}
|