zhangmeng
2019-12-13 2d25b62b60da018412ed164b6fd29470498cea17
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package work
 
import (
    "analysis/logo"
    "container/list"
    "context"
    "sync"
    "time"
 
    "basic.com/valib/deliver.git"
    // "basic.com/pubsub/protomsg.git"
    // "github.com/gogo/protobuf/proto"
)
 
type runResult struct {
    data  []byte
    valid bool
}
 
// ToRule ipc
type ToRule struct {
    maxSize int
    cache   *list.List
    cv      *sync.Cond
    cond    bool
}
 
// NewToRule send to ruleprocess
func NewToRule(maxSize int) *ToRule {
    return &ToRule{
        maxSize: maxSize,
        cache:   list.New(),
        cv:      sync.NewCond(&sync.Mutex{}),
        cond:    false,
    }
}
 
// Push data
func (t *ToRule) Push(data []byte, valid bool) {
    t.cv.L.Lock()
    result := runResult{data, valid}
    t.cache.PushBack(result)
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            d := t.cache.Front().Value.(runResult)
            if d.valid == false {
                t.cache.Remove(t.cache.Front())
                i = i + 2
            } else {
                i = i + 1
            }
        }
    }
    if t.cache.Len() > t.maxSize {
        for i := 0; i < t.cache.Len(); {
            t.cache.Remove(t.cache.Front())
            i = i + 2
        }
    }
    // logo.Infof("push to cache count : %d\n", t.cache.Len())
    t.cond = true
    t.cv.Signal()
    t.cv.L.Unlock()
}
 
// Run forever
func (t *ToRule) Run(ctx context.Context, ipcAddr string) {
    var i deliver.Deliver
    var err error
 
    for {
        i, err = deliver.NewClientWithError(deliver.PushPull, ipcAddr)
        if err != nil {
            time.Sleep(time.Second)
            logo.Errorln("wait create to rule ipc", err)
            continue
        }
        break
    }
 
    for {
        select {
        case <-ctx.Done():
            return
        default:
 
            var d []byte
            t.cv.L.Lock()
 
            for !t.cond {
                t.cv.Wait()
            }
 
            for j := 0; j < 8; j++ {
                if t.cache.Len() <= 0 {
                    break
                }
 
                d = t.cache.Front().Value.(runResult).data
                if i != nil && d != nil {
 
                    err := i.Send(d)
                    if err != nil {
                        logo.Errorln("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
                    } else {
                        logo.Infoln("~~~~~~SEND TO RULE CORRECT")
                        // msg := protomsg.SdkMessage{}
                        // if err := proto.Unmarshal(d, &msg); err != nil {
                        //     logo.Errorln(err, " msg 处理异常")
                        //     continue
                        // }
                        // for _, v := range msg.Tasklab.Sdkinfos {
                        //     logo.Infof("%d SDK DATA SEND TO RULE PROCESS CAMERA ID %s TASKID: %s, SKD %s, LEN: %d\n", len(msg.Tasklab.Sdkinfos), msg.Cid, msg.Tasklab.Taskid, v.Sdktype, len(v.Sdkdata))
                        // }
                    }
                }
                t.cache.Remove(t.cache.Front())
            }
 
            t.cond = false
            t.cv.L.Unlock()
 
        }
    }
}