package serf import ( "encoding/base64" "fmt" "log" "strings" ) const ( // This is the prefix we use for queries that are internal to Serf. // They are handled internally, and not forwarded to a client. InternalQueryPrefix = "_serf_" // pingQuery is run to check for reachability pingQuery = "ping" // conflictQuery is run to resolve a name conflict conflictQuery = "conflict" // installKeyQuery is used to install a new key installKeyQuery = "install-key" // useKeyQuery is used to change the primary encryption key useKeyQuery = "use-key" // removeKeyQuery is used to remove a key from the keyring removeKeyQuery = "remove-key" // listKeysQuery is used to list all known keys in the cluster listKeysQuery = "list-keys" // minEncodedKeyLength is used to compute the max number of keys in a list key // response. eg 1024/25 = 40. a message with max size of 1024 bytes cannot // contain more than 40 keys. There is a test // (TestSerfQueries_estimateMaxKeysInListKeyResponse) which does the // computation and in case of changes, the value can be adjusted. minEncodedKeyLength = 25 ) // internalQueryName is used to generate a query name for an internal query func internalQueryName(name string) string { return InternalQueryPrefix + name } // serfQueries is used to listen for queries that start with // _serf and respond to them as appropriate. type serfQueries struct { inCh chan Event logger *log.Logger outCh chan<- Event serf *Serf shutdownCh <-chan struct{} } // nodeKeyResponse is used to store the result from an individual node while // replying to key modification queries type nodeKeyResponse struct { // Result indicates true/false if there were errors or not Result bool // Message contains error messages or other information Message string // Keys is used in listing queries to relay a list of installed keys Keys []string } // newSerfQueries is used to create a new serfQueries. We return an event // channel that is ingested and forwarded to an outCh. Any Queries that // have the InternalQueryPrefix are handled instead of forwarded. func newSerfQueries(serf *Serf, logger *log.Logger, outCh chan<- Event, shutdownCh <-chan struct{}) (chan<- Event, error) { inCh := make(chan Event, 1024) q := &serfQueries{ inCh: inCh, logger: logger, outCh: outCh, serf: serf, shutdownCh: shutdownCh, } go q.stream() return inCh, nil } // stream is a long running routine to ingest the event stream func (s *serfQueries) stream() { for { select { case e := <-s.inCh: // Check if this is a query we should process if q, ok := e.(*Query); ok && strings.HasPrefix(q.Name, InternalQueryPrefix) { go s.handleQuery(q) } else if s.outCh != nil { s.outCh <- e } case <-s.shutdownCh: return } } } // handleQuery is invoked when we get an internal query func (s *serfQueries) handleQuery(q *Query) { // Get the queryName after the initial prefix queryName := q.Name[len(InternalQueryPrefix):] switch queryName { case pingQuery: // Nothing to do, we will ack the query case conflictQuery: s.handleConflict(q) case installKeyQuery: s.handleInstallKey(q) case useKeyQuery: s.handleUseKey(q) case removeKeyQuery: s.handleRemoveKey(q) case listKeysQuery: s.handleListKeys(q) default: s.logger.Printf("[WARN] serf: Unhandled internal query '%s'", queryName) } } // handleConflict is invoked when we get a query that is attempting to // disambiguate a name conflict. They payload is a node name, and the response // should the address we believe that node is at, if any. func (s *serfQueries) handleConflict(q *Query) { // The target node name is the payload node := string(q.Payload) // Do not respond to the query if it is about us if node == s.serf.config.NodeName { return } s.logger.Printf("[DEBUG] serf: Got conflict resolution query for '%s'", node) // Look for the member info var out *Member s.serf.memberLock.Lock() if member, ok := s.serf.members[node]; ok { out = &member.Member } s.serf.memberLock.Unlock() // Encode the response buf, err := encodeMessage(messageConflictResponseType, out) if err != nil { s.logger.Printf("[ERR] serf: Failed to encode conflict query response: %v", err) return } // Send our answer if err := q.Respond(buf); err != nil { s.logger.Printf("[ERR] serf: Failed to respond to conflict query: %v", err) } } func (s *serfQueries) keyListResponseWithCorrectSize(q *Query, resp *nodeKeyResponse) ([]byte, messageQueryResponse, error) { maxListKeys := q.serf.config.QueryResponseSizeLimit / minEncodedKeyLength actual := len(resp.Keys) for i := maxListKeys; i >= 0; i-- { buf, err := encodeMessage(messageKeyResponseType, resp) if err != nil { return nil, messageQueryResponse{}, err } // Create response qresp := q.createResponse(buf) // Encode response raw, err := encodeMessage(messageQueryResponseType, qresp) if err != nil { return nil, messageQueryResponse{}, err } // Check the size limit if err = q.checkResponseSize(raw); err != nil { resp.Keys = resp.Keys[0:i] resp.Message = fmt.Sprintf("truncated key list response, showing first %d of %d keys", i, actual) continue } if actual > i { s.logger.Printf("[WARN] serf: %s", resp.Message) } return raw, qresp, nil } return nil, messageQueryResponse{}, fmt.Errorf("Failed to truncate response so that it fits into message") } // sendKeyResponse handles responding to key-related queries. func (s *serfQueries) sendKeyResponse(q *Query, resp *nodeKeyResponse) { switch q.Name { case internalQueryName(listKeysQuery): raw, qresp, err := s.keyListResponseWithCorrectSize(q, resp) if err != nil { s.logger.Printf("[ERR] serf: %v", err) return } if err := q.respondWithMessageAndResponse(raw, qresp); err != nil { s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err) return } default: buf, err := encodeMessage(messageKeyResponseType, resp) if err != nil { s.logger.Printf("[ERR] serf: Failed to encode key response: %v", err) return } if err := q.Respond(buf); err != nil { s.logger.Printf("[ERR] serf: Failed to respond to key query: %v", err) return } } } // handleInstallKey is invoked whenever a new encryption key is received from // another member in the cluster, and handles the process of installing it onto // the memberlist keyring. This type of query may fail if the provided key does // not fit the constraints that memberlist enforces. If the query fails, the // response will contain the error message so that it may be relayed. func (s *serfQueries) handleInstallKey(q *Query) { response := nodeKeyResponse{Result: false} keyring := s.serf.config.MemberlistConfig.Keyring req := keyRequest{} err := decodeMessage(q.Payload[1:], &req) if err != nil { s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err) goto SEND } if !s.serf.EncryptionEnabled() { response.Message = "No keyring to modify (encryption not enabled)" s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)") goto SEND } s.logger.Printf("[INFO] serf: Received install-key query") if err := keyring.AddKey(req.Key); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to install key: %s", err) goto SEND } if s.serf.config.KeyringFile != "" { if err := s.serf.writeKeyringFile(); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err) goto SEND } } response.Result = true SEND: s.sendKeyResponse(q, &response) } // handleUseKey is invoked whenever a query is received to mark a different key // in the internal keyring as the primary key. This type of query may fail due // to operator error (requested key not in ring), and thus sends error messages // back in the response. func (s *serfQueries) handleUseKey(q *Query) { response := nodeKeyResponse{Result: false} keyring := s.serf.config.MemberlistConfig.Keyring req := keyRequest{} err := decodeMessage(q.Payload[1:], &req) if err != nil { s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err) goto SEND } if !s.serf.EncryptionEnabled() { response.Message = "No keyring to modify (encryption not enabled)" s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)") goto SEND } s.logger.Printf("[INFO] serf: Received use-key query") if err := keyring.UseKey(req.Key); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to change primary key: %s", err) goto SEND } if err := s.serf.writeKeyringFile(); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err) goto SEND } response.Result = true SEND: s.sendKeyResponse(q, &response) } // handleRemoveKey is invoked when a query is received to remove a particular // key from the keyring. This type of query can fail if the key requested for // deletion is currently the primary key in the keyring, so therefore it will // reply to the query with any relevant errors from the operation. func (s *serfQueries) handleRemoveKey(q *Query) { response := nodeKeyResponse{Result: false} keyring := s.serf.config.MemberlistConfig.Keyring req := keyRequest{} err := decodeMessage(q.Payload[1:], &req) if err != nil { s.logger.Printf("[ERR] serf: Failed to decode key request: %v", err) goto SEND } if !s.serf.EncryptionEnabled() { response.Message = "No keyring to modify (encryption not enabled)" s.logger.Printf("[ERR] serf: No keyring to modify (encryption not enabled)") goto SEND } s.logger.Printf("[INFO] serf: Received remove-key query") if err := keyring.RemoveKey(req.Key); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to remove key: %s", err) goto SEND } if err := s.serf.writeKeyringFile(); err != nil { response.Message = err.Error() s.logger.Printf("[ERR] serf: Failed to write keyring file: %s", err) goto SEND } response.Result = true SEND: s.sendKeyResponse(q, &response) } // handleListKeys is invoked when a query is received to return a list of all // installed keys the Serf instance knows of. For performance, the keys are // encoded to base64 on each of the members to remove this burden from the // node asking for the results. func (s *serfQueries) handleListKeys(q *Query) { response := nodeKeyResponse{Result: false} keyring := s.serf.config.MemberlistConfig.Keyring if !s.serf.EncryptionEnabled() { response.Message = "Keyring is empty (encryption not enabled)" s.logger.Printf("[ERR] serf: Keyring is empty (encryption not enabled)") goto SEND } s.logger.Printf("[INFO] serf: Received list-keys query") for _, keyBytes := range keyring.GetKeys() { // Encode the keys before sending the response. This should help take // some the burden of doing this off of the asking member. key := base64.StdEncoding.EncodeToString(keyBytes) response.Keys = append(response.Keys, key) } response.Result = true SEND: s.sendKeyResponse(q, &response) }