派生自 libgowrapper/face

zhangmeng
2020-01-15 0ffd95f2278e860736e46f8b73f357470f5a3d91
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package common
 
import (
    "container/list"
    "context"
    "sync"
    "time"
 
    "basic.com/valib/deliver.git"
    // "basic.com/pubsub/protomsg.git"
    // "github.com/gogo/protobuf/proto"
)
 
type runResult struct {
    data  []byte
    valid bool
}
 
// ToRule ipc
type ToRule struct {
    ipcURL   string
    maxSize  int
    cache    *list.List
    cv       *sync.Cond
    cond     bool
    fnLogger func(...interface{})
}
 
// NewToRule send to ruleprocess
func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
    return &ToRule{
        ipcURL:   ipcURL,
        maxSize:  maxSize,
        cache:    list.New(),
        cv:       sync.NewCond(&sync.Mutex{}),
        cond:     false,
        fnLogger: fn,
    }
}
 
// Push data
func (t *ToRule) Push(data []byte, valid bool) {
 
    t.cv.L.Lock()
    result := runResult{data, valid}
    t.cache.PushBack(result)
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            d := t.cache.Front().Value.(runResult)
            if d.valid == false {
                t.cache.Remove(t.cache.Front())
                i = i + 2
            } else {
                i = i + 1
            }
        }
    }
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            t.cache.Remove(t.cache.Front())
            i = i + 2
        }
    }
    // logo.Infof("push to cache count : %d\n", t.cache.Len())
    t.cond = true
    t.cv.Signal()
    t.cv.L.Unlock()
}
 
// Run forever
func (t *ToRule) Run(ctx context.Context) {
 
    var i deliver.Deliver
    var err error
 
    for {
        i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
        if err != nil {
            time.Sleep(time.Second)
            t.fnLogger("wait create to rule ipc", err)
            continue
        }
        break
    }
 
    count := 0
 
    for {
        select {
        case <-ctx.Done():
            return
        default:
 
            var d []byte
            t.cv.L.Lock()
 
            for !t.cond {
                t.cv.Wait()
            }
 
            for j := 0; j < 8; j++ {
                if t.cache.Len() <= 0 {
                    break
                }
 
                d = t.cache.Front().Value.(runResult).data
                if i != nil && d != nil {
 
                    err := i.Send(d)
                    if err != nil {
                        t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
                    } else {
                        count++
                        if count > 5 {
                            count = 0
                            t.fnLogger("~~~~~~SEND TO RULE CORRECT")
                        }
                    }
                }
                t.cache.Remove(t.cache.Front())
            }
 
            t.cond = false
            t.cv.L.Unlock()
 
        }
    }
}