package sdk
|
|
import (
|
"analysis/logo"
|
"analysis/work"
|
"container/list"
|
"context"
|
"sync"
|
"time"
|
)
|
|
// LockList list
|
type LockList struct {
|
cache *list.List
|
cv *sync.Cond
|
cond bool
|
size int
|
}
|
|
// NewLockList new
|
func NewLockList(size int) *LockList {
|
return &LockList{
|
cache: list.New(),
|
cv: sync.NewCond(&sync.Mutex{}),
|
cond: false,
|
size: size,
|
}
|
}
|
|
// Push push
|
func (l *LockList) Push(v interface{}) {
|
l.cv.L.Lock()
|
l.cache.PushBack(v)
|
|
for l.cache.Len() > l.size {
|
l.cache.Remove(l.cache.Front())
|
}
|
|
l.cond = true
|
l.cv.Signal()
|
l.cv.L.Unlock()
|
}
|
|
// Pop pop
|
func (l *LockList) Pop() interface{} {
|
l.cv.L.Lock()
|
|
for !l.cond {
|
l.cv.Wait()
|
}
|
|
elem := l.cache.Front().Value
|
|
l.cache.Remove(l.cache.Front())
|
l.cond = false
|
l.cv.L.Unlock()
|
|
return elem
|
}
|
|
/////////////////////////////////////////////////////////////////
|
|
func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string,
|
fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) {
|
|
tm := time.Now()
|
sc := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
|
rMsg := fnConsume().(work.MsgRS)
|
|
fnRun(rMsg, out, typ)
|
|
sc++
|
if sc == 25 {
|
logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
if time.Since(tm) > time.Second {
|
logo.Infof("%s RUN %d FRAME USE TIME: %v", typ, sc, time.Since(tm))
|
sc = 0
|
tm = time.Now()
|
}
|
}
|
}
|
|
}
|
|
// FlowSimple wrap
|
func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string,
|
fnProduce func(interface{}), fnConsume func() interface{},
|
fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) {
|
|
go flowSimpleWork(ctx, out, typ, fnConsume, fnRun)
|
|
for {
|
select {
|
case <-ctx.Done():
|
fnClose()
|
return
|
default:
|
rMsg := <-in
|
if !validRemoteMessage(rMsg, typ) {
|
logo.Errorln(typ, " validremotemessage invalid")
|
ejectResult(nil, rMsg, out)
|
continue
|
}
|
fnProduce(rMsg)
|
}
|
}
|
}
|