package agent
|
|
import (
|
"fmt"
|
"log"
|
|
"github.com/hashicorp/serf/serf"
|
)
|
|
type streamClient interface {
|
Send(*responseHeader, interface{}) error
|
RegisterQuery(*serf.Query) uint64
|
}
|
|
// eventStream is used to stream events to a client over IPC
|
type eventStream struct {
|
client streamClient
|
eventCh chan serf.Event
|
filters []EventFilter
|
logger *log.Logger
|
seq uint64
|
}
|
|
func newEventStream(client streamClient, filters []EventFilter, seq uint64, logger *log.Logger) *eventStream {
|
es := &eventStream{
|
client: client,
|
eventCh: make(chan serf.Event, 512),
|
filters: filters,
|
logger: logger,
|
seq: seq,
|
}
|
go es.stream()
|
return es
|
}
|
|
func (es *eventStream) HandleEvent(e serf.Event) {
|
// Check the event
|
for _, f := range es.filters {
|
if f.Invoke(e) {
|
goto HANDLE
|
}
|
}
|
return
|
|
// Do a non-blocking send
|
HANDLE:
|
select {
|
case es.eventCh <- e:
|
default:
|
es.logger.Printf("[WARN] agent.ipc: Dropping event to %v", es.client)
|
}
|
}
|
|
func (es *eventStream) Stop() {
|
close(es.eventCh)
|
}
|
|
func (es *eventStream) stream() {
|
var err error
|
for event := range es.eventCh {
|
switch e := event.(type) {
|
case serf.MemberEvent:
|
err = es.sendMemberEvent(e)
|
case serf.UserEvent:
|
err = es.sendUserEvent(e)
|
case *serf.Query:
|
err = es.sendQuery(e)
|
default:
|
err = fmt.Errorf("Unknown event type: %s", event.EventType().String())
|
}
|
if err != nil {
|
es.logger.Printf("[ERR] agent.ipc: Failed to stream event to %v: %v",
|
es.client, err)
|
return
|
}
|
}
|
}
|
|
// sendMemberEvent is used to send a single member event
|
func (es *eventStream) sendMemberEvent(me serf.MemberEvent) error {
|
members := make([]Member, 0, len(me.Members))
|
for _, m := range me.Members {
|
sm := Member{
|
Name: m.Name,
|
Addr: m.Addr,
|
Port: m.Port,
|
Tags: m.Tags,
|
Status: m.Status.String(),
|
ProtocolMin: m.ProtocolMin,
|
ProtocolMax: m.ProtocolMax,
|
ProtocolCur: m.ProtocolCur,
|
DelegateMin: m.DelegateMin,
|
DelegateMax: m.DelegateMax,
|
DelegateCur: m.DelegateCur,
|
}
|
members = append(members, sm)
|
}
|
|
header := responseHeader{
|
Seq: es.seq,
|
Error: "",
|
}
|
rec := memberEventRecord{
|
Event: me.String(),
|
Members: members,
|
}
|
return es.client.Send(&header, &rec)
|
}
|
|
// sendUserEvent is used to send a single user event
|
func (es *eventStream) sendUserEvent(ue serf.UserEvent) error {
|
header := responseHeader{
|
Seq: es.seq,
|
Error: "",
|
}
|
rec := userEventRecord{
|
Event: ue.EventType().String(),
|
LTime: ue.LTime,
|
Name: ue.Name,
|
Payload: ue.Payload,
|
Coalesce: ue.Coalesce,
|
}
|
return es.client.Send(&header, &rec)
|
}
|
|
// sendQuery is used to send a single query event
|
func (es *eventStream) sendQuery(q *serf.Query) error {
|
id := es.client.RegisterQuery(q)
|
|
header := responseHeader{
|
Seq: es.seq,
|
Error: "",
|
}
|
rec := queryEventRecord{
|
Event: q.EventType().String(),
|
ID: id,
|
LTime: q.LTime,
|
Name: q.Name,
|
Payload: q.Payload,
|
}
|
return es.client.Send(&header, &rec)
|
}
|