From edaba09c43ef7eb013964f162255d73fedb64f9c Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 17 十二月 2019 10:03:48 +0800
Subject: [PATCH] stash

---
 work/sdk/flow.go |  117 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
 1 files changed, 117 insertions(+), 0 deletions(-)

diff --git a/work/sdk/flow.go b/work/sdk/flow.go
new file mode 100644
index 0000000..c4e8019
--- /dev/null
+++ b/work/sdk/flow.go
@@ -0,0 +1,117 @@
+package sdk
+
+import (
+	"analysis/logo"
+	"analysis/work"
+	"container/list"
+	"context"
+	"sync"
+	"time"
+)
+
+// LockList list
+type LockList struct {
+	cache *list.List
+	cv    *sync.Cond
+	cond  bool
+	size  int
+}
+
+// NewLockList new
+func NewLockList(size int) *LockList {
+	return &LockList{
+		cache: list.New(),
+		cv:    sync.NewCond(&sync.Mutex{}),
+		cond:  false,
+		size:  size,
+	}
+}
+
+// Push push
+func (l *LockList) Push(v interface{}) {
+	l.cv.L.Lock()
+	l.cache.PushBack(v)
+
+	for l.cache.Len() > l.size {
+		l.cache.Remove(l.cache.Front())
+	}
+
+	l.cond = true
+	l.cv.Signal()
+	l.cv.L.Unlock()
+}
+
+// Pop pop
+func (l *LockList) Pop() interface{} {
+	l.cv.L.Lock()
+
+	for !l.cond {
+		l.cv.Wait()
+	}
+
+	elem := l.cache.Front().Value
+
+	l.cache.Remove(l.cache.Front())
+	l.cond = false
+	l.cv.L.Unlock()
+
+	return elem
+}
+
+/////////////////////////////////////////////////////////////////
+
+func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string,
+	fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) {
+
+	tm := time.Now()
+	sc := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+
+			rMsg := fnConsume().(work.MsgRS)
+
+			fnRun(rMsg, out, typ)
+
+			sc++
+			if sc == 25 {
+				logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
+				sc = 0
+				tm = time.Now()
+			}
+			if time.Since(tm) > time.Second {
+				logo.Infof(typ, " RUN %d FRAME USE TIME: %v", sc, time.Since(tm))
+				sc = 0
+				tm = time.Now()
+			}
+		}
+	}
+
+}
+
+// FlowSimple wrap
+func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string,
+	fnProduce func(interface{}), fnConsume func() interface{},
+	fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) {
+
+	go flowSimpleWork(ctx, out, typ, fnConsume, fnRun)
+
+	for {
+		select {
+		case <-ctx.Done():
+			fnClose()
+			return
+		default:
+			rMsg := <-in
+			if !validRemoteMessage(rMsg, typ) {
+				logo.Errorln(typ, " validremotemessage invalid")
+				ejectResult(nil, rMsg, out)
+				continue
+			}
+			fnProduce(rMsg)
+		}
+	}
+}

--
Gitblit v1.8.0