package common
|
|
import (
|
"container/list"
|
"context"
|
"sync"
|
"time"
|
|
"basic.com/valib/deliver.git"
|
// "basic.com/pubsub/protomsg.git"
|
// "github.com/gogo/protobuf/proto"
|
)
|
|
// runResult is one entry of the ToRule cache: a payload plus a flag that
// Push consults when it has to evict entries.
type runResult struct {
	// data is the raw payload handed to Push and later sent by Run.
	data []byte
	// valid is the caller-supplied flag; entries with valid == false are
	// the first candidates for eviction when the cache overflows.
	valid bool
}
|
|
// ToRule buffers results handed to Push and forwards them from Run through
// a deliver client created for ipcURL.
type ToRule struct {
	// ipcURL is the address passed to deliver.NewClientWithError in Run.
	ipcURL string
	// maxSize is the cache length above which Push starts evicting entries.
	maxSize int
	// cache holds the pending runResult values, oldest at the front.
	cache *list.List
	// cv is the condition variable Run waits on; its lock is held by Push
	// and Run whenever they touch cache and cond.
	cv *sync.Cond
	// cond is set by Push after queuing data and cleared by Run after it
	// has processed a batch.
	cond bool
	// fnLogger is the logging callback invoked by Run.
	fnLogger func(...interface{})
}
|
|
// NewToRule send to ruleprocess
|
func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule {
|
return &ToRule{
|
ipcURL: ipcURL,
|
maxSize: maxSize,
|
cache: list.New(),
|
cv: sync.NewCond(&sync.Mutex{}),
|
cond: false,
|
fnLogger: fn,
|
}
|
}
|
|
// Push data
|
func (t *ToRule) Push(data []byte, valid bool) {
|
|
t.cv.L.Lock()
|
result := runResult{data, valid}
|
t.cache.PushBack(result)
|
if t.cache.Len() > t.maxSize {
|
for i := 0; i < t.cache.Len(); {
|
d := t.cache.Front().Value.(runResult)
|
if d.valid == false {
|
t.cache.Remove(t.cache.Front())
|
i = i + 2
|
} else {
|
i = i + 1
|
}
|
}
|
}
|
if t.cache.Len() > t.maxSize {
|
for i := 0; i < t.cache.Len(); {
|
t.cache.Remove(t.cache.Front())
|
i = i + 2
|
}
|
}
|
// logo.Infof("push to cache count : %d\n", t.cache.Len())
|
t.cond = true
|
t.cv.Signal()
|
t.cv.L.Unlock()
|
}
|
|
// Run forever
|
func (t *ToRule) Run(ctx context.Context) {
|
|
var i deliver.Deliver
|
var err error
|
|
for {
|
i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL)
|
if err != nil {
|
time.Sleep(time.Second)
|
t.fnLogger("wait create to rule ipc", err)
|
continue
|
}
|
break
|
}
|
|
count := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
|
var d []byte
|
t.cv.L.Lock()
|
|
for !t.cond {
|
t.cv.Wait()
|
}
|
|
for j := 0; j < 8; j++ {
|
if t.cache.Len() <= 0 {
|
break
|
}
|
|
d = t.cache.Front().Value.(runResult).data
|
if i != nil && d != nil {
|
|
err := i.Send(d)
|
if err != nil {
|
t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
|
} else {
|
count++
|
if count > 5 {
|
count = 0
|
t.fnLogger("~~~~~~SEND TO RULE CORRECT")
|
}
|
}
|
}
|
t.cache.Remove(t.cache.Front())
|
}
|
|
t.cond = false
|
t.cv.L.Unlock()
|
|
}
|
}
|
}
|