package serf import ( "time" ) // coalescer is a simple interface that must be implemented to be // used inside of a coalesceLoop type coalescer interface { // Can the coalescer handle this event, if not it is // directly passed through to the destination channel Handle(Event) bool // Invoked to coalesce the given event Coalesce(Event) // Invoked to flush the coalesced events Flush(outChan chan<- Event) } // coalescedEventCh returns an event channel where the events are coalesced // using the given coalescer. func coalescedEventCh(outCh chan<- Event, shutdownCh <-chan struct{}, cPeriod time.Duration, qPeriod time.Duration, c coalescer) chan<- Event { inCh := make(chan Event, 1024) go coalesceLoop(inCh, outCh, shutdownCh, cPeriod, qPeriod, c) return inCh } // coalesceLoop is a simple long-running routine that manages the high-level // flow of coalescing based on quiescence and a maximum quantum period. func coalesceLoop(inCh <-chan Event, outCh chan<- Event, shutdownCh <-chan struct{}, coalescePeriod time.Duration, quiescentPeriod time.Duration, c coalescer) { var quiescent <-chan time.Time var quantum <-chan time.Time shutdown := false INGEST: // Reset the timers quantum = nil quiescent = nil for { select { case e := <-inCh: // Ignore any non handled events if !c.Handle(e) { outCh <- e continue } // Start a new quantum if we need to // and restart the quiescent timer if quantum == nil { quantum = time.After(coalescePeriod) } quiescent = time.After(quiescentPeriod) // Coalesce the event c.Coalesce(e) case <-quantum: goto FLUSH case <-quiescent: goto FLUSH case <-shutdownCh: shutdown = true goto FLUSH } } FLUSH: // Flush the coalesced events c.Flush(outCh) // Restart ingestion if we are not done if !shutdown { goto INGEST } }