package serf
|
|
import (
|
"time"
|
)
|
|
// coalescer is a simple interface that must be implemented to be
|
// used inside of a coalesceLoop
|
type coalescer interface {
|
// Can the coalescer handle this event, if not it is
|
// directly passed through to the destination channel
|
Handle(Event) bool
|
|
// Invoked to coalesce the given event
|
Coalesce(Event)
|
|
// Invoked to flush the coalesced events
|
Flush(outChan chan<- Event)
|
}
|
|
// coalescedEventCh returns an event channel where the events are coalesced
|
// using the given coalescer.
|
func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{},
|
cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event {
|
inCh := make(chan Event, 1024)
|
go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c)
|
return inCh
|
}
|
|
// coalesceLoop is a simple long-running routine that manages the high-level
|
// flow of coalescing based on quiescence and a maximum quantum period.
|
func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{},
|
coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) {
|
var quiescent <-chan time.Time
|
var quantum <-chan time.Time
|
shutdown := false
|
|
INGEST:
|
// Reset the timers
|
quantum = nil
|
quiescent = nil
|
|
for {
|
select {
|
case e := <-inCh:
|
// Ignore any non handled events
|
if !c.Handle(e) {
|
outCh <- e
|
continue
|
}
|
|
// Start a new quantum if we need to
|
// and restart the quiescent timer
|
if quantum == nil {
|
quantum = time.After(coalescePeriod)
|
}
|
quiescent = time.After(quiescentPeriod)
|
|
// Coalesce the event
|
c.Coalesce(e)
|
|
case <-quantum:
|
goto FLUSH
|
case <-quiescent:
|
goto FLUSH
|
case <-shutdownCh:
|
shutdown = true
|
goto FLUSH
|
}
|
}
|
|
FLUSH:
|
// Flush the coalesced events
|
c.Flush(outCh)
|
|
// Restart ingestion if we are not done
|
if !shutdown {
|
goto INGEST
|
}
|
}
|