package serf import ( "testing" "time" "github.com/hashicorp/memberlist" "basic.com/valib/serf.git/testutil" "basic.com/valib/serf.git/testutil/retry" ) func TestSerf_joinLeave_ltime(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() ip2, returnFn2 := testutil.TakeIP() defer returnFn2() s1Config := testConfig(t, ip1) s2Config := testConfig(t, ip2) s1, err := Create(s1Config) if err != nil { t.Fatalf("err: %v", err) } defer s1.Shutdown() s2, err := Create(s2Config) if err != nil { t.Fatalf("err: %v", err) } defer s2.Shutdown() waitUntilNumNodes(t, 1, s1, s2) _, err = s1.Join([]string{s2Config.NodeName + "/" + s2Config.MemberlistConfig.BindAddr}, false) if err != nil { t.Fatalf("err: %v", err) } waitUntilNumNodes(t, 2, s1, s2) retry.Run(t, func(r *retry.R) { if s2.members[s1.config.NodeName].statusLTime != 1 { r.Fatalf("join time is not valid %d", s2.members[s1.config.NodeName].statusLTime) } if s2.clock.Time() <= s2.members[s1.config.NodeName].statusLTime { r.Fatalf("join should increment") } }) oldClock := s2.clock.Time() if err := s1.Leave(); err != nil { t.Fatalf("err: %v", err) } retry.Run(t, func(r *retry.R) { // s1 clock should exceed s2 due to leave if s2.clock.Time() <= oldClock { r.Fatalf("leave should increment (%d / %d)", s2.clock.Time(), oldClock) } }) } func TestSerf_join_pendingIntent(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() upsertIntent(s.recentIntents, "test", messageJoinType, 5, time.Now) n := memberlist.Node{Name: "test", Addr: nil, Meta: []byte("test"), } s.handleNodeJoin(&n) mem := s.members["test"] if mem.statusLTime != 5 { t.Fatalf("bad join time") } if mem.Status != StatusAlive { t.Fatalf("bad status") } } func TestSerf_join_pendingIntents(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() upsertIntent(s.recentIntents, "test", messageJoinType, 5, time.Now) upsertIntent(s.recentIntents, "test", messageLeaveType, 6, time.Now) n := memberlist.Node{Name: "test", Addr: nil, Meta: []byte("test"), } s.handleNodeJoin(&n) mem := s.members["test"] if mem.statusLTime != 6 { t.Fatalf("bad join time") } if mem.Status != StatusLeaving { t.Fatalf("bad status") } } func TestSerf_leaveIntent_bufferEarly(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() // Deliver a leave intent message early j := messageLeave{LTime: 10, Node: "test"} if !s.handleNodeLeaveIntent(&j) { t.Fatalf("should rebroadcast") } if s.handleNodeLeaveIntent(&j) { t.Fatalf("should not rebroadcast") } // Check that we buffered if leave, ok := recentIntent(s.recentIntents, "test", messageLeaveType); !ok || leave != 10 { t.Fatalf("bad buffer") } } func TestSerf_leaveIntent_oldMessage(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() s.members["test"] = &memberState{ Member: Member{ Status: StatusAlive, }, statusLTime: 12, } j := messageLeave{LTime: 10, Node: "test"} if s.handleNodeLeaveIntent(&j) { t.Fatalf("should not rebroadcast") } if _, ok := recentIntent(s.recentIntents, "test", messageLeaveType); ok { t.Fatalf("should not have buffered intent") } } func TestSerf_leaveIntent_newer(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() s.members["test"] = &memberState{ Member: Member{ Status: StatusAlive, }, statusLTime: 12, } j := messageLeave{LTime: 14, Node: "test"} if !s.handleNodeLeaveIntent(&j) { t.Fatalf("should rebroadcast") } if _, ok := recentIntent(s.recentIntents, "test", messageLeaveType); ok { t.Fatalf("should not have buffered intent") } if s.members["test"].Status != StatusLeaving { t.Fatalf("should update status") } if s.clock.Time() != 15 { t.Fatalf("should update clock") } } func TestSerf_joinIntent_bufferEarly(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() // Deliver a join intent message early j := messageJoin{LTime: 10, Node: "test"} if !s.handleNodeJoinIntent(&j) { t.Fatalf("should rebroadcast") } if s.handleNodeJoinIntent(&j) { t.Fatalf("should not rebroadcast") } // Check that we buffered if join, ok := recentIntent(s.recentIntents, "test", messageJoinType); !ok || join != 10 { t.Fatalf("bad buffer") } } func TestSerf_joinIntent_oldMessage(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() s.members["test"] = &memberState{ statusLTime: 12, } j := messageJoin{LTime: 10, Node: "test"} if s.handleNodeJoinIntent(&j) { t.Fatalf("should not rebroadcast") } // Check that we didn't buffer anything if _, ok := recentIntent(s.recentIntents, "test", messageJoinType); ok { t.Fatalf("should not have buffered intent") } } func TestSerf_joinIntent_newer(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() s.members["test"] = &memberState{ statusLTime: 12, } // Deliver a join intent message early j := messageJoin{LTime: 14, Node: "test"} if !s.handleNodeJoinIntent(&j) { t.Fatalf("should rebroadcast") } if _, ok := recentIntent(s.recentIntents, "test", messageJoinType); ok { t.Fatalf("should not have buffered intent") } if s.members["test"].statusLTime != 14 { t.Fatalf("should update join time") } if s.clock.Time() != 15 { t.Fatalf("should update clock") } } func TestSerf_joinIntent_resetLeaving(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() s.members["test"] = &memberState{ Member: Member{ Status: StatusLeaving, }, statusLTime: 12, } j := messageJoin{LTime: 14, Node: "test"} if !s.handleNodeJoinIntent(&j) { t.Fatalf("should rebroadcast") } if _, ok := recentIntent(s.recentIntents, "test", messageJoinType); ok { t.Fatalf("should not have buffered intent") } if s.members["test"].statusLTime != 14 { t.Fatalf("should update join time") } if s.members["test"].Status != StatusAlive { t.Fatalf("should update status") } if s.clock.Time() != 15 { t.Fatalf("should update clock") } } func TestSerf_userEvent_oldMessage(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() // increase the ltime artificially s.eventClock.Witness(LamportTime(c.EventBuffer + 1000)) msg := messageUserEvent{ LTime: 1, Name: "old", Payload: nil, } if s.handleUserEvent(&msg) { t.Fatalf("should not rebroadcast") } } func TestSerf_userEvent_sameClock(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() eventCh := make(chan Event, 4) c := testConfig(t, ip1) c.EventCh = eventCh s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() msg := messageUserEvent{ LTime: 1, Name: "first", Payload: []byte("test"), } if !s.handleUserEvent(&msg) { t.Fatalf("should rebroadcast") } msg = messageUserEvent{ LTime: 1, Name: "first", Payload: []byte("newpayload"), } if !s.handleUserEvent(&msg) { t.Fatalf("should rebroadcast") } msg = messageUserEvent{ LTime: 1, Name: "second", Payload: []byte("other"), } if !s.handleUserEvent(&msg) { t.Fatalf("should rebroadcast") } testUserEvents(t, eventCh, []string{"first", "first", "second"}, [][]byte{[]byte("test"), []byte("newpayload"), []byte("other")}) } func TestSerf_query_oldMessage(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() c := testConfig(t, ip1) s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() // increase the ltime artificially s.queryClock.Witness(LamportTime(c.QueryBuffer + 1000)) msg := messageQuery{ LTime: 1, Name: "old", Payload: nil, } if s.handleQuery(&msg) { t.Fatalf("should not rebroadcast") } } func TestSerf_query_sameClock(t *testing.T) { ip1, returnFn1 := testutil.TakeIP() defer returnFn1() eventCh := make(chan Event, 4) c := testConfig(t, ip1) c.EventCh = eventCh s, err := Create(c) if err != nil { t.Fatalf("err: %v", err) } defer s.Shutdown() msg := messageQuery{ LTime: 1, ID: 1, Name: "foo", Payload: []byte("test"), } if !s.handleQuery(&msg) { t.Fatalf("should rebroadcast") } if s.handleQuery(&msg) { t.Fatalf("should not rebroadcast") } msg = messageQuery{ LTime: 1, ID: 2, Name: "bar", Payload: []byte("newpayload"), } if !s.handleQuery(&msg) { t.Fatalf("should rebroadcast") } if s.handleQuery(&msg) { t.Fatalf("should not rebroadcast") } msg = messageQuery{ LTime: 1, ID: 3, Name: "baz", Payload: []byte("other"), } if !s.handleQuery(&msg) { t.Fatalf("should rebroadcast") } if s.handleQuery(&msg) { t.Fatalf("should not rebroadcast") } testQueryEvents(t, eventCh, []string{"foo", "bar", "baz"}, [][]byte{[]byte("test"), []byte("newpayload"), []byte("other")}) }