package agent
|
|
import (
|
"log"
|
"time"
|
|
"github.com/hashicorp/serf/serf"
|
)
|
|
// queryResponseStream is used to stream the query results back to a client
|
type queryResponseStream struct {
|
client streamClient
|
logger *log.Logger
|
seq uint64
|
}
|
|
func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream {
|
qs := &queryResponseStream{
|
client: client,
|
logger: logger,
|
seq: seq,
|
}
|
return qs
|
}
|
|
// Stream is a long running routine used to stream the results of a query back to a client
|
func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) {
|
// Setup a timer for the query ending
|
remaining := resp.Deadline().Sub(time.Now())
|
done := time.After(remaining)
|
|
ackCh := resp.AckCh()
|
respCh := resp.ResponseCh()
|
for {
|
select {
|
case a := <-ackCh:
|
if err := qs.sendAck(a); err != nil {
|
qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err)
|
return
|
}
|
case r := <-respCh:
|
if err := qs.sendResponse(r.From, r.Payload); err != nil {
|
qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err)
|
return
|
}
|
case <-done:
|
if err := qs.sendDone(); err != nil {
|
qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err)
|
}
|
return
|
}
|
}
|
}
|
|
// sendAck is used to send a single ack
|
func (qs *queryResponseStream) sendAck(from string) error {
|
header := responseHeader{
|
Seq: qs.seq,
|
Error: "",
|
}
|
rec := queryRecord{
|
Type: queryRecordAck,
|
From: from,
|
}
|
return qs.client.Send(&header, &rec)
|
}
|
|
// sendResponse is used to send a single response
|
func (qs *queryResponseStream) sendResponse(from string, payload []byte) error {
|
header := responseHeader{
|
Seq: qs.seq,
|
Error: "",
|
}
|
rec := queryRecord{
|
Type: queryRecordResponse,
|
From: from,
|
Payload: payload,
|
}
|
return qs.client.Send(&header, &rec)
|
}
|
|
// sendDone is used to signal the end
|
func (qs *queryResponseStream) sendDone() error {
|
header := responseHeader{
|
Seq: qs.seq,
|
Error: "",
|
}
|
rec := queryRecord{
|
Type: queryRecordDone,
|
}
|
return qs.client.Send(&header, &rec)
|
}
|