package serf import ( "fmt" "net" "sync" "time" "github.com/hashicorp/memberlist" ) // EventType are all the types of events that may occur and be sent // along the Serf channel. type EventType int const ( EventMemberJoin EventType = iota EventMemberLeave EventMemberFailed EventMemberUpdate EventMemberReap EventUser EventQuery ) func (t EventType) String() string { switch t { case EventMemberJoin: return "member-join" case EventMemberLeave: return "member-leave" case EventMemberFailed: return "member-failed" case EventMemberUpdate: return "member-update" case EventMemberReap: return "member-reap" case EventUser: return "user" case EventQuery: return "query" default: panic(fmt.Sprintf("unknown event type: %d", t)) } } // Event is a generic interface for exposing Serf events // Clients will usually need to use a type switches to get // to a more useful type type Event interface { EventType() EventType String() string } // MemberEvent is the struct used for member related events // Because Serf coalesces events, an event may contain multiple members. type MemberEvent struct { Type EventType Members []Member } func (m MemberEvent) EventType() EventType { return m.Type } func (m MemberEvent) String() string { switch m.Type { case EventMemberJoin: return "member-join" case EventMemberLeave: return "member-leave" case EventMemberFailed: return "member-failed" case EventMemberUpdate: return "member-update" case EventMemberReap: return "member-reap" default: panic(fmt.Sprintf("unknown event type: %d", m.Type)) } } // UserEvent is the struct used for events that are triggered // by the user and are not related to members type UserEvent struct { LTime LamportTime Name string Payload []byte Coalesce bool } func (u UserEvent) EventType() EventType { return EventUser } func (u UserEvent) String() string { return fmt.Sprintf("user-event: %s", u.Name) } // Query is the struct used by EventQuery type events type Query struct { LTime LamportTime Name string Payload []byte serf *Serf id uint32 // ID is not exported, since it may change addr []byte // Address to respond to port uint16 // Port to respond to sourceNode string // Node name to respond to deadline time.Time // Must respond by this deadline relayFactor uint8 // Number of duplicate responses to relay back to sender respLock sync.Mutex } func (q *Query) EventType() EventType { return EventQuery } func (q *Query) String() string { return fmt.Sprintf("query: %s", q.Name) } // Deadline returns the time by which a response must be sent func (q *Query) Deadline() time.Time { return q.deadline } func (q *Query) createResponse(buf []byte) messageQueryResponse { // Create response return messageQueryResponse{ LTime: q.LTime, ID: q.id, From: q.serf.config.NodeName, Payload: buf, } } // Check response size func (q *Query) checkResponseSize(resp []byte) error { if len(resp) > q.serf.config.QueryResponseSizeLimit { return fmt.Errorf("response exceeds limit of %d bytes", q.serf.config.QueryResponseSizeLimit) } return nil } func (q *Query) respondWithMessageAndResponse(raw []byte, resp messageQueryResponse) error { // Check the size limit if err := q.checkResponseSize(raw); err != nil { return err } q.respLock.Lock() defer q.respLock.Unlock() // Check if we've already responded if q.deadline.IsZero() { return fmt.Errorf("response already sent") } // Ensure we aren't past our response deadline if time.Now().After(q.deadline) { return fmt.Errorf("response is past the deadline") } // Send the response directly to the originator udpAddr := net.UDPAddr{IP: q.addr, Port: int(q.port)} addr := memberlist.Address{ Addr: udpAddr.String(), Name: q.sourceNode, } if err := q.serf.memberlist.SendToAddress(addr, raw); err != nil { return err } // Relay the response through up to relayFactor other nodes if err := q.serf.relayResponse(q.relayFactor, udpAddr, q.sourceNode, &resp); err != nil { return err } // Clear the deadline, responses sent q.deadline = time.Time{} return nil } // Respond is used to send a response to the user query func (q *Query) Respond(buf []byte) error { // Create response resp := q.createResponse(buf) // Encode response raw, err := encodeMessage(messageQueryResponseType, resp) if err != nil { return fmt.Errorf("failed to format response: %v", err) } if err := q.respondWithMessageAndResponse(raw, resp); err != nil { return fmt.Errorf("failed to respond to key query: %v", err) } return nil }