package work
|
|
import (
|
"analysis/logo"
|
"container/list"
|
"context"
|
"sync"
|
"time"
|
|
"basic.com/valib/deliver.git"
|
// "basic.com/pubsub/protomsg.git"
|
// "github.com/gogo/protobuf/proto"
|
)
|
|
type runResult struct {
|
data []byte
|
valid bool
|
}
|
|
// ToRule ipc
|
type ToRule struct {
|
maxSize int
|
cache *list.List
|
cv *sync.Cond
|
cond bool
|
}
|
|
// NewToRule send to ruleprocess
|
func NewToRule(maxSize int) *ToRule {
|
return &ToRule{
|
maxSize: maxSize,
|
cache: list.New(),
|
cv: sync.NewCond(&sync.Mutex{}),
|
cond: false,
|
}
|
}
|
|
// Push data
|
func (t *ToRule) Push(data []byte, valid bool) {
|
t.cv.L.Lock()
|
result := runResult{data, valid}
|
t.cache.PushBack(result)
|
if t.cache.Len() > t.maxSize {
|
for i := 0; i < t.cache.Len(); {
|
d := t.cache.Front().Value.(runResult)
|
if d.valid == false {
|
t.cache.Remove(t.cache.Front())
|
i = i + 2
|
} else {
|
i = i + 1
|
}
|
}
|
}
|
if t.cache.Len() > t.maxSize {
|
for i := 0; i < t.cache.Len(); {
|
t.cache.Remove(t.cache.Front())
|
i = i + 2
|
}
|
}
|
// logo.Infof("push to cache count : %d\n", t.cache.Len())
|
t.cond = true
|
t.cv.Signal()
|
t.cv.L.Unlock()
|
}
|
|
// Run forever
|
func (t *ToRule) Run(ctx context.Context, ipcAddr string) {
|
var i deliver.Deliver
|
var err error
|
|
for {
|
i, err = deliver.NewClientWithError(deliver.PushPull, ipcAddr)
|
if err != nil {
|
time.Sleep(time.Second)
|
logo.Errorln("wait create to rule ipc", err)
|
continue
|
}
|
break
|
}
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
|
var d []byte
|
t.cv.L.Lock()
|
|
for !t.cond {
|
t.cv.Wait()
|
}
|
|
for j := 0; j < 8; j++ {
|
if t.cache.Len() <= 0 {
|
break
|
}
|
|
d = t.cache.Front().Value.(runResult).data
|
if i != nil && d != nil {
|
|
err := i.Send(d)
|
if err != nil {
|
logo.Errorln("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
|
} else {
|
logo.Infoln("~~~~~~SEND TO RULE CORRECT")
|
// msg := protomsg.SdkMessage{}
|
// if err := proto.Unmarshal(d, &msg); err != nil {
|
// logo.Errorln(err, " msg 处理异常")
|
// continue
|
// }
|
// for _, v := range msg.Tasklab.Sdkinfos {
|
// logo.Infof("%d SDK DATA SEND TO RULE PROCESS CAMERA ID %s TASKID: %s, SKD %s, LEN: %d\n", len(msg.Tasklab.Sdkinfos), msg.Cid, msg.Tasklab.Taskid, v.Sdktype, len(v.Sdkdata))
|
// }
|
}
|
}
|
t.cache.Remove(t.cache.Front())
|
}
|
|
t.cond = false
|
t.cv.L.Unlock()
|
|
}
|
}
|
}
|