package serf

import (
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"reflect"
	"testing"
	"time"
)

func TestSnapshotter(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	outCh := make(chan Event, 64)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, outCh, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Write some user events
	ue := UserEvent{
		LTime: 42,
		Name:  "bar",
	}
	inCh <- ue

	// Write some queries
	q := &Query{
		LTime: 50,
		Name:  "uptime",
	}
	inCh <- q

	// Write some member events
	clock.Witness(100)
	meJoin := MemberEvent{
		Type: EventMemberJoin,
		Members: []Member{
			Member{
				Name: "foo",
				Addr: []byte{127, 0, 0, 1},
				Port: 5000,
			},
		},
	}
	meFail := MemberEvent{
		Type: EventMemberFailed,
		Members: []Member{
			Member{
				Name: "foo",
				Addr: []byte{127, 0, 0, 1},
				Port: 5000,
			},
		},
	}
	inCh <- meJoin
	inCh <- meFail
	inCh <- meJoin

	// Check these get passed through
	select {
	case e := <-outCh:
		if !reflect.DeepEqual(e, ue) {
			t.Fatalf("expected user event: %#v", e)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timeout")
	}

	select {
	case e := <-outCh:
		if !reflect.DeepEqual(e, q) {
			t.Fatalf("expected query event: %#v", e)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timeout")
	}

	select {
	case e := <-outCh:
		if !reflect.DeepEqual(e, meJoin) {
			t.Fatalf("expected member event: %#v", e)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timeout")
	}

	select {
	case e := <-outCh:
		if !reflect.DeepEqual(e, meFail) {
			t.Fatalf("expected member event: %#v", e)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timeout")
	}

	select {
	case e := <-outCh:
		if !reflect.DeepEqual(e, meJoin) {
			t.Fatalf("expected member event: %#v", e)
		}
	case <-time.After(200 * time.Millisecond):
		t.Fatalf("timeout")
	}

	// Close the snapshotter
	close(stopCh)
	snap.Wait()

	// Open the snapshotter
	stopCh = make(chan struct{})
	_, snap, err = NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, outCh, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the values
	if snap.LastClock() != 100 {
		t.Fatalf("bad clock %d", snap.LastClock())
	}
	if snap.LastEventClock() != 42 {
		t.Fatalf("bad clock %d", snap.LastEventClock())
	}
	if snap.LastQueryClock() != 50 {
		t.Fatalf("bad clock %d", snap.LastQueryClock())
	}

	prev := snap.AliveNodes()
	if len(prev) != 1 {
		t.Fatalf("expected alive: %#v", prev)
	}
	if prev[0].Name != "foo" {
		t.Fatalf("bad name: %#v", prev[0])
	}
	if prev[0].Addr != "127.0.0.1:5000" {
		t.Fatalf("bad addr: %#v", prev[0])
	}

	// Close the snapshotter.
	close(stopCh)
	snap.Wait()

	// Open the snapshotter again and make sure nothing dies reading with
	// coordinates disabled.
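	// (Snapshot files written by builds with coordinates enabled may contain
	// coordinate records; the replay loop is expected to skip line types it
	// does not recognize, so this reopen should be harmless.)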
	stopCh = make(chan struct{})
	_, snap, err = NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, outCh, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	close(stopCh)
	snap.Wait()
}

func TestSnapshotter_forceCompact(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// Use a very low size limit to force compaction
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), 1024, false,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Write lots of user events
	for i := 0; i < 1024; i++ {
		ue := UserEvent{
			LTime: LamportTime(i),
		}
		inCh <- ue
	}

	// Write lots of queries
	for i := 0; i < 1024; i++ {
		q := &Query{
			LTime: LamportTime(i),
		}
		inCh <- q
	}

	// Wait for the channel to drain
	for len(inCh) > 0 {
		time.Sleep(20 * time.Millisecond)
	}

	// Close the snapshotter
	close(stopCh)
	snap.Wait()

	// Open the snapshotter
	stopCh = make(chan struct{})
	_, snap, err = NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the values
	if snap.LastEventClock() != 1023 {
		t.Fatalf("bad clock %d", snap.LastEventClock())
	}
	if snap.LastQueryClock() != 1023 {
		t.Fatalf("bad clock %d", snap.LastQueryClock())
	}

	close(stopCh)
	snap.Wait()
}

func TestSnapshotter_leave(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Write a user event
	ue := UserEvent{
		LTime: 42,
		Name:  "bar",
	}
	inCh <- ue

	// Write a query
	q := &Query{
		LTime: 50,
		Name:  "uptime",
	}
	inCh <- q

	// Write a member event
	clock.Witness(100)
	meJoin := MemberEvent{
		Type: EventMemberJoin,
		Members: []Member{
			Member{
				Name: "foo",
				Addr: []byte{127, 0, 0, 1},
				Port: 5000,
			},
		},
	}
	inCh <- meJoin

	// Wait for the channel to drain
	for len(inCh) > 0 {
		time.Sleep(20 * time.Millisecond)
	}

	// Leave the cluster!
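	// Leave appends a leave record to the snapshot file. Because the
	// rejoin-after-leave flag (the third argument above) is false here,
	// replay should discard the prior state at that record, so the reopen
	// below should see zeroed clocks and no alive nodes.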
	snap.Leave()

	// Close the snapshotter
	close(stopCh)
	snap.Wait()

	// Open the snapshotter
	stopCh = make(chan struct{})
	_, snap, err = NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, false,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the values were wiped by the leave
	if snap.LastClock() != 0 {
		t.Fatalf("bad clock %d", snap.LastClock())
	}
	if snap.LastEventClock() != 0 {
		t.Fatalf("bad clock %d", snap.LastEventClock())
	}
	if snap.LastQueryClock() != 0 {
		t.Fatalf("bad clock %d", snap.LastQueryClock())
	}

	prev := snap.AliveNodes()
	if len(prev) != 0 {
		t.Fatalf("expected none alive: %#v", prev)
	}
}

func TestSnapshotter_leave_rejoin(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, true,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Write a user event
	ue := UserEvent{
		LTime: 42,
		Name:  "bar",
	}
	inCh <- ue

	// Write a query
	q := &Query{
		LTime: 50,
		Name:  "uptime",
	}
	inCh <- q

	// Write a member event
	clock.Witness(100)
	meJoin := MemberEvent{
		Type: EventMemberJoin,
		Members: []Member{
			Member{
				Name: "foo",
				Addr: []byte{127, 0, 0, 1},
				Port: 5000,
			},
		},
	}
	inCh <- meJoin

	// Wait for the channel to drain
	for len(inCh) > 0 {
		time.Sleep(20 * time.Millisecond)
	}

	// Leave the cluster! With the rejoin-after-leave flag set, the leave
	// record should not wipe the snapshot state on replay.
	snap.Leave()

	// Close the snapshotter
	close(stopCh)
	snap.Wait()

	// Open the snapshotter
	stopCh = make(chan struct{})
	_, snap, err = NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, true,
		logger, clock, nil, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check the values survived the leave
	if snap.LastClock() != 100 {
		t.Fatalf("bad clock %d", snap.LastClock())
	}
	if snap.LastEventClock() != 42 {
		t.Fatalf("bad clock %d", snap.LastEventClock())
	}
	if snap.LastQueryClock() != 50 {
		t.Fatalf("bad clock %d", snap.LastQueryClock())
	}

	prev := snap.AliveNodes()
	if len(prev) == 0 {
		t.Fatalf("expected alive: %#v", prev)
	}
}

func TestSnapshotter_slowDiskNotBlockingEventCh(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	t.Log("Temp dir", td)
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)

	outCh := make(chan Event, 1024)
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, true,
		logger, clock, outCh, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// We need many more events than the internal buffers, which have size
	// 1024. This number processes easily within the 500ms we allow below on
	// my host provided there is no disk IO on the path (I verified that by
	// just returning early in tryAppend using the old blocking code). The new
	// async method should pass without disabling disk writes too!
	numEvents := 10000

	// Write lots of member updates (way bigger than our chan buffers)
	startCh := make(chan struct{})
	go func() {
		<-startCh
		for i := 0; i < numEvents; i++ {
			e := MemberEvent{
				Type: EventMemberJoin,
				Members: []Member{
					Member{
						Name: fmt.Sprintf("foo%d", i),
						Addr: []byte{127, 0, byte((i / 256) % 256), byte(i % 256)},
						Port: 5000,
					},
				},
			}
			if i%10 == 0 {
				// 1 in 10 events is a leave
				e.Type = EventMemberLeave
			}
			inCh <- e

			// Pace ourselves - if we just throw these out as fast as possible
			// the read loop below can't keep up and we end up dropping
			// messages due to backpressure. But we still need to send them
			// all in well under the timeout: 10k messages at 1 microsecond
			// each should take at least 10ms, and in practice quite a bit
			// longer, both to actually process them and because sends block
			// once the buffer fills.
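			// (A 1 microsecond sleep is below the timer resolution on most
			// platforms, so the real pause is longer; it mostly just yields
			// to the reader goroutine, which is all the pacing needs.)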
			time.Sleep(1 * time.Microsecond)
		}
	}()

	// Wait for them all to process through; it should take a lot less time
	// than if the disk IO were serial. This was verified by running this test
	// against the old serial implementation and seeing it never come close to
	// passing on my laptop with an SSD. It's not the most robust thing ever
	// but it's at least a sanity check that we are non-blocking now, and it
	// passes reliably at least on my machine. I typically see this complete
	// in around 115ms, so this should give plenty of headroom for slower CI
	// environments while still being low enough that actual disk IO would
	// reliably blow it.
	deadline := time.After(500 * time.Millisecond)
	numRecvd := 0
	start := time.Now()
	for numRecvd < numEvents {
		select {
		// Now that we are ready to listen, start the generator goroutine off
		case startCh <- struct{}{}:
			continue
		case <-outCh:
			numRecvd++
		case <-deadline:
			t.Fatalf("timed out after %s waiting for messages blocked on fake disk IO? "+
				"got %d of %d", time.Since(start), numRecvd, numEvents)
		}
	}

	// Close the snapshotter
	close(stopCh)
	snap.Wait()
}

func TestSnapshotter_blockedUpstreamNotBlockingMemberlist(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	t.Log("Temp dir", td)
	defer os.RemoveAll(td)

	clock := new(LamportClock)
	stopCh := make(chan struct{})
	logger := log.New(os.Stderr, "", log.LstdFlags)

	// outCh is unbuffered, simulating a slow upstream
	outCh := make(chan Event)
	inCh, snap, err := NewSnapshotter(filepath.Join(td, "snap"), snapshotSizeLimit, true,
		logger, clock, outCh, stopCh)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// We need more events than the internal buffer sizes
	numEvents := 3000

	// Send some updates
	for i := 0; i < numEvents; i++ {
		e := MemberEvent{
			Type: EventMemberJoin,
			Members: []Member{
				Member{
					Name: fmt.Sprintf("foo%d", i),
					Addr: []byte{127, 0, byte((i / 256) % 256), byte(i % 256)},
					Port: 5000,
				},
			},
		}
		if i%10 == 0 {
			// 1 in 10 events is a leave
			e.Type = EventMemberLeave
		}

		select {
		case inCh <- e:
		default:
			t.Fatalf("inCh should never block")
		}

		// Allow just the tiniest time so that the runtime can schedule the
		// goroutine that's reading this, even if both are on the same
		// physical core (like in CI).
		time.Sleep(1 * time.Microsecond)
	}

	// Close the snapshotter
	close(stopCh)
	snap.Wait()
}
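// The ad-hoc "wait for the channel to drain" loops in the tests above could
// be factored into a small helper like the sketch below. drainInCh is
// hypothetical and not part of the serf API; it just polls the buffered
// channel until the snapshotter goroutine has consumed everything queued.
func drainInCh(inCh chan<- Event) {
	for len(inCh) > 0 {
		time.Sleep(20 * time.Millisecond)
	}
}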