package common import ( "container/list" "context" "sync" "time" "basic.com/valib/deliver.git" // "basic.com/pubsub/protomsg.git" // "github.com/gogo/protobuf/proto" ) type runResult struct { data []byte valid bool } // ToRule ipc type ToRule struct { ipcURL string maxSize int cache *list.List cv *sync.Cond cond bool fnLogger func(...interface{}) } // NewToRule send to ruleprocess func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule { return &ToRule{ ipcURL: ipcURL, maxSize: maxSize, cache: list.New(), cv: sync.NewCond(&sync.Mutex{}), cond: false, fnLogger: fn, } } // Push data func (t *ToRule) Push(data []byte, valid bool) { t.cv.L.Lock() result := runResult{data, valid} t.cache.PushBack(result) if t.cache.Len() > t.maxSize { for i := 0; i < t.cache.Len(); { d := t.cache.Front().Value.(runResult) if d.valid == false { t.cache.Remove(t.cache.Front()) i = i + 2 } else { i = i + 1 } } } if t.cache.Len() > t.maxSize { for i := 0; i < t.cache.Len(); { t.cache.Remove(t.cache.Front()) i = i + 2 } } // logo.Infof("push to cache count : %d\n", t.cache.Len()) t.cond = true t.cv.Signal() t.cv.L.Unlock() } // Run forever func (t *ToRule) Run(ctx context.Context) { var i deliver.Deliver var err error for { i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL) if err != nil { time.Sleep(time.Second) t.fnLogger("wait create to rule ipc", err) continue } break } count := 0 for { select { case <-ctx.Done(): return default: var d []byte t.cv.L.Lock() for !t.cond { t.cv.Wait() } for j := 0; j < 8; j++ { if t.cache.Len() <= 0 { break } d = t.cache.Front().Value.(runResult).data if i != nil && d != nil { err := i.Send(d) if err != nil { t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err) } else { count++ if count > 5 { count = 0 t.fnLogger("~~~~~~SEND TO RULE CORRECT") } } } t.cache.Remove(t.cache.Front()) } t.cond = false t.cv.L.Unlock() } } }