zhangmeng
2020-01-13 94e9f50569bd20a697edb36711d017de1c19d1a5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package sdk
 
import (
    "analysis/logo"
    "analysis/work"
    "container/list"
    "context"
    "sync"
    "time"
)
 
// LockList list
type LockList struct {
    cache *list.List
    cv    *sync.Cond
    cond  bool
    size  int
}
 
// NewLockList new
func NewLockList(size int) *LockList {
    return &LockList{
        cache: list.New(),
        cv:    sync.NewCond(&sync.Mutex{}),
        cond:  false,
        size:  size,
    }
}
 
// Push push
func (l *LockList) Push(v interface{}) {
    l.cv.L.Lock()
    l.cache.PushBack(v)
 
    for l.cache.Len() > l.size {
        l.cache.Remove(l.cache.Front())
    }
 
    l.cond = true
    l.cv.Signal()
    l.cv.L.Unlock()
}
 
// Pop pop
func (l *LockList) Pop() interface{} {
    l.cv.L.Lock()
 
    for !l.cond {
        l.cv.Wait()
    }
 
    elem := l.cache.Front().Value
 
    l.cache.Remove(l.cache.Front())
    l.cond = false
    l.cv.L.Unlock()
 
    return elem
}
 
/////////////////////////////////////////////////////////////////
 
func flowSimpleWork(ctx context.Context, out chan<- work.MsgRS, typ string,
    fnConsume func() interface{}, fnRun func(work.MsgRS, chan<- work.MsgRS, string)) {
 
    tm := time.Now()
    sc := 0
 
    for {
        select {
        case <-ctx.Done():
            return
        default:
 
            rMsg := fnConsume().(work.MsgRS)
 
            fnRun(rMsg, out, typ)
 
            sc++
            if sc == 25 {
                logo.Infoln(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm))
                sc = 0
                tm = time.Now()
            }
            if time.Since(tm) > time.Second {
                logo.Infof("%s RUN %d FRAME USE TIME: %v", typ, sc, time.Since(tm))
                sc = 0
                tm = time.Now()
            }
        }
    }
 
}
 
// FlowSimple wrap
func FlowSimple(ctx context.Context, in <-chan work.MsgRS, out chan<- work.MsgRS, typ string,
    fnProduce func(interface{}), fnConsume func() interface{},
    fnRun func(work.MsgRS, chan<- work.MsgRS, string), fnClose func()) {
 
    go flowSimpleWork(ctx, out, typ, fnConsume, fnRun)
 
    for {
        select {
        case <-ctx.Done():
            fnClose()
            return
        default:
            rMsg := <-in
            if !validRemoteMessage(rMsg, typ) {
                logo.Errorln(typ, " validremotemessage invalid")
                ejectResult(nil, rMsg, out)
                continue
            }
            fnProduce(rMsg)
        }
    }
}