New file |
| | |
| | | package sdk |
| | | |
| | | import ( |
| | | "analysis/logo" |
| | | "analysis/work" |
| | | "container/list" |
| | | "context" |
| | | "sync" |
| | | "time" |
| | | ) |
| | | |
| | | // LockList list |
| | | type LockList struct { |
| | | cache *list.List |
| | | cv *sync.Cond |
| | | cond bool |
| | | size int |
| | | } |
| | | |
| | | // NewLockList new |
| | | func NewLockList(size int) *LockList { |
| | | return &LockList{ |
| | | cache: list.New(), |
| | | cv: sync.NewCond(&sync.Mutex{}), |
| | | cond: false, |
| | | size: size, |
| | | } |
| | | } |
| | | |
| | | // Push push |
| | | func (l *LockList) Push(v interface{}) { |
| | | l.cv.L.Lock() |
| | | l.cache.PushBack(v) |
| | | |
| | | for l.cache.Len() > l.size { |
| | | l.cache.Remove(l.cache.Front()) |
| | | } |
| | | |
| | | l.cond = true |
| | | l.cv.Signal() |
| | | l.cv.L.Unlock() |
| | | } |
| | | |
| | | // Pop pop |
| | | func (l *LockList) Pop() interface{} { |
| | | l.cv.L.Lock() |
| | | |
| | | for !l.cond { |
| | | l.cv.Wait() |
| | | } |
| | | |
| | | elem := l.cache.Front().Value |
| | | |
| | | l.cache.Remove(l.cache.Front()) |
| | | l.cond = false |
| | | l.cv.L.Unlock() |
| | | |
| | | return elem |
| | | } |
| | | |
| | | ///////////////////////////////////////////////////////////////// |
| | | |
| | | func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string, |
| | | fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) { |
| | | |
| | | tm := time.Now() |
| | | sc := 0 |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | |
| | | rMsg := fnConsume().(work.MsgRS) |
| | | |
| | | fnRun(rMsg, out, typ) |
| | | |
| | | sc++ |
| | | if sc == 25 { |
| | | logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) |
| | | sc = 0 |
| | | tm = time.Now() |
| | | } |
| | | if time.Since(tm) > time.Second { |
| | | logo.Infof(typ, " RUN %d FRAME USE TIME: %v", sc, time.Since(tm)) |
| | | sc = 0 |
| | | tm = time.Now() |
| | | } |
| | | } |
| | | } |
| | | |
| | | } |
| | | |
| | | // FlowSimple wrap |
| | | func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string, |
| | | fnProduce func(interface{}), fnConsume func() interface{}, |
| | | fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) { |
| | | |
| | | go flowSimpleWork(ctx, out, typ, fnConsume, fnRun) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | fnClose() |
| | | return |
| | | default: |
| | | rMsg := <-in |
| | | if !validRemoteMessage(rMsg, typ) { |
| | | logo.Errorln(typ, " validremotemessage invalid") |
| | | ejectResult(nil, rMsg, out) |
| | | continue |
| | | } |
| | | fnProduce(rMsg) |
| | | } |
| | | } |
| | | } |