package serf

import (
	"errors"
	"fmt"
	"math"
	"math/rand"
	"net"
	"regexp"
	"sync"
	"time"

	"github.com/hashicorp/memberlist"
)

// QueryParam is provided to Query() to configure the parameters of the
// query. If not provided, sane defaults will be used.
type QueryParam struct {
	// If provided, we restrict the nodes that should respond to those
	// with names in this list.
	FilterNodes []string

	// FilterTags maps a tag name to a regular expression that is applied
	// to restrict the nodes that should respond.
	FilterTags map[string]string

	// If true, we are requesting a delivery acknowledgement from
	// every node that meets the filter requirement. This means nodes
	// that receive the message but do not pass the filters will not
	// send an ack.
	RequestAck bool

	// RelayFactor controls the number of duplicate responses to relay
	// back to the sender through other nodes for redundancy.
	RelayFactor uint8

	// The timeout limits how long the query is left open. If not provided,
	// then a default timeout is used based on the configuration of Serf.
	Timeout time.Duration
}

// DefaultQueryTimeout returns the default timeout value for a query,
// computed as GossipInterval * QueryTimeoutMult * ceil(log10(N+1)), where N
// is the number of members reported by memberlist.
func (s *Serf) DefaultQueryTimeout() time.Duration {
	n := s.memberlist.NumMembers()
	timeout := s.config.MemberlistConfig.GossipInterval
	timeout *= time.Duration(s.config.QueryTimeoutMult)
	// Scale the timeout logarithmically with cluster size; ceil keeps the
	// multiplier an integer so it can be applied to a Duration.
	timeout *= time.Duration(math.Ceil(math.Log10(float64(n + 1))))
	return timeout
}

// DefaultQueryParams is used to return the default query parameters: no
// node or tag filters, no ack request, a zero RelayFactor, and the timeout
// from DefaultQueryTimeout.
func (s *Serf) DefaultQueryParams() *QueryParam {
	return &QueryParam{
		FilterNodes: nil,
		FilterTags:  nil,
		RequestAck:  false,
		Timeout:     s.DefaultQueryTimeout(),
	}
}

// encodeFilters is used to convert the filters into the wire format.
// The node filter (if any) is emitted first, followed by one filter per
// FilterTags entry. Because FilterTags is a map, the relative order of the
// tag filters is unspecified.
func (q *QueryParam) encodeFilters() ([][]byte, error) {
	var filters [][]byte

	// Add the node filter
	if len(q.FilterNodes) > 0 {
		buf, err := encodeFilter(filterNodeType, q.FilterNodes)
		if err != nil {
			return nil, err
		}
		filters = append(filters, buf)
	}

	// Add the tag filters
	for tag, expr := range q.FilterTags {
		filt := filterTag{tag, expr}
		buf, err := encodeFilter(filterTagType, &filt)
		if err != nil {
			return nil, err
		}
		filters = append(filters, buf)
	}
	return filters, nil
}

// QueryResponse is returned for each new Query. It is used to collect
// acks as well as responses and to provide those back to a client.
type QueryResponse struct {
	// ackCh is used to send the name of a node for which we've received an
	// ack. It is nil when the query did not request acks.
	ackCh chan string

	// deadline is the query end time (start + query timeout)
	deadline time.Time

	// Query ID
	id uint32

	// Stores the LTime of the query
	lTime LamportTime

	// respCh is used to send a response from a node
	respCh chan NodeResponse

	// acks/responses are used to track the nodes that have sent an
	// ack/response. acks is nil when the query did not request acks.
	acks      map[string]struct{}
	responses map[string]struct{}

	// closed records whether Close has run. closeLock guards it so that
	// sendResponse never writes to a channel that Close has closed.
	closed    bool
	closeLock sync.Mutex
}

// newQueryResponse is used to construct a new query response for q.
// n is used as the buffer capacity of the response channel (and of the ack
// channel when q requests acks). The deadline is the current time plus
// q.Timeout.
func newQueryResponse(n int, q *messageQuery) *QueryResponse {
	resp := &QueryResponse{
		deadline:  time.Now().Add(q.Timeout),
		id:        q.ID,
		lTime:     q.LTime,
		respCh:    make(chan NodeResponse, n),
		responses: make(map[string]struct{}),
	}
	if q.Ack() {
		resp.ackCh = make(chan string, n)
		resp.acks = make(map[string]struct{})
	}
	return resp
}

// Close is used to close the query, which will close the underlying
// channels and prevent further deliveries. Calling Close more than once
// is a no-op.
func (r *QueryResponse) Close() {
	r.closeLock.Lock()
	defer r.closeLock.Unlock()
	if r.closed {
		return
	}
	r.closed = true
	if r.ackCh != nil {
		close(r.ackCh)
	}
	if r.respCh != nil {
		close(r.respCh)
	}
}

// Deadline returns the ending deadline of the query.
func (r *QueryResponse) Deadline() time.Time {
	return r.deadline
}

// Finished returns true if the query has been closed or its deadline has
// passed.
func (r *QueryResponse) Finished() bool {
	r.closeLock.Lock()
	defer r.closeLock.Unlock()
	return r.closed || time.Now().After(r.deadline)
}

// AckCh returns a channel that can be used to listen for acks.
// The channel will be closed when the query is finished. It is nil
// if the query did not specify RequestAck.
func (r *QueryResponse) AckCh() <-chan string {
	return r.ackCh
}

// ResponseCh returns a channel that can be used to listen for responses.
// The channel will be closed when the query is finished.
func (r *QueryResponse) ResponseCh() <-chan NodeResponse {
	return r.respCh
}

// sendResponse sends a response on the response channel ensuring the
// channel is not closed. Sending after Close silently succeeds; if the
// channel buffer is full the response is dropped and an error is returned.
func (r *QueryResponse) sendResponse(nr NodeResponse) error {
	r.closeLock.Lock()
	defer r.closeLock.Unlock()
	if r.closed {
		return nil
	}
	// Non-blocking send: never stall the caller while holding closeLock.
	select {
	case r.respCh <- nr:
		r.responses[nr.From] = struct{}{}
	default:
		return errors.New("serf: Failed to deliver query response, dropping")
	}
	return nil
}

// NodeResponse is used to represent a single response from a node.
type NodeResponse struct {
	From    string
	Payload []byte
}

// shouldProcessQuery checks if a query should be processed given
// a set of filters. Every filter must match for the query to be processed;
// any malformed, empty, or unrecognized filter causes it to be rejected.
func (s *Serf) shouldProcessQuery(filters [][]byte) bool {
	for _, filter := range filters {
		// Filters arrive from remote nodes; guard the type-byte read so a
		// zero-length filter is rejected instead of panicking.
		if len(filter) == 0 {
			s.logger.Printf("[WARN] serf: query has empty filter")
			return false
		}

		switch filterType(filter[0]) {
		case filterNodeType:
			// Decode the filter
			var nodes filterNode
			if err := decodeMessage(filter[1:], &nodes); err != nil {
				s.logger.Printf("[WARN] serf: failed to decode filterNodeType: %v", err)
				return false
			}

			// Check if we are being targeted
			found := false
			for _, n := range nodes {
				if n == s.config.NodeName {
					found = true
					break
				}
			}
			if !found {
				return false
			}

		case filterTagType:
			// Decode the filter
			var filt filterTag
			if err := decodeMessage(filter[1:], &filt); err != nil {
				s.logger.Printf("[WARN] serf: failed to decode filterTagType: %v", err)
				return false
			}

			// Check if we match this regex
			tags := s.config.Tags
			matched, err := regexp.MatchString(filt.Expr, tags[filt.Tag])
			if err != nil {
				s.logger.Printf("[WARN] serf: failed to compile filter regex (%s): %v", filt.Expr, err)
				return false
			}
			if !matched {
				return false
			}

		default:
			s.logger.Printf("[WARN] serf: query has unrecognized filter type: %d", filter[0])
			return false
		}
	}
	return true
}

// relayResponse will relay a copy of the given response to up to relayFactor
// other members. Relaying is skipped when relayFactor is zero or the cluster
// has fewer than relayFactor+1 members. The first send failure aborts the
// remaining relays and is returned (wrapped) to the caller.
func (s *Serf) relayResponse(
	relayFactor uint8,
	addr net.UDPAddr,
	nodeName string,
	resp *messageQueryResponse,
) error {
	if relayFactor == 0 {
		return nil
	}

	// Needs to be worth it; we need to have at least relayFactor *other*
	// nodes. If you have a tiny cluster then the relayFactor shouldn't
	// be needed.
	members := s.Members()
	if len(members) < int(relayFactor)+1 {
		return nil
	}

	// Prep the relay message, which is a wrapped version of the original.
	raw, err := encodeRelayMessage(messageQueryResponseType, addr, nodeName, &resp)
	if err != nil {
		return fmt.Errorf("failed to format relayed response: %w", err)
	}
	if len(raw) > s.config.QueryResponseSizeLimit {
		return fmt.Errorf("relayed response exceeds limit of %d bytes", s.config.QueryResponseSizeLimit)
	}

	// Relay to a random set of peers, excluding ourselves, non-alive
	// members, and members whose ProtocolMax is below 5.
	localName := s.LocalMember().Name
	relayMembers := kRandomMembers(int(relayFactor), members, func(m Member) bool {
		return m.Status != StatusAlive || m.ProtocolMax < 5 || m.Name == localName
	})

	for _, m := range relayMembers {
		udpAddr := net.UDPAddr{IP: m.Addr, Port: int(m.Port)}
		relayAddr := memberlist.Address{
			Addr: udpAddr.String(),
			Name: m.Name,
		}
		if err := s.memberlist.SendToAddress(relayAddr, raw); err != nil {
			return fmt.Errorf("failed to send relay response: %w", err)
		}
	}
	return nil
}

// kRandomMembers selects up to k members from a given list, optionally
// filtering by the given filterFunc (members for which it returns true are
// excluded). Members are deduplicated by Name. Fewer than k members may be
// returned if the random probes do not find enough eligible ones.
func kRandomMembers(k int, members []Member, filterFunc func(Member) bool) []Member {
	n := len(members)
	kMembers := make([]Member, 0, k)
OUTER:
	// Probe up to 3*n times, with large n this is not necessary
	// since k << n, but with small n we want search to be
	// exhaustive
	for i := 0; i < 3*n && len(kMembers) < k; i++ {
		// Get random member
		idx := rand.Intn(n)
		member := members[idx]

		// Give the filter a shot at it.
		if filterFunc != nil && filterFunc(member) {
			continue OUTER
		}

		// Check if we have this member already
		for j := 0; j < len(kMembers); j++ {
			if member.Name == kMembers[j].Name {
				continue OUTER
			}
		}

		// Append the member
		kMembers = append(kMembers, member)
	}

	return kMembers
}