package sdk import ( "analysis/logo" "analysis/work" "container/list" "context" "sync" "time" ) // LockList list type LockList struct { cache *list.List cv *sync.Cond cond bool size int } // NewLockList new func NewLockList(size int) *LockList { return &LockList{ cache: list.New(), cv: sync.NewCond(&sync.Mutex{}), cond: false, size: size, } } // Push push func (l *LockList) Push(v interface{}) { l.cv.L.Lock() l.cache.PushBack(v) for l.cache.Len() > l.size { l.cache.Remove(l.cache.Front()) } l.cond = true l.cv.Signal() l.cv.L.Unlock() } // Pop pop func (l *LockList) Pop() interface{} { l.cv.L.Lock() for !l.cond { l.cv.Wait() } elem := l.cache.Front().Value l.cache.Remove(l.cache.Front()) l.cond = false l.cv.L.Unlock() return elem } ///////////////////////////////////////////////////////////////// func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string, fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) { tm := time.Now() sc := 0 for { select { case <-ctx.Done(): return default: rMsg := fnConsume().(work.MsgRS) fnRun(rMsg, out, typ) sc++ if sc == 25 { logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) sc = 0 tm = time.Now() } if time.Since(tm) > time.Second { logo.Infof("%s RUN %d FRAME USE TIME: %v", typ, sc, time.Since(tm)) sc = 0 tm = time.Now() } } } } // FlowSimple wrap func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string, fnProduce func(interface{}), fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) { go flowSimpleWork(ctx, out, typ, fnConsume, fnRun) for { select { case <-ctx.Done(): fnClose() return default: rMsg := <-in if !validRemoteMessage(rMsg, typ) { logo.Errorln(typ, " validremotemessage invalid") ejectResult(nil, rMsg, out) continue } fnProduce(rMsg) } } }