package serf

import (
	"bytes"
	"context"
	"encoding/base64"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"sync"
	"testing"
	"time"

	"github.com/hashicorp/go-msgpack/codec"
	"github.com/hashicorp/memberlist"

	"basic.com/valib/serf.git/coordinate"
	"basic.com/valib/serf.git/testutil"
	"basic.com/valib/serf.git/testutil/retry"
)

func testConfig(t *testing.T, ip net.IP) *Config {
	config := DefaultConfig()
	config.Init()
	config.MemberlistConfig.BindAddr = ip.String()

	// Set probe intervals that are aggressive for finding bad nodes
	config.MemberlistConfig.GossipInterval = 5 * time.Millisecond
	config.MemberlistConfig.ProbeInterval = 50 * time.Millisecond
	config.MemberlistConfig.ProbeTimeout = 25 * time.Millisecond
	config.MemberlistConfig.TCPTimeout = 100 * time.Millisecond
	config.MemberlistConfig.SuspicionMult = 1

	// Activate the strictest version of memberlist validation to ensure
	// we properly pass node names through the serf layer.
	config.MemberlistConfig.RequireNodeNames = true

	config.NodeName = fmt.Sprintf("node-%s", config.MemberlistConfig.BindAddr)

	// Set a short reap interval so that it can run during the test
	config.ReapInterval = 1 * time.Second

	// Set a short reconnect interval so that it can run a lot during tests
	config.ReconnectInterval = 100 * time.Millisecond

	// Set basically zero on the reconnect/tombstone timeouts so that
	// they're removed on the first ReapInterval.
	config.ReconnectTimeout = 1 * time.Microsecond
	config.TombstoneTimeout = 1 * time.Microsecond

	if t != nil {
		config.Logger = testutil.TestLoggerWithName(t, config.NodeName)
		config.MemberlistConfig.Logger = config.Logger
	}

	return config
}

// compatible with testing.TB and *retry.R
type testFailer interface {
	Fatalf(format string, args ...interface{})
}
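// The helpers above are the scaffolding every test in this file builds on.
// As an illustrative sketch only (TestExample_twoNodeJoin is a hypothetical
// name, not part of the original suite), a minimal two-node test wires them
// together: take addresses, build configs, create both nodes, join using the
// "<node name>/<address>" form, and converge with waitUntilNumNodes.
func TestExample_twoNodeJoin(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1, err := Create(testConfig(t, ip1))
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2Config := testConfig(t, ip2)
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	// Join addresses use memberlist's "<node name>/<address>" form.
	if _, err := s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false); err != nil {
		t.Fatalf("err: %v", err)
	}
	waitUntilNumNodes(t, 2, s1, s2)
}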
// testMember tests that a member in a list is in a given state.
func testMember(tf testFailer, members []Member, name string, status MemberStatus) {
	for _, m := range members {
		if m.Name == name {
			if m.Status != status {
				tf.Fatalf("bad state for %s: %d", name, m.Status)
			}
			return
		}
	}
	if status == StatusNone {
		// We didn't expect to find it
		return
	}
	tf.Fatalf("node not found: %s", name)
}

// testMemberStatus is identical to testMember; it is kept under a separate
// name for use inside retry loops, where the failer is a *retry.R.
func testMemberStatus(tf testFailer, members []Member, name string, status MemberStatus) {
	for _, m := range members {
		if m.Name == name {
			if m.Status != status {
				tf.Fatalf("bad state for %s: %d", name, m.Status)
			}
			return
		}
	}
	if status == StatusNone {
		// We didn't expect to find it
		return
	}
	tf.Fatalf("node not found: %s", name)
}

func TestCreate_badProtocolVersion(t *testing.T) {
	cases := []struct {
		version uint8
		err     bool
	}{
		{ProtocolVersionMin, false},
		{ProtocolVersionMax, false},
		// TODO(mitchellh): uncomment when we're over 0
		//{ProtocolVersionMin - 1, true},
		{ProtocolVersionMax + 1, true},
		{ProtocolVersionMax - 1, false},
	}

	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	for _, tc := range cases {
		tc := tc
		t.Run(fmt.Sprintf("version-%d", tc.version), func(t *testing.T) {
			c := testConfig(t, ip1)
			c.ProtocolVersion = tc.version

			s, err := Create(c)
			if tc.err && err == nil {
				t.Errorf("Should've failed with version: %d", tc.version)
			} else if !tc.err && err != nil {
				t.Errorf("Version '%d' error: %s", tc.version, err)
			}
			if err == nil {
				s.Shutdown()
			}
		})
	}
}

func TestSerf_eventsFailed(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s1 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	err = s2.Shutdown()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 1, s1)

	// Since s2 shutdown, we check the events to make sure we got failures.
	testEvents(t, eventCh, s2Config.NodeName,
		[]EventType{EventMemberJoin, EventMemberFailed, EventMemberReap})
}

func TestSerf_eventsJoin(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s1 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	testEvents(t, eventCh, s2Config.NodeName,
		[]EventType{EventMemberJoin})
}

func TestSerf_eventsLeave(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s1 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	// Make the reap interval longer in this test
	// so that the leave does not also cause a reap
	s1Config.ReapInterval = 30 * time.Second
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		testMemberStatus(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})

	// Now that s2 has left, we check the events to make sure we got
	// a leave event in s1 about the leave.
	testEvents(t, eventCh, s2Config.NodeName,
		[]EventType{EventMemberJoin, EventMemberLeave})
}

func TestSerf_RemoveFailed_eventsLeave(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s1 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 3)

	if err := s1.RemoveFailedNode(s2Config.NodeName); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		testMemberStatus(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})

	// Now that s2 has failed and been marked as left, we check the
	// events to make sure we got a leave event in s1 about the leave.
	testEvents(t, eventCh, s2Config.NodeName,
		[]EventType{EventMemberJoin, EventMemberFailed, EventMemberLeave})
}

func TestSerf_eventsUser(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s2 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s2Config.EventCh = eventCh

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Fire a user event
	if err := s1.UserEvent("event!", []byte("test"), false); err != nil {
		t.Fatalf("err: %v", err)
	}
	// testutil.Yield()

	// Fire a user event
	if err := s1.UserEvent("second", []byte("foobar"), false); err != nil {
		t.Fatalf("err: %v", err)
	}
	// testutil.Yield()

	// Check the events to make sure we got both user events in s2.
	testUserEvents(t, eventCh,
		[]string{"event!", "second"},
		[][]byte{[]byte("test"), []byte("foobar")})
}

func TestSerf_eventsUser_sizeLimit(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	// Create the s1 config with an event channel so we can listen
	s1Config := testConfig(t, ip1)
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	waitUntilNumNodes(t, 1, s1)

	name := "this is too large an event"
	payload := make([]byte, s1Config.UserEventSizeLimit)
	err = s1.UserEvent(name, payload, false)
	if err == nil {
		t.Fatalf("expect error")
	}
	if !strings.HasPrefix(err.Error(), "user event exceeds") {
		t.Fatalf("should get size limit error")
	}
}

func TestSerf_getQueueMax(t *testing.T) {
	s := &Serf{
		config: DefaultConfig(),
	}

	// We don't need a running Serf so fake it out with the required
	// state.
	s.members = make(map[string]*memberState)
	for i := 0; i < 100; i++ {
		name := fmt.Sprintf("Member%d", i)
		s.members[name] = &memberState{
			Member: Member{
				Name: name,
			},
		}
	}

	// Default mode just uses the max depth.
	if got, want := s.getQueueMax(), 4096; got != want {
		t.Fatalf("got %d want %d", got, want)
	}

	// Now configure a min which should take precedence.
	s.config.MinQueueDepth = 1024
	if got, want := s.getQueueMax(), 1024; got != want {
		t.Fatalf("got %d want %d", got, want)
	}

	// Bring it under the number of nodes, so the calculation based on
	// the number of nodes takes precedence.
	s.config.MinQueueDepth = 16
	if got, want := s.getQueueMax(), 200; got != want {
		t.Fatalf("got %d want %d", got, want)
	}

	// Try adjusting the node count.
s.members["another"] = &memberState{ Member: Member{ Name: "another", }, } if got, want := s.getQueueMax(), 202; got != want { t.Fatalf("got %d want %d", got, want) } } func TestSerf_joinLeave(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() s1Config := testConfig(t, ip1) s2Config := testConfig(t, ip2) s1, err := Create(s1Config) if err != nil { t.Fatalf("err: %v", err) } defer s1.Shutdown() s2, err := Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() waitUntilNumNodes(t, 1, s1, s2) _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 2, s1, s2) if err := s1.Leave(); err != nil { t.Fatalf("err: %v", err) } // Give the reaper time to reap nodes time.Sleep(s1Config.ReapInterval * 2) waitUntilNumNodes(t, 1, s1, s2) } // Bug: GH-58 func TestSerf_leaveRejoinDifferentRole(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() s1Config := testConfig(t, ip1) s2Config := testConfig(t, ip2) s2Config.Tags["role"] = "foo" s1, err := Create(s1Config) if err != nil { t.Fatalf("err: %v", err) } defer s1.Shutdown() s2, err := Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() waitUntilNumNodes(t, 1, s1, s2) _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 2, s1, s2) if err := s2.Leave(); err != nil { t.Fatalf("err: %v", err) } if err := s2.Shutdown(); err != nil { t.Fatalf("err: %v", err) } testutil.Yield() // Make s3 look just like s2, but create a new node with a new role s3Config := testConfig(t, ip2) s3Config.MemberlistConfig.BindAddr = s2Config.MemberlistConfig.BindAddr s3Config.NodeName = s2Config.NodeName s3Config.Tags["role"] = "bar" s3, err := Create(s3Config) if err != nil { t.Fatalf("err: %v", err) } defer s3.Shutdown() _, err = s3.Join([]string{s1Config.NodeName + "/" + s1Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 2, s1, s3) retry.Run(t, func(r *retry.R) { var member *Member for _, m := range s1.Members() { if m.Name == s3Config.NodeName { member = &m break } } if member == nil { r.Fatalf("couldn't find member") } if member.Tags["role"] != s3Config.Tags["role"] { r.Fatalf("bad role: %s", member.Tags["role"]) } }) } func TestSerf_forceLeaveFailed(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() ip3, returnFn3 := testutil.TakeIP() defer returnFn3() s1Config := testConfig(t, ip1) s2Config := testConfig(t, ip2) s3Config := testConfig(t, ip3) s1, err := Create(s1Config) if err != nil { t.Fatalf("err: %v", err) } defer s1.Shutdown() s2, err := Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() s3, err := Create(s3Config) if err != nil { t.Fatalf("err: %v", err) } defer s3.Shutdown() waitUntilNumNodes(t, 1, s1, s2, s3) _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } _, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 3, s1, s2, s3) //Put s2 in failed state if err := s2.Shutdown(); err != nil { t.Fatalf("err: %v", err) } 
func TestSerf_joinLeave(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	if err := s1.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Give the reaper time to reap nodes
	time.Sleep(s1Config.ReapInterval * 2)

	waitUntilNumNodes(t, 1, s1, s2)
}

// Bug: GH-58
func TestSerf_leaveRejoinDifferentRole(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s2Config.Tags["role"] = "foo"

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.Yield()

	// Make s3 look just like s2, but create a new node with a new role
	s3Config := testConfig(t, ip2)
	s3Config.MemberlistConfig.BindAddr = s2Config.MemberlistConfig.BindAddr
	s3Config.NodeName = s2Config.NodeName
	s3Config.Tags["role"] = "bar"

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	_, err = s3.Join([]string{s1Config.NodeName + "/" + s1Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s3)

	retry.Run(t, func(r *retry.R) {
		var member *Member
		for _, m := range s1.Members() {
			if m.Name == s3Config.NodeName {
				member = &m
				break
			}
		}
		if member == nil {
			r.Fatalf("couldn't find member")
		}
		if member.Tags["role"] != s3Config.Tags["role"] {
			r.Fatalf("bad role: %s", member.Tags["role"])
		}
	})
}

func TestSerf_forceLeaveFailed(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Put s2 in failed state
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		testMemberStatus(r, s1.Members(), s2Config.NodeName, StatusFailed)
	})

	if err := s1.forceLeave(s2.config.NodeName, true); err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s3)
}

func TestSerf_forceLeaveLeaving(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	// Make it so nodes don't get reaped, and allow us to observe
	// the leaving state.
	s1Config.TombstoneTimeout = 1 * time.Hour
	s1Config.LeavePropagateDelay = 5 * time.Second
	s2Config.TombstoneTimeout = 1 * time.Hour
	s2Config.LeavePropagateDelay = 5 * time.Second
	s3Config.TombstoneTimeout = 1 * time.Hour
	s3Config.LeavePropagateDelay = 5 * time.Second

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Put s2 in left state
	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		testMemberStatus(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})

	if err := s1.forceLeave(s2.config.NodeName, true); err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s3)
}

func TestSerf_forceLeaveLeft(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	// Make it so nodes don't get reaped
	s1Config.TombstoneTimeout = 1 * time.Hour
	s2Config.TombstoneTimeout = 1 * time.Hour
	s3Config.TombstoneTimeout = 1 * time.Hour

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Put s2 in left state
	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		testMemberStatus(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})

	if err := s1.forceLeave(s2.config.NodeName, true); err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s3)
}

func TestSerf_reconnect(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	eventCh := make(chan Event, 64)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh

	s2Config := testConfig(t, ip2)
	s2Addr := s2Config.MemberlistConfig.BindAddr
	s2Name := s2Config.NodeName
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 5)

	// Bring back s2 by mimicking its name and address
	s2Config = testConfig(t, ip2)
	s2Config.MemberlistConfig.BindAddr = s2Addr
	s2Config.NodeName = s2Name
	s2, err = Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)
	// time.Sleep(s1Config.ReconnectInterval * 5)

	testEvents(t, eventCh, s2Name,
		[]EventType{EventMemberJoin, EventMemberFailed, EventMemberJoin})
}

func TestSerf_reconnect_sameIP(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	eventCh := make(chan Event, 64)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh

	s2Config := testConfig(t, ip2)
	s2Config.MemberlistConfig.BindAddr = s1Config.MemberlistConfig.BindAddr
	s2Config.MemberlistConfig.BindPort = s1Config.MemberlistConfig.BindPort + 1

	s2Addr := fmt.Sprintf("%s:%d",
		s2Config.MemberlistConfig.BindAddr,
		s2Config.MemberlistConfig.BindPort)
	s2Name := s2Config.NodeName

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Addr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 5)

	// Bring back s2 by mimicking its name and address
	s2Config = testConfig(t, ip2)
	s2Config.MemberlistConfig.BindAddr = s1Config.MemberlistConfig.BindAddr
	s2Config.MemberlistConfig.BindPort = s1Config.MemberlistConfig.BindPort + 1
	s2Config.NodeName = s2Name
	s2, err = Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// time.Sleep(s1Config.ReconnectInterval * 5)
	waitUntilNumNodes(t, 2, s1, s2)

	testEvents(t, eventCh, s2Name,
		[]EventType{EventMemberJoin, EventMemberFailed, EventMemberJoin})
}

func TestSerf_update(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	eventCh := make(chan Event, 64)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh

	s2Config := testConfig(t, ip2)
	s2Addr := s2Config.MemberlistConfig.BindAddr
	s2Name := s2Config.NodeName

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Don't wait for a failure to be detected. Bring back s2 immediately
	// by mimicking its name and address.
	s2Config = testConfig(t, ip2)
	s2Config.MemberlistConfig.BindAddr = s2Addr
	s2Config.NodeName = s2Name

	// Add a tag to force an update event, and add a version downgrade as
	// well (that alone won't trigger an update).
	s2Config.ProtocolVersion--
	s2Config.Tags["foo"] = "bar"

	// We try for a little while to wait for s2 to fully shutdown since the
	// shutdown method doesn't block until that's done.
	start := time.Now()
	for {
		s2, err = Create(s2Config)
		if err == nil {
			defer s2.Shutdown()
			break
		} else if !strings.Contains(err.Error(), "address already in use") {
			t.Fatalf("err: %v", err)
		}

		if time.Since(start) > 2*time.Second {
			t.Fatalf("timed out trying to restart")
		}
	}

	_, err = s2.Join([]string{s1Config.NodeName + "/" + s1Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	testEvents(t, eventCh, s2Name,
		[]EventType{EventMemberJoin, EventMemberUpdate})

	// Verify that the member data got updated.
	found := false
	members := s1.Members()
	for _, member := range members {
		if member.Name == s2Name {
			found = true
			if member.Tags["foo"] != "bar" || member.DelegateCur != s2Config.ProtocolVersion {
				t.Fatalf("bad: %#v", member)
			}
		}
	}
	if !found {
		t.Fatalf("didn't find s2 in members")
	}
}

func TestSerf_role(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)

	s1Config.Tags["role"] = "web"
	s2Config.Tags["role"] = "lb"

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	retry.Run(t, func(r *retry.R) {
		roles := make(map[string]string)
		for _, m := range s1.Members() {
			roles[m.Name] = m.Tags["role"]
		}

		if roles[s1Config.NodeName] != "web" {
			r.Fatalf("bad role for web: %s", roles[s1Config.NodeName])
		}

		if roles[s2Config.NodeName] != "lb" {
			r.Fatalf("bad role for lb: %s", roles[s2Config.NodeName])
		}
	})
}

func TestSerfProtocolVersion(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	config := testConfig(t, ip1)
	config.ProtocolVersion = ProtocolVersionMax

	s1, err := Create(config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	actual := s1.ProtocolVersion()
	if actual != ProtocolVersionMax {
		t.Fatalf("bad: %#v", actual)
	}
}

func TestSerfRemoveFailedNode(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 5)

	retry.Run(t, func(r *retry.R) {
		// Verify that s2 is "failed"
		testMember(r, s1.Members(), s2Config.NodeName, StatusFailed)
	})

	// Now remove the failed node
	if err := s1.RemoveFailedNode(s2Config.NodeName); err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		// Verify that s2 is gone
		testMember(r, s1.Members(), s2Config.NodeName, StatusLeft)
		testMember(r, s3.Members(), s2Config.NodeName, StatusLeft)
	})
}

func TestSerfRemoveFailedNode_prune(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 5)

	// Verify that s2 is "failed"
	retry.Run(t, func(r *retry.R) {
		testMember(r, s1.Members(), s2Config.NodeName, StatusFailed)
	})

	// Now remove the failed node
	if err := s1.RemoveFailedNodePrune(s2Config.NodeName); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Check to make sure it's gone
	waitUntilNumNodes(t, 2, s1, s3)
}

func TestSerfRemoveFailedNode_ourself(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1Config := testConfig(t, ip1)
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	waitUntilNumNodes(t, 1, s1)

	if err := s1.RemoveFailedNode("somebody"); err != nil {
		t.Fatalf("err: %v", err)
	}
}

func TestSerfState(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1, err := Create(testConfig(t, ip1))
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	if s1.State() != SerfAlive {
		t.Fatalf("bad state: %d", s1.State())
	}

	if err := s1.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}

	if s1.State() != SerfLeft {
		t.Fatalf("bad state: %d", s1.State())
	}

	if err := s1.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	if s1.State() != SerfShutdown {
		t.Fatalf("bad state: %d", s1.State())
	}
}

func TestSerf_ReapHandler_Shutdown(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s, err := Create(testConfig(t, ip1))
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Make sure the reap handler exits on shutdown.
	doneCh := make(chan struct{})
	go func() {
		s.handleReap()
		close(doneCh)
	}()

	s.Shutdown()

	select {
	case <-doneCh:
	case <-time.After(1 * time.Second):
		t.Fatalf("timeout")
	}
}

func TestSerf_ReapHandler(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	c := testConfig(t, ip1)
	c.ReapInterval = time.Nanosecond
	c.TombstoneTimeout = time.Second * 6
	c.RecentIntentTimeout = time.Second * 7

	s, err := Create(c)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s.Shutdown()

	m := Member{}
	s.leftMembers = []*memberState{
		&memberState{m, 0, time.Now()},
		&memberState{m, 0, time.Now().Add(-5 * time.Second)},
		&memberState{m, 0, time.Now().Add(-10 * time.Second)},
	}

	upsertIntent(s.recentIntents, "alice", messageJoinType, 1, time.Now)
	upsertIntent(s.recentIntents, "bob", messageJoinType, 2, func() time.Time {
		return time.Now().Add(-10 * time.Second)
	})
	upsertIntent(s.recentIntents, "carol", messageLeaveType, 1, time.Now)
	upsertIntent(s.recentIntents, "doug", messageLeaveType, 2, func() time.Time {
		return time.Now().Add(-10 * time.Second)
	})

	go func() {
		time.Sleep(time.Millisecond)
		s.Shutdown()
	}()

	s.handleReap()

	if len(s.leftMembers) != 2 {
		t.Fatalf("should be shorter")
	}
	if _, ok := recentIntent(s.recentIntents, "alice", messageJoinType); !ok {
		t.Fatalf("should be buffered")
	}
	if _, ok := recentIntent(s.recentIntents, "bob", messageJoinType); ok {
		t.Fatalf("should be reaped")
	}
	if _, ok := recentIntent(s.recentIntents, "carol", messageLeaveType); !ok {
		t.Fatalf("should be buffered")
	}
	if _, ok := recentIntent(s.recentIntents, "doug", messageLeaveType); ok {
		t.Fatalf("should be reaped")
	}
}

func TestSerf_Reap(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s, err := Create(testConfig(t, ip1))
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s.Shutdown()

	m := Member{}
	old := []*memberState{
		&memberState{m, 0, time.Now()},
		&memberState{m, 0, time.Now().Add(-5 * time.Second)},
		&memberState{m, 0, time.Now().Add(-10 * time.Second)},
	}

	old = s.reap(old, time.Now(), time.Second*6)
	if len(old) != 2 {
		t.Fatalf("should be shorter")
	}
}

func TestRemoveOldMember(t *testing.T) {
	old := []*memberState{
		&memberState{Member: Member{Name: "foo"}},
		&memberState{Member: Member{Name: "bar"}},
		&memberState{Member: Member{Name: "baz"}},
	}

	old = removeOldMember(old, "bar")
	if len(old) != 2 {
		t.Fatalf("should be shorter")
	}
	if old[1].Name == "bar" {
		t.Fatalf("should remove old member")
	}
}

func TestRecentIntent(t *testing.T) {
	if _, ok := recentIntent(nil, "foo", messageJoinType); ok {
		t.Fatalf("should get nothing on empty recent")
	}

	now := time.Now()
	expire := func() time.Time { return now.Add(-2 * time.Second) }
	save := func() time.Time { return now }

	intents := make(map[string]nodeIntent)
	if _, ok := recentIntent(intents, "foo", messageJoinType); ok {
		t.Fatalf("should get nothing on empty recent")
	}
	if added := upsertIntent(intents, "foo", messageJoinType, 1, expire); !added {
		t.Fatalf("should have added")
	}
	if added := upsertIntent(intents, "bar", messageLeaveType, 2, expire); !added {
		t.Fatalf("should have added")
	}
	if added := upsertIntent(intents, "baz", messageJoinType, 3, save); !added {
		t.Fatalf("should have added")
	}
	if added := upsertIntent(intents, "bar", messageJoinType, 4, expire); !added {
		t.Fatalf("should have added")
	}
	if added := upsertIntent(intents, "bar", messageJoinType, 0, expire); added {
		t.Fatalf("should not have added")
	}
	if added := upsertIntent(intents, "bar", messageJoinType, 5, expire); !added {
		t.Fatalf("should have added")
	}
"foo", messageJoinType); !ok || ltime != 1 { t.Fatalf("bad: %v %v", ok, ltime) } if ltime, ok := recentIntent(intents, "bar", messageJoinType); !ok || ltime != 5 { t.Fatalf("bad: %v %v", ok, ltime) } if ltime, ok := recentIntent(intents, "baz", messageJoinType); !ok || ltime != 3 { t.Fatalf("bad: %v %v", ok, ltime) } if _, ok := recentIntent(intents, "tubez", messageJoinType); ok { t.Fatalf("should get nothing") } reapIntents(intents, now, time.Second) if _, ok := recentIntent(intents, "foo", messageJoinType); ok { t.Fatalf("should get nothing") } if _, ok := recentIntent(intents, "bar", messageJoinType); ok { t.Fatalf("should get nothing") } if ltime, ok := recentIntent(intents, "baz", messageJoinType); !ok || ltime != 3 { t.Fatalf("bad: %v %v", ok, ltime) } if _, ok := recentIntent(intents, "tubez", messageJoinType); ok { t.Fatalf("should get nothing") } reapIntents(intents, now.Add(2*time.Second), time.Second) if _, ok := recentIntent(intents, "baz", messageJoinType); ok { t.Fatalf("should get nothing") } } func TestMemberStatus_String(t *testing.T) { status := []MemberStatus{StatusNone, StatusAlive, StatusLeaving, StatusLeft, StatusFailed} expect := []string{"none", "alive", "leaving", "left", "failed"} for idx, s := range status { if s.String() != expect[idx] { t.Fatalf("got string %v, expected %v", s.String(), expect[idx]) } } other := MemberStatus(100) defer func() { if r := recover(); r == nil { t.Fatalf("expected panic") } }() _ = other.String() } func TestSerf_joinLeaveJoin(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() s1Config := testConfig(t, ip1) s1Config.ReapInterval = 10 * time.Second s1, err := Create(s1Config) if err != nil { t.Fatalf("err: %v", err) } defer s1.Shutdown() s2Config := testConfig(t, ip2) s2Config.ReapInterval = 10 * time.Second s2, err := Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() waitUntilNumNodes(t, 1, s1, s2) _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 2, s1, s2) // Leave and shutdown if err := s2.Leave(); err != nil { t.Fatalf("err: %v", err) } if err := s2.Shutdown(); err != nil { t.Fatalf("err: %v", err) } // Give the reaper time to reap nodes time.Sleep(s1Config.MemberlistConfig.ProbeInterval * 5) // s1 should see the node as having left retry.Run(t, func(r *retry.R) { mems := s1.Members() anyLeft := false for _, m := range mems { if m.Status == StatusLeft { anyLeft = true break } } if !anyLeft { r.Fatalf("node should have left!") } }) // Bring node 2 back s2, err = Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() waitUntilNumNodes(t, 1, s2) // Re-attempt the join _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } retry.Run(t, func(r *retry.R) { // Should be back to both members if s1.NumNodes() != 2 { r.Fatalf("s1 members: %d", s1.NumNodes()) } if s2.NumNodes() != 2 { r.Fatalf("s2 members: %d", s2.NumNodes()) } // s1 should see the node as alive mems := s1.Members() anyLeft := false for _, m := range mems { if m.Status == StatusLeft { anyLeft = true break } } if anyLeft { r.Fatalf("all nodes should be alive!") } }) } func TestSerf_Join_IgnoreOld(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() // Create the s1 config with an event 
func TestMemberStatus_String(t *testing.T) {
	status := []MemberStatus{StatusNone, StatusAlive, StatusLeaving, StatusLeft, StatusFailed}
	expect := []string{"none", "alive", "leaving", "left", "failed"}

	for idx, s := range status {
		if s.String() != expect[idx] {
			t.Fatalf("got string %v, expected %v", s.String(), expect[idx])
		}
	}

	other := MemberStatus(100)
	defer func() {
		if r := recover(); r == nil {
			t.Fatalf("expected panic")
		}
	}()
	_ = other.String()
}

func TestSerf_joinLeaveJoin(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s1Config.ReapInterval = 10 * time.Second
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2Config := testConfig(t, ip2)
	s2Config.ReapInterval = 10 * time.Second
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Leave and shutdown
	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Give the reaper time to reap nodes
	time.Sleep(s1Config.MemberlistConfig.ProbeInterval * 5)

	// s1 should see the node as having left
	retry.Run(t, func(r *retry.R) {
		mems := s1.Members()
		anyLeft := false
		for _, m := range mems {
			if m.Status == StatusLeft {
				anyLeft = true
				break
			}
		}
		if !anyLeft {
			r.Fatalf("node should have left!")
		}
	})

	// Bring node 2 back
	s2, err = Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s2)

	// Re-attempt the join
	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	retry.Run(t, func(r *retry.R) {
		// Should be back to both members
		if s1.NumNodes() != 2 {
			r.Fatalf("s1 members: %d", s1.NumNodes())
		}
		if s2.NumNodes() != 2 {
			r.Fatalf("s2 members: %d", s2.NumNodes())
		}

		// s1 should see the node as alive
		mems := s1.Members()
		anyLeft := false
		for _, m := range mems {
			if m.Status == StatusLeft {
				anyLeft = true
				break
			}
		}
		if anyLeft {
			r.Fatalf("all nodes should be alive!")
		}
	})
}

func TestSerf_Join_IgnoreOld(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Create the s2 config with an event channel so we can listen
	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s2Config.EventCh = eventCh

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	// Fire a user event
	if err := s1.UserEvent("event!", []byte("test"), false); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.Yield()

	// Fire a user event
	if err := s1.UserEvent("second", []byte("foobar"), false); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.Yield()

	// join with ignoreOld set to true! should not get events
	_, err = s2.Join([]string{s1Config.NodeName + "/" + s1Config.MemberlistConfig.BindAddr}, true)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// check the events to make sure we got nothing
	testUserEvents(t, eventCh, []string{}, [][]byte{})
}

func TestSerf_SnapshotRecovery(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s2Config.SnapshotPath = filepath.Join(td, "snap")

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Fire a user event
	if err := s1.UserEvent("event!", []byte("test"), false); err != nil {
		t.Fatalf("err: %v", err)
	}

	testutil.Yield()

	// Now force the shutdown of s2 so it appears to fail.
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 10)

	// Verify that s2 is "failed"
	testMember(t, s1.Members(), s2Config.NodeName, StatusFailed)

	// Now remove the failed node
	if err := s1.RemoveFailedNode(s2Config.NodeName); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Verify that s2 is gone
	testMember(t, s1.Members(), s2Config.NodeName, StatusLeft)

	// Listen for events
	eventCh := make(chan Event, 4)
	s2Config.EventCh = eventCh

	// Restart s2 from the snapshot now!
	s2, err = Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Wait for the node to auto rejoin
	start := time.Now()
	for time.Since(start) < time.Second {
		members := s1.Members()
		if len(members) == 2 &&
			members[0].Status == StatusAlive &&
			members[1].Status == StatusAlive {
			break
		}

		time.Sleep(10 * time.Millisecond)
	}

	// Verify that s2 is "alive"
	testMember(t, s1.Members(), s2Config.NodeName, StatusAlive)
	testMember(t, s2.Members(), s1Config.NodeName, StatusAlive)

	// Check the events to make sure we got nothing
	testUserEvents(t, eventCh, []string{}, [][]byte{})
}

func TestSerf_Leave_SnapshotRecovery(t *testing.T) {
	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	// Use a longer reap interval to allow the leave intent to propagate
	// before the node is reaped
	s1Config := testConfig(t, ip1)
	s1Config.ReapInterval = 30 * time.Second

	s2Config := testConfig(t, ip2)
	s2Config.SnapshotPath = filepath.Join(td, "snap")
	s2Config.ReapInterval = 30 * time.Second

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	if err := s2.Leave(); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := s2.Shutdown(); err != nil {
		t.Fatalf("err: %v", err)
	}

	time.Sleep(s2Config.MemberlistConfig.ProbeInterval * 5)

	// Verify that s2 is "left"
	retry.Run(t, func(r *retry.R) {
		testMember(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})

	// Restart s2 from the snapshot now!
	s2Config.EventCh = nil
	s2, err = Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Wait for the node to auto rejoin, and verify that s2 didn't rejoin
	retry.Run(t, func(r *retry.R) {
		if s2.NumNodes() != 1 {
			r.Fatalf("bad members: %#v", s2.Members())
		}

		testMember(r, s1.Members(), s2Config.NodeName, StatusLeft)
	})
}

func TestSerf_SetTags(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2Config := testConfig(t, ip2)
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Update the tags
	if err := s1.SetTags(map[string]string{"port": "8000"}); err != nil {
		t.Fatalf("err: %v", err)
	}
	if err := s2.SetTags(map[string]string{"datacenter": "east-aws"}); err != nil {
		t.Fatalf("err: %v", err)
	}

	// wait until the tags are updated everywhere before continuing
	retry.Run(t, func(r *retry.R) {
		// Verify the new tags
		m1m := s1.Members()
		m1mTags := make(map[string]map[string]string)
		for _, m := range m1m {
			m1mTags[m.Name] = m.Tags
		}

		if m := m1mTags[s1.config.NodeName]; m["port"] != "8000" {
			r.Fatalf("bad: %v", m1mTags)
		}
		if m := m1mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
			r.Fatalf("bad: %v", m1mTags)
		}

		m2m := s2.Members()
		m2mTags := make(map[string]map[string]string)
		for _, m := range m2m {
			m2mTags[m.Name] = m.Tags
		}

		if m := m2mTags[s1.config.NodeName]; m["port"] != "8000" {
			r.Fatalf("bad: %v", m2mTags)
		}
		if m := m2mTags[s2.config.NodeName]; m["datacenter"] != "east-aws" {
			r.Fatalf("bad: %v", m2mTags)
		}
	})

	// Check the events to make sure we got the join and the update.
	testEvents(t, eventCh, s2Config.NodeName,
		[]EventType{EventMemberJoin, EventMemberUpdate})
}

func TestSerf_Query(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	// Listen for the query
	var wg sync.WaitGroup
	defer wg.Wait()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-eventCh:
				if e.EventType() != EventQuery {
					continue
				}
				q := e.(*Query)
				// Use Errorf: Fatalf must not be called from a
				// goroutine other than the one running the test.
				if err := q.Respond([]byte("test")); err != nil {
					t.Errorf("err: %v", err)
				}
				return
			case <-time.After(time.Second):
				t.Errorf("timeout")
				return
			}
		}
	}()

	s2Config := testConfig(t, ip2)
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	// Start a query from s2
	params := s2.DefaultQueryParams()
	params.RequestAck = true
	resp, err := s2.Query("load", []byte("sup girl"), params)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	var acks []string
	var responses []string

	ackCh := resp.AckCh()
	respCh := resp.ResponseCh()
	for i := 0; i < 3; i++ {
		select {
		case a := <-ackCh:
			acks = append(acks, a)
		case r := <-respCh:
			if r.From != s1Config.NodeName {
				t.Fatalf("bad: %v", r)
			}
			if string(r.Payload) != "test" {
				t.Fatalf("bad: %v", r)
			}
			responses = append(responses, r.From)
		case <-time.After(time.Second):
			t.Fatalf("timeout")
		}
	}

	if len(acks) != 2 {
		t.Fatalf("missing acks: %v", acks)
	}
	if len(responses) != 1 {
		t.Fatalf("missing responses: %v", responses)
	}
}

func TestSerf_Query_Filter(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	eventCh := make(chan Event, 4)
	s1Config := testConfig(t, ip1)
	s1Config.EventCh = eventCh
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	// Listen for the query
	var wg sync.WaitGroup
	defer wg.Wait()
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return
			case e := <-eventCh:
				if e.EventType() != EventQuery {
					continue
				}
				q := e.(*Query)
				// Use Errorf: Fatalf must not be called from a
				// goroutine other than the one running the test.
				if err := q.Respond([]byte("test")); err != nil {
					t.Errorf("err: %v", err)
				}
				return
			case <-time.After(time.Second):
				t.Errorf("timeout")
				return
			}
		}
	}()

	s2Config := testConfig(t, ip2)
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	s3Config := testConfig(t, ip3)
	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s3)

	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 3, s1, s2, s3)

	// Filter to only s1!
	params := s2.DefaultQueryParams()
	params.FilterNodes = []string{s1Config.NodeName}
	params.RequestAck = true
	params.RelayFactor = 1

	// Start a query from s2
	resp, err := s2.Query("load", []byte("sup girl"), params)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	var acks []string
	var responses []string

	ackCh := resp.AckCh()
	respCh := resp.ResponseCh()
	for i := 0; i < 2; i++ {
		select {
		case a := <-ackCh:
			acks = append(acks, a)
		case r := <-respCh:
			if r.From != s1Config.NodeName {
				t.Fatalf("bad: %v", r)
			}
			if string(r.Payload) != "test" {
				t.Fatalf("bad: %v", r)
			}
			responses = append(responses, r.From)
		case <-time.After(time.Second):
			t.Fatalf("timeout")
		}
	}

	if len(acks) != 1 {
		t.Fatalf("missing acks: %v", acks)
	}
	if len(responses) != 1 {
		t.Fatalf("missing responses: %v", responses)
	}
}

func TestSerf_Query_Deduplicate(t *testing.T) {
	s := &Serf{}

	// Set up a dummy query and response
	mq := &messageQuery{
		LTime:   123,
		ID:      123,
		Timeout: time.Second,
		Flags:   queryFlagAck,
	}
	query := newQueryResponse(3, mq)
	response := &messageQueryResponse{
		LTime: mq.LTime,
		ID:    mq.ID,
		From:  "node1",
	}
	s.queryResponse = map[LamportTime]*QueryResponse{mq.LTime: query}

	// Send a few duplicate responses
	s.handleQueryResponse(response)
	s.handleQueryResponse(response)
	response.Flags |= queryFlagAck
	s.handleQueryResponse(response)
	s.handleQueryResponse(response)

	// Ensure we only get one NodeResponse off the channel
	select {
	case <-query.respCh:
	default:
		t.Fatalf("Should have a response")
	}

	select {
	case <-query.ackCh:
	default:
		t.Fatalf("Should have an ack")
	}

	select {
	case <-query.respCh:
		t.Fatalf("Should not have any other responses")
	default:
	}

	select {
	case <-query.ackCh:
		t.Fatalf("Should not have any other acks")
	default:
	}
}

func TestSerf_Query_sizeLimit(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1Config := testConfig(t, ip1)
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	name := "this is too large a query"
	payload := make([]byte, s1.config.QuerySizeLimit)
	_, err = s1.Query(name, payload, nil)
	if err == nil {
		t.Fatalf("should get error")
	}
	if !strings.HasPrefix(err.Error(), "query exceeds limit of ") {
		t.Fatalf("should get size limit error: %v", err)
	}
}

func TestSerf_Query_sizeLimitIncreased(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1Config := testConfig(t, ip1)
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	name := "this is too large a query"
	payload := make([]byte, s1.config.QuerySizeLimit)
	s1.config.QuerySizeLimit = 2 * s1.config.QuerySizeLimit
	_, err = s1.Query(name, payload, nil)
	if err != nil {
		t.Fatalf("should not get error: %v", err)
	}
}

func TestSerf_NameResolution(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)
	s3Config := testConfig(t, ip3)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Create an artificial node name conflict!
	s3Config.NodeName = s1Config.NodeName
	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2, s3)

	// Join s1 to s2 first. s2 should vote for s1 in the conflict.
	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)
	waitUntilNumNodes(t, 1, s3)

	_, err = s1.Join([]string{s3Config.NodeName + "/" + s3Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Wait for the query period to end
	time.Sleep(s1.DefaultQueryTimeout() * 2)

	retry.Run(t, func(r *retry.R) {
		// s3 should have shutdown, while s1 is running
		if s1.State() != SerfAlive {
			r.Fatalf("bad: %v", s1.State())
		}
		if s2.State() != SerfAlive {
			r.Fatalf("bad: %v", s2.State())
		}
		if s3.State() != SerfShutdown {
			r.Fatalf("bad: %v", s3.State())
		}
	})
}

func TestSerf_LocalMember(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1Config := testConfig(t, ip1)
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	m := s1.LocalMember()
	if m.Name != s1Config.NodeName {
		t.Fatalf("bad: %v", m)
	}
	if !reflect.DeepEqual(m.Tags, s1Config.Tags) {
		t.Fatalf("bad: %v", m)
	}
	if m.Status != StatusAlive {
		t.Fatalf("bad: %v", m)
	}

	newTags := map[string]string{
		"foo":  "bar",
		"test": "ing",
	}
	if err := s1.SetTags(newTags); err != nil {
		t.Fatalf("err: %v", err)
	}

	m = s1.LocalMember()
	if !reflect.DeepEqual(m.Tags, newTags) {
		t.Fatalf("bad: %v", m)
	}
}

func TestSerf_WriteKeyringFile(t *testing.T) {
	existing := "T9jncgl9mbLus+baTTa7q7nPSUrXwbDi2dhbtqir37s="
	newKey := "HvY8ubRZMgafUOWvrOadwOckVa1wN3QWAo46FVKbVN8="

	td, err := ioutil.TempDir("", "serf")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer os.RemoveAll(td)

	keyringFile := filepath.Join(td, "tags.json")

	existingBytes, err := base64.StdEncoding.DecodeString(existing)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	keys := [][]byte{existingBytes}
	keyring, err := memberlist.NewKeyring(keys, existingBytes)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	s1Config := testConfig(t, ip1)
	s1Config.MemberlistConfig.Keyring = keyring
	s1Config.KeyringFile = keyringFile
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	manager := s1.KeyManager()

	if _, err := manager.InstallKey(newKey); err != nil {
		t.Fatalf("err: %v", err)
	}

	content, err := ioutil.ReadFile(keyringFile)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	lines := strings.Split(string(content), "\n")
	if len(lines) != 4 {
		t.Fatalf("bad: %v", lines)
	}

	// Ensure both the original key and the new key are present in the file
	if !strings.Contains(string(content), existing) {
		t.Fatalf("key not found in keyring file: %s", existing)
	}
	if !strings.Contains(string(content), newKey) {
		t.Fatalf("key not found in keyring file: %s", newKey)
	}

	// Ensure the existing key remains primary. This is in position 1 because
	// the file writer will use json.MarshalIndent(), leaving the first line as
	// the opening bracket.
	if !strings.Contains(lines[1], existing) {
		t.Fatalf("expected key to be primary: %s", existing)
	}

	// Swap primary keys
	if _, err := manager.UseKey(newKey); err != nil {
		t.Fatalf("err: %v", err)
	}

	content, err = ioutil.ReadFile(keyringFile)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	lines = strings.Split(string(content), "\n")
	if len(lines) != 4 {
		t.Fatalf("bad: %v", lines)
	}

	// Key order should have changed in keyring file
	if !strings.Contains(lines[1], newKey) {
		t.Fatalf("expected key to be primary: %s", newKey)
	}

	// Remove the old key
	if _, err := manager.RemoveKey(existing); err != nil {
		t.Fatalf("err: %v", err)
	}

	content, err = ioutil.ReadFile(keyringFile)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	lines = strings.Split(string(content), "\n")

	// Only the new key should now be present in the keyring file
	if len(lines) != 3 {
		t.Fatalf("bad: %v", lines)
	}
	if !strings.Contains(lines[1], newKey) {
		t.Fatalf("expected key to be primary: %s", newKey)
	}
}

func TestSerfStats(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()

	config := testConfig(t, ip1)
	s1, err := Create(config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	stats := s1.Stats()

	expected := map[string]string{
		"event_queue":  "0",
		"event_time":   "1",
		"failed":       "0",
		"intent_queue": "0",
		"left":         "0",
		"health_score": "0",
		"member_time":  "1",
		"members":      "1",
		"query_queue":  "0",
		"query_time":   "1",
		"encrypted":    "false",
	}

	for key, val := range expected {
		v, ok := stats[key]
		if !ok {
			t.Fatalf("key not found in stats: %s", key)
		}
		if v != val {
			t.Fatalf("bad: %s = %s (want %s)", key, v, val)
		}
	}
}

type CancelMergeDelegate struct {
	invoked bool
}

func (c *CancelMergeDelegate) NotifyMerge(members []*Member) error {
	c.invoked = true
	return fmt.Errorf("Merge canceled")
}

func TestSerf_Join_Cancel(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	merge1 := &CancelMergeDelegate{}
	s1Config.Merge = merge1

	s2Config := testConfig(t, ip2)
	merge2 := &CancelMergeDelegate{}
	s2Config.Merge = merge2

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Each node only ever knows about itself, since the merge is canceled.
	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err == nil {
		t.Fatalf("expect error")
	}
	if !strings.Contains(err.Error(), "Merge canceled") {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 1, s1, s2)

	if !merge1.invoked {
		t.Fatalf("should invoke")
	}
	if !merge2.invoked {
		t.Fatalf("should invoke")
	}
}

func TestSerf_Coordinates(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()
	ip3, returnFn3 := testutil.TakeIP()
	defer returnFn3()

	s1Config := testConfig(t, ip1)
	s1Config.DisableCoordinates = false
	s1Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond
	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2Config := testConfig(t, ip2)
	s2Config.DisableCoordinates = false
	s2Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond
	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	// Make sure both nodes start out at the origin so we can prove they
	// did an update later.
	c1, err := s1.GetCoordinate()
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	c2, err := s2.GetCoordinate()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	const zeroThreshold = 20.0e-6
	if c1.DistanceTo(c2).Seconds() > zeroThreshold {
		t.Fatalf("coordinates didn't start at the origin")
	}

	// Join the two nodes together and give them time to probe each other.
	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("could not join s1 and s2: %s", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)

	retry.Run(t, func(r *retry.R) {
		// See if they know about each other.
		if _, ok := s1.GetCachedCoordinate(s2.config.NodeName); !ok {
			r.Fatalf("s1 didn't get a coordinate for s2: %s", err)
		}
		if _, ok := s2.GetCachedCoordinate(s1.config.NodeName); !ok {
			r.Fatalf("s2 didn't get a coordinate for s1: %s", err)
		}

		// With only one ping they won't have a good estimate of the other node's
		// coordinate, but they should both have updated their own coordinate.
		c1, err = s1.GetCoordinate()
		if err != nil {
			r.Fatalf("err: %v", err)
		}
		c2, err = s2.GetCoordinate()
		if err != nil {
			r.Fatalf("err: %v", err)
		}
		if c1.DistanceTo(c2).Seconds() < zeroThreshold {
			r.Fatalf("coordinates didn't update after probes")
		}

		// Make sure they cached their own current coordinate after the update.
		c1c, ok := s1.GetCachedCoordinate(s1.config.NodeName)
		if !ok {
			r.Fatalf("s1 didn't cache coordinate for s1")
		}
		if !reflect.DeepEqual(c1, c1c) {
			r.Fatalf("coordinates are not equal: %v != %v", c1, c1c)
		}
	})

	// Break up the cluster and make sure the coordinates get removed by
	// the reaper.
	if err := s2.Leave(); err != nil {
		t.Fatalf("s2 could not leave: %s", err)
	}

	time.Sleep(s1Config.ReapInterval * 2)

	waitUntilNumNodes(t, 1, s1)

	retry.Run(t, func(r *retry.R) {
		if _, ok := s1.GetCachedCoordinate(s2.config.NodeName); ok {
			r.Fatalf("s1 should have removed s2's cached coordinate")
		}
	})

	// Try a setup with coordinates disabled.
	s3Config := testConfig(t, ip3)
	s3Config.DisableCoordinates = true
	s3Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond
	s3, err := Create(s3Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s3.Shutdown()

	waitUntilNumNodes(t, 1, s1, s3)

	_, err = s3.Join([]string{s1Config.NodeName + "/" + s1Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("could not join s1 and s3: %s", err)
	}

	waitUntilNumNodes(t, 2, s1, s3)

	retry.Run(t, func(r *retry.R) {
		_, err = s3.GetCoordinate()
		if err == nil || !strings.Contains(err.Error(), "Coordinates are disabled") {
			r.Fatalf("expected coordinate disabled error, got %s", err)
		}
		if _, ok := s3.GetCachedCoordinate(s1.config.NodeName); ok {
			r.Fatalf("should not have been able to get cached coordinate")
		}
	})
}

// pingVersionMetaDelegate is used to monkey patch a ping delegate so that it
// sends ping messages with an unknown version number.
type pingVersionMetaDelegate struct {
	pingDelegate
}

// AckPayload is called to produce a payload to send back in response to a ping
// request. In this case we send back a bogus ping response with a bad version
// and payload.
func (p *pingVersionMetaDelegate) AckPayload() []byte {
	var buf bytes.Buffer

	// Send back the next ping version, which is bad by default.
	version := []byte{PingVersion + 1}
	buf.Write(version)
	buf.Write([]byte("this is bad and not a real message"))
	return buf.Bytes()
}

func TestSerf_PingDelegateVersioning(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s1Config.DisableCoordinates = false
	s1Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond
	s2Config := testConfig(t, ip2)
	s2Config.DisableCoordinates = false
	s2Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Monkey patch s1 to send weird versions of the ping messages.
	s1.config.MemberlistConfig.Ping = &pingVersionMetaDelegate{pingDelegate{s1}}

	waitUntilNumNodes(t, 1, s1, s2)

	// Join the two nodes together and give them time to probe each other.
	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("could not join s1 and s2: %s", err)
	}

	// They both should show 2 members, but only s1 should know about s2
	// in the cache, since s1 spoke an alien ping protocol.
	waitUntilNumNodes(t, 2, s1, s2)

	retry.Run(t, func(r *retry.R) {
		if _, ok := s1.GetCachedCoordinate(s2.config.NodeName); !ok {
			r.Fatalf("s1 didn't get a coordinate for s2: %s", err)
		}
		if _, ok := s2.GetCachedCoordinate(s1.config.NodeName); ok {
			r.Fatalf("s2 got an unexpected coordinate for s1")
		}
	})
}

// pingDimensionMetaDelegate is used to monkey patch a ping delegate so that it
// sends coordinates with the wrong number of dimensions.
type pingDimensionMetaDelegate struct {
	t *testing.T
	pingDelegate
}

// AckPayload is called to produce a payload to send back in response to a ping
// request. In this case we send back a legit ping response with a bad coordinate.
func (p *pingDimensionMetaDelegate) AckPayload() []byte {
	var buf bytes.Buffer

	// The first byte is the version number, forming a simple header.
	version := []byte{PingVersion}
	buf.Write(version)

	// Make a bad coordinate with the wrong number of dimensions.
	coord := coordinate.NewCoordinate(coordinate.DefaultConfig())
	coord.Vec = make([]float64, 2*len(coord.Vec))

	// The rest of the message is the serialized coordinate.
	enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{})
	if err := enc.Encode(coord); err != nil {
		p.t.Fatalf("err: %v", err)
	}
	return buf.Bytes()
}

func TestSerf_PingDelegateRogueCoordinate(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s1Config.DisableCoordinates = false
	s1Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond
	s2Config := testConfig(t, ip2)
	s2Config.DisableCoordinates = false
	s2Config.MemberlistConfig.ProbeInterval = 2 * time.Millisecond

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	// Monkey patch s1 to send ping messages with bad coordinates.
	s1.config.MemberlistConfig.Ping = &pingDimensionMetaDelegate{t, pingDelegate{s1}}

	waitUntilNumNodes(t, 1, s1, s2)

	// Join the two nodes together and give them time to probe each other.
	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("could not join s1 and s2: %s", err)
	}

	// They both should show 2 members, but only s1 should know about s2
	// in the cache, since s1 sent a bad coordinate.
	waitUntilNumNodes(t, 2, s1, s2)

	retry.Run(t, func(r *retry.R) {
		if _, ok := s1.GetCachedCoordinate(s2.config.NodeName); !ok {
			r.Fatalf("s1 didn't get a coordinate for s2: %s", err)
		}
		if _, ok := s2.GetCachedCoordinate(s1.config.NodeName); ok {
			r.Fatalf("s2 got an unexpected coordinate for s1")
		}
	})
}

func TestSerf_NumNodes(t *testing.T) {
	ip1, returnFn1 := testutil.TakeIP()
	defer returnFn1()
	ip2, returnFn2 := testutil.TakeIP()
	defer returnFn2()

	s1Config := testConfig(t, ip1)
	s2Config := testConfig(t, ip2)

	s1, err := Create(s1Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s1.Shutdown()

	if s1.NumNodes() != 1 {
		t.Fatalf("Expected 1 member")
	}

	s2, err := Create(s2Config)
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	defer s2.Shutdown()

	waitUntilNumNodes(t, 1, s1, s2)

	_, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	waitUntilNumNodes(t, 2, s1, s2)
}

func waitUntilNumNodes(t *testing.T, desiredNodes int, serfs ...*Serf) {
	t.Helper()
	retry.Run(t, func(r *retry.R) {
		t.Helper()
		for i, s := range serfs {
			if n := s.NumNodes(); desiredNodes != n {
				r.Fatalf("s%d got %d expected %d", (i + 1), n, desiredNodes)
			}
		}
	})
}
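// Every join in this file builds the target address by hand as
// "<node name>/<address>". A hypothetical convenience that captures the
// convention (joinAddr is illustrative, not part of the original suite):
func joinAddr(c *Config) string {
	return c.NodeName + "/" + c.MemberlistConfig.BindAddr
}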