package serf import ( "fmt" "reflect" "testing" "time" ) // Mock EventCounter type const EventCounter EventType = 9000 type counterEvent struct { delta int } func (c counterEvent) EventType() EventType { return EventCounter } func (c counterEvent) String() string { return fmt.Sprintf("CounterEvent %d", c.delta) } // Mock coalescer type mockCoalesce struct { value int } func (c *mockCoalesce) Handle(e Event) bool { return e.EventType() == EventCounter } func (c *mockCoalesce) Coalesce(e Event) { c.value += e.(counterEvent).delta } func (c *mockCoalesce) Flush(outChan chan<- Event) { outChan <- counterEvent{c.value} c.value = 0 } func testCoalescer(cPeriod, qPeriod time.Duration) (chan<- Event, <-chan Event, chan<- struct{}) { in := make(chan Event, 64) out := make(chan Event) shutdown := make(chan struct{}) c := &mockCoalesce{} go coalesceLoop(in, out, shutdown, cPeriod, qPeriod, c) return in, out, shutdown } func TestCoalescer_basic(t *testing.T) { in, out, shutdown := testCoalescer(5*time.Millisecond, time.Second) defer close(shutdown) send := []Event{ counterEvent{1}, counterEvent{39}, counterEvent{2}, } for _, e := range send { in <- e } select { case e := <-out: if e.EventType() != EventCounter { t.Fatalf("expected counter, got: %d", e.EventType()) } if e.(counterEvent).delta != 42 { t.Fatalf("bad: %#v", e) } case <-time.After(50 * time.Millisecond): t.Fatalf("timeout") } } func TestCoalescer_quiescent(t *testing.T) { // This tests the quiescence by creating a long coalescence period // with a short quiescent period and waiting only a multiple of the // quiescent period for results. in, out, shutdown := testCoalescer(10*time.Second, 10*time.Millisecond) defer close(shutdown) send := []Event{ counterEvent{1}, counterEvent{39}, counterEvent{2}, } for _, e := range send { in <- e } select { case e := <-out: if e.EventType() != EventCounter { t.Fatalf("expected counter, got: %d", e.EventType()) } if e.(counterEvent).delta != 42 { t.Fatalf("bad: %#v", e) } case <-time.After(50 * time.Millisecond): t.Fatalf("timeout") } } func TestCoalescer_passThrough(t *testing.T) { in, out, shutdown := testCoalescer(time.Second, time.Second) defer close(shutdown) send := []Event{ UserEvent{ Name: "test", Payload: []byte("foo"), }, } for _, e := range send { in <- e } select { case e := <-out: if e.EventType() != EventUser { t.Fatalf("expected user event, got: %d", e.EventType()) } if e.(UserEvent).Name != "test" { t.Fatalf("name should be test. %v", e) } if !reflect.DeepEqual([]byte("foo"), e.(UserEvent).Payload) { t.Fatalf("bad: %#v", e.(UserEvent).Payload) } case <-time.After(50 * time.Millisecond): t.Fatalf("timeout") } }