package client
|
|
import (
|
"bufio"
|
"fmt"
|
"log"
|
"net"
|
"sync"
|
"sync/atomic"
|
"time"
|
|
"github.com/hashicorp/go-msgpack/codec"
|
"github.com/hashicorp/logutils"
|
"basic.com/valib/serf.git/coordinate"
|
)
|
|
const (
|
// This is the default IO timeout for the client
|
DefaultTimeout = 10 * time.Second
|
)
|
|
var (
|
clientClosed = fmt.Errorf("client closed")
|
)
|
|
type seqCallback struct {
|
handler func(*responseHeader)
|
}
|
|
func (sc *seqCallback) Handle(resp *responseHeader) {
|
sc.handler(resp)
|
}
|
func (sc *seqCallback) Cleanup() {}
|
|
// seqHandler interface is used to handle responses
|
type seqHandler interface {
|
Handle(*responseHeader)
|
Cleanup()
|
}
|
|
// Config is provided to ClientFromConfig to make
|
// a new RPCClient from the given configuration
|
type Config struct {
|
// Addr must be the RPC address to contact
|
Addr string
|
|
// If provided, the client will perform key based auth
|
AuthKey string
|
|
// If provided, overrides the DefaultTimeout used for
|
// IO deadlines
|
Timeout time.Duration
|
}
|
|
// RPCClient is used to make requests to the Agent using an RPC mechanism.
|
// Additionally, the client manages event streams and monitors, enabling a client
|
// to easily receive event notifications instead of using the fork/exec mechanism.
|
type RPCClient struct {
|
seq uint64
|
|
timeout time.Duration
|
conn *net.TCPConn
|
reader *bufio.Reader
|
writer *bufio.Writer
|
dec *codec.Decoder
|
enc *codec.Encoder
|
writeLock sync.Mutex
|
|
dispatch map[uint64]seqHandler
|
dispatchLock sync.Mutex
|
|
shutdown bool
|
shutdownCh chan struct{}
|
shutdownLock sync.Mutex
|
}
|
|
// send is used to send an object using the MsgPack encoding. send
|
// is serialized to prevent write overlaps, while properly buffering.
|
func (c *RPCClient) send(header *requestHeader, obj interface{}) error {
|
c.writeLock.Lock()
|
defer c.writeLock.Unlock()
|
|
if c.shutdown {
|
return clientClosed
|
}
|
|
// Setup an IO deadline, this way we won't wait indefinitely
|
// if the client has hung.
|
if err := c.conn.SetWriteDeadline(time.Now().Add(c.timeout)); err != nil {
|
return err
|
}
|
|
if err := c.enc.Encode(header); err != nil {
|
return err
|
}
|
|
if obj != nil {
|
if err := c.enc.Encode(obj); err != nil {
|
return err
|
}
|
}
|
|
if err := c.writer.Flush(); err != nil {
|
return err
|
}
|
|
return nil
|
}
|
|
// NewRPCClient is used to create a new RPC client given the
|
// RPC address of the Serf agent. This will return a client,
|
// or an error if the connection could not be established.
|
// This will use the DefaultTimeout for the client.
|
func NewRPCClient(addr string) (*RPCClient, error) {
|
conf := Config{Addr: addr}
|
return ClientFromConfig(&conf)
|
}
|
|
// ClientFromConfig is used to create a new RPC client given the
|
// configuration object. This will return a client, or an error if
|
// the connection could not be established.
|
func ClientFromConfig(c *Config) (*RPCClient, error) {
|
// Setup the defaults
|
if c.Timeout == 0 {
|
c.Timeout = DefaultTimeout
|
}
|
|
// Try to dial to serf
|
conn, err := net.DialTimeout("tcp", c.Addr, c.Timeout)
|
if err != nil {
|
return nil, err
|
}
|
|
// Create the client
|
client := &RPCClient{
|
seq: 0,
|
timeout: c.Timeout,
|
conn: conn.(*net.TCPConn),
|
reader: bufio.NewReader(conn),
|
writer: bufio.NewWriter(conn),
|
dispatch: make(map[uint64]seqHandler),
|
shutdownCh: make(chan struct{}),
|
}
|
client.dec = codec.NewDecoder(client.reader,
|
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
|
client.enc = codec.NewEncoder(client.writer,
|
&codec.MsgpackHandle{RawToString: true, WriteExt: true})
|
go client.listen()
|
|
// Do the initial handshake
|
if err := client.handshake(); err != nil {
|
client.Close()
|
return nil, err
|
}
|
|
// Do the initial authentication if needed
|
if c.AuthKey != "" {
|
if err := client.auth(c.AuthKey); err != nil {
|
client.Close()
|
return nil, err
|
}
|
}
|
|
return client, err
|
}
|
|
// StreamHandle is an opaque handle passed to stop to stop streaming
|
type StreamHandle uint64
|
|
func (c *RPCClient) IsClosed() bool {
|
return c.shutdown
|
}
|
|
// Close is used to free any resources associated with the client
|
func (c *RPCClient) Close() error {
|
c.shutdownLock.Lock()
|
defer c.shutdownLock.Unlock()
|
|
if !c.shutdown {
|
c.shutdown = true
|
close(c.shutdownCh)
|
c.deregisterAll()
|
return c.conn.Close()
|
}
|
return nil
|
}
|
|
// ForceLeave is used to ask the agent to issue a leave command for
|
// a given node
|
func (c *RPCClient) ForceLeave(node string) error {
|
header := requestHeader{
|
Command: forceLeaveCommand,
|
Seq: c.getSeq(),
|
}
|
req := forceLeaveRequest{
|
Node: node,
|
Prune: false,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
//ForceLeavePrune uses ForceLeave but is used to reap the
|
//node entirely
|
func (c *RPCClient) ForceLeavePrune(node string) error {
|
header := requestHeader{
|
Command: forceLeaveCommand,
|
Seq: c.getSeq(),
|
}
|
req := forceLeaveRequest{
|
Node: node,
|
Prune: true,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// Join is used to instruct the agent to attempt a join
|
func (c *RPCClient) Join(addrs []string, replay bool) (int, error) {
|
header := requestHeader{
|
Command: joinCommand,
|
Seq: c.getSeq(),
|
}
|
req := joinRequest{
|
Existing: addrs,
|
Replay: replay,
|
}
|
var resp joinResponse
|
|
err := c.genericRPC(&header, &req, &resp)
|
return int(resp.Num), err
|
}
|
|
// Members is used to fetch a list of known members
|
func (c *RPCClient) Members() ([]Member, error) {
|
header := requestHeader{
|
Command: membersCommand,
|
Seq: c.getSeq(),
|
}
|
var resp membersResponse
|
|
err := c.genericRPC(&header, nil, &resp)
|
return resp.Members, err
|
}
|
|
// MembersFiltered returns a subset of members
|
func (c *RPCClient) MembersFiltered(tags map[string]string, status string,
|
name string) ([]Member, error) {
|
header := requestHeader{
|
Command: membersFilteredCommand,
|
Seq: c.getSeq(),
|
}
|
req := membersFilteredRequest{
|
Tags: tags,
|
Status: status,
|
Name: name,
|
}
|
var resp membersResponse
|
|
err := c.genericRPC(&header, &req, &resp)
|
return resp.Members, err
|
}
|
|
// UserEvent is used to trigger sending an event
|
func (c *RPCClient) UserEvent(name string, payload []byte, coalesce bool) error {
|
header := requestHeader{
|
Command: eventCommand,
|
Seq: c.getSeq(),
|
}
|
req := eventRequest{
|
Name: name,
|
Payload: payload,
|
Coalesce: coalesce,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// Leave is used to trigger a graceful leave and shutdown of the agent
|
func (c *RPCClient) Leave() error {
|
header := requestHeader{
|
Command: leaveCommand,
|
Seq: c.getSeq(),
|
}
|
return c.genericRPC(&header, nil, nil)
|
}
|
|
// UpdateTags will modify the tags on a running serf agent
|
func (c *RPCClient) UpdateTags(tags map[string]string, delTags []string) error {
|
header := requestHeader{
|
Command: tagsCommand,
|
Seq: c.getSeq(),
|
}
|
req := tagsRequest{
|
Tags: tags,
|
DeleteTags: delTags,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// Respond allows a client to respond to a query event. The ID is the
|
// ID of the Query to respond to, and the given payload is the response.
|
func (c *RPCClient) Respond(id uint64, buf []byte) error {
|
header := requestHeader{
|
Command: respondCommand,
|
Seq: c.getSeq(),
|
}
|
req := respondRequest{
|
ID: id,
|
Payload: buf,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// IntallKey installs a new encryption key onto the keyring
|
func (c *RPCClient) InstallKey(key string) (map[string]string, error) {
|
header := requestHeader{
|
Command: installKeyCommand,
|
Seq: c.getSeq(),
|
}
|
req := keyRequest{
|
Key: key,
|
}
|
|
resp := keyResponse{}
|
err := c.genericRPC(&header, &req, &resp)
|
|
return resp.Messages, err
|
}
|
|
// UseKey changes the primary encryption key on the keyring
|
func (c *RPCClient) UseKey(key string) (map[string]string, error) {
|
header := requestHeader{
|
Command: useKeyCommand,
|
Seq: c.getSeq(),
|
}
|
req := keyRequest{
|
Key: key,
|
}
|
|
resp := keyResponse{}
|
err := c.genericRPC(&header, &req, &resp)
|
|
return resp.Messages, err
|
}
|
|
// RemoveKey changes the primary encryption key on the keyring
|
func (c *RPCClient) RemoveKey(key string) (map[string]string, error) {
|
header := requestHeader{
|
Command: removeKeyCommand,
|
Seq: c.getSeq(),
|
}
|
req := keyRequest{
|
Key: key,
|
}
|
|
resp := keyResponse{}
|
err := c.genericRPC(&header, &req, &resp)
|
|
return resp.Messages, err
|
}
|
|
// ListKeys returns all of the active keys on each member of the cluster
|
func (c *RPCClient) ListKeys() (map[string]int, int, map[string]string, error) {
|
header := requestHeader{
|
Command: listKeysCommand,
|
Seq: c.getSeq(),
|
}
|
|
resp := keyResponse{}
|
err := c.genericRPC(&header, nil, &resp)
|
|
return resp.Keys, resp.NumNodes, resp.Messages, err
|
}
|
|
// Stats is used to get debugging state information
|
func (c *RPCClient) Stats() (map[string]map[string]string, error) {
|
header := requestHeader{
|
Command: statsCommand,
|
Seq: c.getSeq(),
|
}
|
var resp map[string]map[string]string
|
|
err := c.genericRPC(&header, nil, &resp)
|
return resp, err
|
}
|
|
// GetCoordinate is used to retrieve the cached coordinate of a node.
|
func (c *RPCClient) GetCoordinate(node string) (*coordinate.Coordinate, error) {
|
header := requestHeader{
|
Command: getCoordinateCommand,
|
Seq: c.getSeq(),
|
}
|
req := coordinateRequest{
|
Node: node,
|
}
|
var resp coordinateResponse
|
|
if err := c.genericRPC(&header, &req, &resp); err != nil {
|
return nil, err
|
}
|
if resp.Ok {
|
return &resp.Coord, nil
|
}
|
return nil, nil
|
}
|
|
type monitorHandler struct {
|
client *RPCClient
|
closed bool
|
init bool
|
initCh chan<- error
|
logCh chan<- string
|
seq uint64
|
}
|
|
func (mh *monitorHandler) Handle(resp *responseHeader) {
|
// Initialize on the first response
|
if !mh.init {
|
mh.init = true
|
mh.initCh <- strToError(resp.Error)
|
return
|
}
|
|
// Decode logs for all other responses
|
var rec logRecord
|
if err := mh.client.dec.Decode(&rec); err != nil {
|
log.Printf("[ERR] Failed to decode log: %v", err)
|
mh.client.deregisterHandler(mh.seq)
|
return
|
}
|
select {
|
case mh.logCh <- rec.Log:
|
default:
|
log.Printf("[ERR] Dropping log! Monitor channel full")
|
}
|
}
|
|
func (mh *monitorHandler) Cleanup() {
|
if !mh.closed {
|
if !mh.init {
|
mh.init = true
|
mh.initCh <- fmt.Errorf("Stream closed")
|
}
|
if mh.logCh != nil {
|
close(mh.logCh)
|
}
|
mh.closed = true
|
}
|
}
|
|
// Monitor is used to subscribe to the logs of the agent
|
func (c *RPCClient) Monitor(level logutils.LogLevel, ch chan<- string) (StreamHandle, error) {
|
// Setup the request
|
seq := c.getSeq()
|
header := requestHeader{
|
Command: monitorCommand,
|
Seq: seq,
|
}
|
req := monitorRequest{
|
LogLevel: string(level),
|
}
|
|
// Create a monitor handler
|
initCh := make(chan error, 1)
|
handler := &monitorHandler{
|
client: c,
|
initCh: initCh,
|
logCh: ch,
|
seq: seq,
|
}
|
c.handleSeq(seq, handler)
|
|
// Send the request
|
if err := c.send(&header, &req); err != nil {
|
c.deregisterHandler(seq)
|
return 0, err
|
}
|
|
// Wait for a response
|
select {
|
case err := <-initCh:
|
return StreamHandle(seq), err
|
case <-c.shutdownCh:
|
c.deregisterHandler(seq)
|
return 0, clientClosed
|
}
|
}
|
|
type streamHandler struct {
|
client *RPCClient
|
closed bool
|
init bool
|
initCh chan<- error
|
eventCh chan<- map[string]interface{}
|
seq uint64
|
}
|
|
func (sh *streamHandler) Handle(resp *responseHeader) {
|
// Initialize on the first response
|
if !sh.init {
|
sh.init = true
|
sh.initCh <- strToError(resp.Error)
|
return
|
}
|
|
// Decode logs for all other responses
|
var rec map[string]interface{}
|
if err := sh.client.dec.Decode(&rec); err != nil {
|
log.Printf("[ERR] Failed to decode stream record: %v", err)
|
sh.client.deregisterHandler(sh.seq)
|
return
|
}
|
select {
|
case sh.eventCh <- rec:
|
default:
|
log.Printf("[ERR] Dropping event! Stream channel full")
|
}
|
}
|
|
func (sh *streamHandler) Cleanup() {
|
if !sh.closed {
|
if !sh.init {
|
sh.init = true
|
sh.initCh <- fmt.Errorf("Stream closed")
|
}
|
if sh.eventCh != nil {
|
close(sh.eventCh)
|
}
|
sh.closed = true
|
}
|
}
|
|
// Stream is used to subscribe to events
|
func (c *RPCClient) Stream(filter string, ch chan<- map[string]interface{}) (StreamHandle, error) {
|
// Setup the request
|
seq := c.getSeq()
|
header := requestHeader{
|
Command: streamCommand,
|
Seq: seq,
|
}
|
req := streamRequest{
|
Type: filter,
|
}
|
|
// Create a monitor handler
|
initCh := make(chan error, 1)
|
handler := &streamHandler{
|
client: c,
|
initCh: initCh,
|
eventCh: ch,
|
seq: seq,
|
}
|
c.handleSeq(seq, handler)
|
|
// Send the request
|
if err := c.send(&header, &req); err != nil {
|
c.deregisterHandler(seq)
|
return 0, err
|
}
|
|
// Wait for a response
|
select {
|
case err := <-initCh:
|
return StreamHandle(seq), err
|
case <-c.shutdownCh:
|
c.deregisterHandler(seq)
|
return 0, clientClosed
|
}
|
}
|
|
type queryHandler struct {
|
client *RPCClient
|
closed bool
|
init bool
|
initCh chan<- error
|
ackCh chan<- string
|
respCh chan<- NodeResponse
|
seq uint64
|
}
|
|
func (qh *queryHandler) Handle(resp *responseHeader) {
|
// Initialize on the first response
|
if !qh.init {
|
qh.init = true
|
qh.initCh <- strToError(resp.Error)
|
return
|
}
|
|
// Decode the query response
|
var rec queryRecord
|
if err := qh.client.dec.Decode(&rec); err != nil {
|
log.Printf("[ERR] Failed to decode query response: %v", err)
|
qh.client.deregisterHandler(qh.seq)
|
return
|
}
|
|
switch rec.Type {
|
case queryRecordAck:
|
select {
|
case qh.ackCh <- rec.From:
|
default:
|
log.Printf("[ERR] Dropping query ack, channel full")
|
}
|
|
case queryRecordResponse:
|
select {
|
case qh.respCh <- NodeResponse{rec.From, rec.Payload}:
|
default:
|
log.Printf("[ERR] Dropping query response, channel full")
|
}
|
|
case queryRecordDone:
|
// No further records coming
|
qh.client.deregisterHandler(qh.seq)
|
|
default:
|
log.Printf("[ERR] Unrecognized query record type: %s", rec.Type)
|
}
|
}
|
|
func (qh *queryHandler) Cleanup() {
|
if !qh.closed {
|
if !qh.init {
|
qh.init = true
|
qh.initCh <- fmt.Errorf("Stream closed")
|
}
|
if qh.ackCh != nil {
|
close(qh.ackCh)
|
}
|
if qh.respCh != nil {
|
close(qh.respCh)
|
}
|
qh.closed = true
|
}
|
}
|
|
// QueryParam is provided to query set various settings.
|
type QueryParam struct {
|
FilterNodes []string // A list of node names to restrict query to
|
FilterTags map[string]string // A map of tag name to regex to filter on
|
RequestAck bool // Should nodes ack the query receipt
|
RelayFactor uint8 // Duplicate response count to be relayed back to sender for redundancy.
|
Timeout time.Duration // Maximum query duration. Optional, will be set automatically.
|
Name string // Opaque query name
|
Payload []byte // Opaque query payload
|
AckCh chan<- string // Channel to send Ack replies on
|
RespCh chan<- NodeResponse // Channel to send responses on
|
}
|
|
// Query initiates a new query message using the given parameters, and streams
|
// acks and responses over the given channels. The channels will not block on
|
// sends and should be buffered. At the end of the query, the channels will be
|
// closed.
|
func (c *RPCClient) Query(params *QueryParam) error {
|
// Setup the request
|
seq := c.getSeq()
|
header := requestHeader{
|
Command: queryCommand,
|
Seq: seq,
|
}
|
req := queryRequest{
|
FilterNodes: params.FilterNodes,
|
FilterTags: params.FilterTags,
|
RequestAck: params.RequestAck,
|
RelayFactor: params.RelayFactor,
|
Timeout: params.Timeout,
|
Name: params.Name,
|
Payload: params.Payload,
|
}
|
|
// Create a query handler
|
initCh := make(chan error, 1)
|
handler := &queryHandler{
|
client: c,
|
initCh: initCh,
|
ackCh: params.AckCh,
|
respCh: params.RespCh,
|
seq: seq,
|
}
|
c.handleSeq(seq, handler)
|
|
// Send the request
|
if err := c.send(&header, &req); err != nil {
|
c.deregisterHandler(seq)
|
return err
|
}
|
|
// Wait for a response
|
select {
|
case err := <-initCh:
|
return err
|
case <-c.shutdownCh:
|
c.deregisterHandler(seq)
|
return clientClosed
|
}
|
}
|
|
// Stop is used to unsubscribe from logs or event streams
|
func (c *RPCClient) Stop(handle StreamHandle) error {
|
// Deregister locally first to stop delivery
|
c.deregisterHandler(uint64(handle))
|
|
header := requestHeader{
|
Command: stopCommand,
|
Seq: c.getSeq(),
|
}
|
req := stopRequest{
|
Stop: uint64(handle),
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// handshake is used to perform the initial handshake on connect
|
func (c *RPCClient) handshake() error {
|
header := requestHeader{
|
Command: handshakeCommand,
|
Seq: c.getSeq(),
|
}
|
req := handshakeRequest{
|
Version: maxIPCVersion,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// auth is used to perform the initial authentication on connect
|
func (c *RPCClient) auth(authKey string) error {
|
header := requestHeader{
|
Command: authCommand,
|
Seq: c.getSeq(),
|
}
|
req := authRequest{
|
AuthKey: authKey,
|
}
|
return c.genericRPC(&header, &req, nil)
|
}
|
|
// genericRPC is used to send a request and wait for an
|
// errorSequenceResponse, potentially returning an error
|
func (c *RPCClient) genericRPC(header *requestHeader, req interface{}, resp interface{}) error {
|
// Setup a response handler
|
errCh := make(chan error, 1)
|
handler := func(respHeader *responseHeader) {
|
// If we get an auth error, we should not wait for a request body
|
if respHeader.Error == authRequired {
|
goto SEND_ERR
|
}
|
if resp != nil {
|
err := c.dec.Decode(resp)
|
if err != nil {
|
errCh <- err
|
return
|
}
|
}
|
SEND_ERR:
|
errCh <- strToError(respHeader.Error)
|
}
|
c.handleSeq(header.Seq, &seqCallback{handler: handler})
|
defer c.deregisterHandler(header.Seq)
|
|
// Send the request
|
if err := c.send(header, req); err != nil {
|
return err
|
}
|
|
// Wait for a response
|
select {
|
case err := <-errCh:
|
return err
|
case <-c.shutdownCh:
|
return clientClosed
|
}
|
}
|
|
// strToError converts a string to an error if not blank
|
func strToError(s string) error {
|
if s != "" {
|
return fmt.Errorf(s)
|
}
|
return nil
|
}
|
|
// getSeq returns the next sequence number in a safe manner
|
func (c *RPCClient) getSeq() uint64 {
|
return atomic.AddUint64(&c.seq, 1)
|
}
|
|
// deregisterAll is used to deregister all handlers
|
func (c *RPCClient) deregisterAll() {
|
c.dispatchLock.Lock()
|
defer c.dispatchLock.Unlock()
|
|
for _, seqH := range c.dispatch {
|
seqH.Cleanup()
|
}
|
c.dispatch = make(map[uint64]seqHandler)
|
}
|
|
// deregisterHandler is used to deregister a handler
|
func (c *RPCClient) deregisterHandler(seq uint64) {
|
c.dispatchLock.Lock()
|
seqH, ok := c.dispatch[seq]
|
delete(c.dispatch, seq)
|
c.dispatchLock.Unlock()
|
|
if ok {
|
seqH.Cleanup()
|
}
|
}
|
|
// handleSeq is used to setup a handlerto wait on a response for
|
// a given sequence number.
|
func (c *RPCClient) handleSeq(seq uint64, handler seqHandler) {
|
c.dispatchLock.Lock()
|
defer c.dispatchLock.Unlock()
|
c.dispatch[seq] = handler
|
}
|
|
// respondSeq is used to respond to a given sequence number
|
func (c *RPCClient) respondSeq(seq uint64, respHeader *responseHeader) {
|
c.dispatchLock.Lock()
|
seqL, ok := c.dispatch[seq]
|
c.dispatchLock.Unlock()
|
|
// Get a registered listener, ignore if none
|
if ok {
|
seqL.Handle(respHeader)
|
}
|
}
|
|
// listen is used to processes data coming over the IPC channel,
|
// and wrote it to the correct destination based on seq no
|
func (c *RPCClient) listen() {
|
defer c.Close()
|
var respHeader responseHeader
|
for {
|
if err := c.dec.Decode(&respHeader); err != nil {
|
if !c.shutdown {
|
log.Printf("[ERR] agent.client: Failed to decode response header: %v", err)
|
}
|
break
|
}
|
c.respondSeq(respHeader.Seq, &respHeader)
|
}
|
}
|