package agent
|
|
import (
|
"bytes"
|
"log"
|
"net"
|
"os"
|
"testing"
|
"time"
|
|
"github.com/hashicorp/serf/serf"
|
)
|
|
type MockStreamClient struct {
|
headers []*responseHeader
|
objs []interface{}
|
err error
|
}
|
|
func (m *MockStreamClient) Send(h *responseHeader, o interface{}) error {
|
m.headers = append(m.headers, h)
|
m.objs = append(m.objs, o)
|
return m.err
|
}
|
|
func (m *MockStreamClient) RegisterQuery(q *serf.Query) uint64 {
|
return 42
|
}
|
|
func TestIPCEventStream(t *testing.T) {
|
sc := &MockStreamClient{}
|
filters := ParseEventFilter("user:foobar,member-join,query:deploy")
|
es := newEventStream(sc, filters, 42, log.New(os.Stderr, "", log.LstdFlags))
|
defer es.Stop()
|
|
es.HandleEvent(serf.UserEvent{
|
LTime: 123,
|
Name: "foobar",
|
Payload: []byte("test"),
|
Coalesce: true,
|
})
|
es.HandleEvent(serf.UserEvent{
|
LTime: 124,
|
Name: "ignore",
|
Payload: []byte("test"),
|
Coalesce: true,
|
})
|
es.HandleEvent(serf.MemberEvent{
|
Type: serf.EventMemberJoin,
|
Members: []serf.Member{
|
serf.Member{
|
Name: "TestNode",
|
Addr: net.IP([]byte{127, 0, 0, 1}),
|
Port: 12345,
|
Tags: map[string]string{"role": "node"},
|
Status: serf.StatusAlive,
|
ProtocolMin: 0,
|
ProtocolMax: 0,
|
ProtocolCur: 0,
|
DelegateMin: 0,
|
DelegateMax: 0,
|
DelegateCur: 0,
|
},
|
},
|
})
|
es.HandleEvent(&serf.Query{
|
LTime: 125,
|
Name: "deploy",
|
Payload: []byte("test"),
|
})
|
|
time.Sleep(5 * time.Millisecond)
|
|
if len(sc.headers) != 3 {
|
t.Fatalf("expected 2 messages!")
|
}
|
for _, h := range sc.headers {
|
if h.Seq != 42 {
|
t.Fatalf("bad seq")
|
}
|
if h.Error != "" {
|
t.Fatalf("bad err")
|
}
|
}
|
|
obj1 := sc.objs[0].(*userEventRecord)
|
if obj1.Event != "user" {
|
t.Fatalf("bad event: %#v", obj1)
|
}
|
if obj1.LTime != 123 {
|
t.Fatalf("bad event: %#v", obj1)
|
}
|
if obj1.Name != "foobar" {
|
t.Fatalf("bad event: %#v", obj1)
|
}
|
if bytes.Compare(obj1.Payload, []byte("test")) != 0 {
|
t.Fatalf("bad event: %#v", obj1)
|
}
|
if !obj1.Coalesce {
|
t.Fatalf("bad event: %#v", obj1)
|
}
|
|
obj2 := sc.objs[1].(*memberEventRecord)
|
if obj2.Event != "member-join" {
|
t.Fatalf("bad event: %#v", obj2)
|
}
|
mem1 := obj2.Members[0]
|
if mem1.Name != "TestNode" {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if bytes.Compare(mem1.Addr, []byte{127, 0, 0, 1}) != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.Port != 12345 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.Status != "alive" {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.ProtocolMin != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.ProtocolMax != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.ProtocolCur != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.DelegateMin != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.DelegateMax != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
if mem1.DelegateCur != 0 {
|
t.Fatalf("bad member: %#v", mem1)
|
}
|
|
obj3 := sc.objs[2].(*queryEventRecord)
|
if obj3.Event != "query" {
|
t.Fatalf("bad query: %#v", obj3)
|
}
|
if obj3.ID != 42 {
|
t.Fatalf("bad query: %#v", obj3)
|
}
|
if obj3.LTime != 125 {
|
t.Fatalf("bad query: %#v", obj3)
|
}
|
if obj3.Name != "deploy" {
|
t.Fatalf("bad query: %#v", obj3)
|
}
|
if bytes.Compare(obj3.Payload, []byte("test")) != 0 {
|
t.Fatalf("bad query: %#v", obj3)
|
}
|
|
}
|