longganhua
2019-07-18 2fcec5d0debb4819c651e8f3b1287f18de9efee9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package agent
 
import (
    "log"
    "time"
 
    "github.com/hashicorp/serf/serf"
)
 
// queryResponseStream is used to stream the query results back to a client
type queryResponseStream struct {
    client streamClient
    logger *log.Logger
    seq    uint64
}
 
func newQueryResponseStream(client streamClient, seq uint64, logger *log.Logger) *queryResponseStream {
    qs := &queryResponseStream{
        client: client,
        logger: logger,
        seq:    seq,
    }
    return qs
}
 
// Stream is a long running routine used to stream the results of a query back to a client
func (qs *queryResponseStream) Stream(resp *serf.QueryResponse) {
    // Setup a timer for the query ending
    remaining := resp.Deadline().Sub(time.Now())
    done := time.After(remaining)
 
    ackCh := resp.AckCh()
    respCh := resp.ResponseCh()
    for {
        select {
        case a := <-ackCh:
            if err := qs.sendAck(a); err != nil {
                qs.logger.Printf("[ERR] agent.ipc: Failed to stream ack to %v: %v", qs.client, err)
                return
            }
        case r := <-respCh:
            if err := qs.sendResponse(r.From, r.Payload); err != nil {
                qs.logger.Printf("[ERR] agent.ipc: Failed to stream response to %v: %v", qs.client, err)
                return
            }
        case <-done:
            if err := qs.sendDone(); err != nil {
                qs.logger.Printf("[ERR] agent.ipc: Failed to stream query end to %v: %v", qs.client, err)
            }
            return
        }
    }
}
 
// sendAck is used to send a single ack
func (qs *queryResponseStream) sendAck(from string) error {
    header := responseHeader{
        Seq:   qs.seq,
        Error: "",
    }
    rec := queryRecord{
        Type: queryRecordAck,
        From: from,
    }
    return qs.client.Send(&header, &rec)
}
 
// sendResponse is used to send a single response
func (qs *queryResponseStream) sendResponse(from string, payload []byte) error {
    header := responseHeader{
        Seq:   qs.seq,
        Error: "",
    }
    rec := queryRecord{
        Type:    queryRecordResponse,
        From:    from,
        Payload: payload,
    }
    return qs.client.Send(&header, &rec)
}
 
// sendDone is used to signal the end
func (qs *queryResponseStream) sendDone() error {
    header := responseHeader{
        Seq:   qs.seq,
        Error: "",
    }
    rec := queryRecord{
        Type: queryRecordDone,
    }
    return qs.client.Send(&header, &rec)
}