package serf
|
|
import (
|
"encoding/base64"
|
"fmt"
|
"sync"
|
)
|
|
// KeyManager encapsulates all functionality within Serf for handling
|
// encryption keyring changes across a cluster.
|
type KeyManager struct {
|
serf *Serf
|
|
// Lock to protect read and write operations
|
l sync.RWMutex
|
}
|
|
// keyRequest carries the input parameters that are broadcast to all nodes
// as part of a key query operation.
type keyRequest struct {
	// Key holds the raw (already base64-decoded) key bytes.
	Key []byte
}
|
|
// KeyResponse aggregates the per-node results of a key query, such as a
// request for the list of all keys in use.
type KeyResponse struct {
	// Messages maps a node name to the response message it returned.
	Messages map[string]string

	// NumNodes is the total number of nodes memberlist knows of.
	NumNodes int

	// NumResp is the total number of responses received.
	NumResp int

	// NumErr is the total number of errors produced by the request.
	NumErr int

	// Keys maps the base64-encoded value of the key bytes to the number of
	// nodes that have that key installed.
	Keys map[string]int
}
|
|
// KeyRequestOptions holds the optional parameters of a keyring operation.
type KeyRequestOptions struct {
	// RelayFactor is the number of duplicate query responses to send by
	// relaying through other nodes, for redundancy.
	RelayFactor uint8
}
|
|
// streamKeyResp takes care of reading responses from a channel and composing
|
// them into a KeyResponse. It will update a KeyResponse *in place* and
|
// therefore has nothing to return.
|
func (k *KeyManager) streamKeyResp(resp *KeyResponse, ch <-chan NodeResponse) {
|
for r := range ch {
|
var nodeResponse nodeKeyResponse
|
|
resp.NumResp++
|
|
// Decode the response
|
if len(r.Payload) < 1 || messageType(r.Payload[0]) != messageKeyResponseType {
|
resp.Messages[r.From] = fmt.Sprintf(
|
"Invalid key query response type: %v", r.Payload)
|
resp.NumErr++
|
goto NEXT
|
}
|
if err := decodeMessage(r.Payload[1:], &nodeResponse); err != nil {
|
resp.Messages[r.From] = fmt.Sprintf(
|
"Failed to decode key query response: %v", r.Payload)
|
resp.NumErr++
|
goto NEXT
|
}
|
|
if !nodeResponse.Result {
|
resp.Messages[r.From] = nodeResponse.Message
|
resp.NumErr++
|
}
|
|
if nodeResponse.Result && len(nodeResponse.Message) > 0 {
|
resp.Messages[r.From] = nodeResponse.Message
|
k.serf.logger.Println("[WARN] serf:", nodeResponse.Message)
|
}
|
|
// Currently only used for key list queries, this adds keys to a counter
|
// and increments them for each node response which contains them.
|
for _, key := range nodeResponse.Keys {
|
if _, ok := resp.Keys[key]; !ok {
|
resp.Keys[key] = 1
|
} else {
|
resp.Keys[key]++
|
}
|
}
|
|
NEXT:
|
// Return early if all nodes have responded. This allows us to avoid
|
// waiting for the full timeout when there is nothing left to do.
|
if resp.NumResp == resp.NumNodes {
|
return
|
}
|
}
|
}
|
|
// handleKeyRequest performs query broadcasting to all members for any type of
|
// key operation and manages gathering responses and packing them up into a
|
// KeyResponse for uniform response handling.
|
func (k *KeyManager) handleKeyRequest(key, query string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
resp := &KeyResponse{
|
Messages: make(map[string]string),
|
Keys: make(map[string]int),
|
}
|
qName := internalQueryName(query)
|
|
// Decode the new key into raw bytes
|
rawKey, err := base64.StdEncoding.DecodeString(key)
|
if err != nil {
|
return resp, err
|
}
|
|
// Encode the query request
|
req, err := encodeMessage(messageKeyRequestType, keyRequest{Key: rawKey})
|
if err != nil {
|
return resp, err
|
}
|
|
qParam := k.serf.DefaultQueryParams()
|
if opts != nil {
|
qParam.RelayFactor = opts.RelayFactor
|
}
|
queryResp, err := k.serf.Query(qName, req, qParam)
|
if err != nil {
|
return resp, err
|
}
|
|
// Handle the response stream and populate the KeyResponse
|
resp.NumNodes = k.serf.memberlist.NumMembers()
|
k.streamKeyResp(resp, queryResp.respCh)
|
|
// Check the response for any reported failure conditions
|
if resp.NumErr != 0 {
|
return resp, fmt.Errorf("%d/%d nodes reported failure", resp.NumErr, resp.NumNodes)
|
}
|
if resp.NumResp != resp.NumNodes {
|
return resp, fmt.Errorf("%d/%d nodes reported success", resp.NumResp, resp.NumNodes)
|
}
|
|
return resp, nil
|
}
|
|
// InstallKey handles broadcasting a query to all members and gathering
|
// responses from each of them, returning a list of messages from each node
|
// and any applicable error conditions.
|
func (k *KeyManager) InstallKey(key string) (*KeyResponse, error) {
|
return k.InstallKeyWithOptions(key, nil)
|
}
|
|
func (k *KeyManager) InstallKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
k.l.Lock()
|
defer k.l.Unlock()
|
|
return k.handleKeyRequest(key, installKeyQuery, opts)
|
}
|
|
// UseKey handles broadcasting a primary key change to all members in the
|
// cluster, and gathering any response messages. If successful, there should
|
// be an empty KeyResponse returned.
|
func (k *KeyManager) UseKey(key string) (*KeyResponse, error) {
|
return k.UseKeyWithOptions(key, nil)
|
}
|
|
func (k *KeyManager) UseKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
k.l.Lock()
|
defer k.l.Unlock()
|
|
return k.handleKeyRequest(key, useKeyQuery, opts)
|
}
|
|
// RemoveKey handles broadcasting a key to the cluster for removal. Each member
|
// will receive this event, and if they have the key in their keyring, remove
|
// it. If any errors are encountered, RemoveKey will collect and relay them.
|
func (k *KeyManager) RemoveKey(key string) (*KeyResponse, error) {
|
return k.RemoveKeyWithOptions(key, nil)
|
}
|
|
func (k *KeyManager) RemoveKeyWithOptions(key string, opts *KeyRequestOptions) (*KeyResponse, error) {
|
k.l.Lock()
|
defer k.l.Unlock()
|
|
return k.handleKeyRequest(key, removeKeyQuery, opts)
|
}
|
|
// ListKeys is used to collect installed keys from members in a Serf cluster
|
// and return an aggregated list of all installed keys. This is useful to
|
// operators to ensure that there are no lingering keys installed on any agents.
|
// Since having multiple keys installed can cause performance penalties in some
|
// cases, it's important to verify this information and remove unneeded keys.
|
func (k *KeyManager) ListKeys() (*KeyResponse, error) {
|
return k.ListKeysWithOptions(nil)
|
}
|
|
func (k *KeyManager) ListKeysWithOptions(opts *KeyRequestOptions) (*KeyResponse, error) {
|
k.l.RLock()
|
defer k.l.RUnlock()
|
|
return k.handleKeyRequest("", listKeysQuery, opts)
|
}
|