liuxiaolong
2022-06-28 37714b1093c04061e636e5b1d27179652e671c0a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package serf
 
type coalesceEvent struct {
    Type   EventType
    Member *Member
}
 
type memberEventCoalescer struct {
    lastEvents   map[string]EventType
    latestEvents map[string]coalesceEvent
}
 
func (c *memberEventCoalescer) Handle(e Event) bool {
    switch e.EventType() {
    case EventMemberJoin:
        return true
    case EventMemberLeave:
        return true
    case EventMemberFailed:
        return true
    case EventMemberUpdate:
        return true
    case EventMemberReap:
        return true
    default:
        return false
    }
}
 
func (c *memberEventCoalescer) Coalesce(raw Event) {
    e := raw.(MemberEvent)
    for _, m := range e.Members {
        c.latestEvents[m.Name] = coalesceEvent{
            Type:   e.Type,
            Member: &m,
        }
    }
}
 
func (c *memberEventCoalescer) Flush(outCh chan<- Event) {
    // Coalesce the various events we got into a single set of events.
    events := make(map[EventType]*MemberEvent)
    for name, cevent := range c.latestEvents {
        previous, ok := c.lastEvents[name]
 
        // If we sent the same event before, then ignore
        // unless it is a MemberUpdate
        if ok && previous == cevent.Type && cevent.Type != EventMemberUpdate {
            continue
        }
 
        // Update our last event
        c.lastEvents[name] = cevent.Type
 
        // Add it to our event
        newEvent, ok := events[cevent.Type]
        if !ok {
            newEvent = &MemberEvent{Type: cevent.Type}
            events[cevent.Type] = newEvent
        }
        newEvent.Members = append(newEvent.Members, *cevent.Member)
    }
 
    // Send out those events
    for _, event := range events {
        outCh <- *event
    }
}