From edaba09c43ef7eb013964f162255d73fedb64f9c Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 17 十二月 2019 10:03:48 +0800 Subject: [PATCH] stash --- work/sdk/flow.go | 117 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 117 insertions(+), 0 deletions(-) diff --git a/work/sdk/flow.go b/work/sdk/flow.go new file mode 100644 index 0000000..c4e8019 --- /dev/null +++ b/work/sdk/flow.go @@ -0,0 +1,117 @@ +package sdk + +import ( + "analysis/logo" + "analysis/work" + "container/list" + "context" + "sync" + "time" +) + +// LockList list +type LockList struct { + cache *list.List + cv *sync.Cond + cond bool + size int +} + +// NewLockList new +func NewLockList(size int) *LockList { + return &LockList{ + cache: list.New(), + cv: sync.NewCond(&sync.Mutex{}), + cond: false, + size: size, + } +} + +// Push push +func (l *LockList) Push(v interface{}) { + l.cv.L.Lock() + l.cache.PushBack(v) + + for l.cache.Len() > l.size { + l.cache.Remove(l.cache.Front()) + } + + l.cond = true + l.cv.Signal() + l.cv.L.Unlock() +} + +// Pop pop +func (l *LockList) Pop() interface{} { + l.cv.L.Lock() + + for !l.cond { + l.cv.Wait() + } + + elem := l.cache.Front().Value + + l.cache.Remove(l.cache.Front()) + l.cond = false + l.cv.L.Unlock() + + return elem +} + +///////////////////////////////////////////////////////////////// + +func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string, + fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) { + + tm := time.Now() + sc := 0 + + for { + select { + case <-ctx.Done(): + return + default: + + rMsg := fnConsume().(work.MsgRS) + + fnRun(rMsg, out, typ) + + sc++ + if sc == 25 { + logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) + sc = 0 + tm = time.Now() + } + if time.Since(tm) > time.Second { + logo.Infof(typ, " RUN %d FRAME USE TIME: %v", sc, time.Since(tm)) + sc = 0 + tm = time.Now() + } + } + } + +} + +// FlowSimple wrap +func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string, + fnProduce func(interface{}), fnConsume func() interface{}, + fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) { + + go flowSimpleWork(ctx, out, typ, fnConsume, fnRun) + + for { + select { + case <-ctx.Done(): + fnClose() + return + default: + rMsg := <-in + if !validRemoteMessage(rMsg, typ) { + logo.Errorln(typ, " validremotemessage invalid") + ejectResult(nil, rMsg, out) + continue + } + fnProduce(rMsg) + } + } +} -- Gitblit v1.8.0