liuxiaolong
2020-09-16 b04c3b2bf52d57550e8bfdc7f09859962b7c1396
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package serf
 
import (
    "time"
)
 
// coalescer is a simple interface that must be implemented to be
// used inside of a coalesceLoop
type coalescer interface {
    // Can the coalescer handle this event, if not it is
    // directly passed through to the destination channel
    Handle(Event) bool
 
    // Invoked to coalesce the given event
    Coalesce(Event)
 
    // Invoked to flush the coalesced events
    Flush(outChan chan<- Event)
}
 
// coalescedEventCh returns an event channel where the events are coalesced
// using the given coalescer.
func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
    cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
    inCh := make(chan Event, 1024)
    go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
    return inCh
}
 
// coalesceLoop is a simple long-running routine that manages the high-level
// flow of coalescing based on quiescence and a maximum quantum period.
func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
    coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
    var quiescent <-chan time.Time
    var quantum <-chan time.Time
    shutdown := false
 
INGEST:
    // Reset the timers
    quantum = nil
    quiescent = nil
 
    for {
        select {
        case e := <-inCh:
            // Ignore any non handled events
            if !c.Handle(e) {
                outCh <- e
                continue
            }
 
            // Start a new quantum if we need to
            // and restart the quiescent timer
            if quantum == nil {
                quantum = time.After(coalescePeriod)
            }
            quiescent = time.After(quiescentPeriod)
 
            // Coalesce the event
            c.Coalesce(e)
 
        case <-quantum:
            goto FLUSH
        case <-quiescent:
            goto FLUSH
        case <-shutdownCh:
            shutdown = true
            goto FLUSH
        }
    }
 
FLUSH:
    // Flush the coalesced events
    c.Flush(outCh)
 
    // Restart ingestion if we are not done
    if !shutdown {
        goto INGEST
    }
}