package work

import (
	"analysis/logo"
	"container/list"
	"context"
	"sync"
	"time"

	"basic.com/valib/deliver.git"
	// "basic.com/pubsub/protomsg.git"
	// "github.com/gogo/protobuf/proto"
)

// runResult is one queued payload together with the validity flag supplied by
// the caller of Push. Invalid entries are the first candidates for eviction
// when the queue overflows.
type runResult struct {
	data  []byte
	valid bool
}

// ToRule is a bounded in-memory queue whose contents are forwarded over an IPC
// client by Run.
type ToRule struct {
	maxSize int        // Push starts evicting once the queue length exceeds this.
	cache   *list.List // FIFO of runResult values; oldest entry at the front.
	cv      *sync.Cond // cv.L guards cache and cond; signalled by Push.
	cond    bool       // true while Run has (or may have) queued entries to send.
}

// NewToRule returns an empty ToRule whose queue starts evicting entries once
// it holds more than maxSize of them.
func NewToRule(maxSize int) *ToRule {
	return &ToRule{
		maxSize: maxSize,
		cache:   list.New(),
		cv:      sync.NewCond(&sync.Mutex{}),
		cond:    false,
	}
}

// Push appends data to the queue and wakes the sender started by Run.
//
// When the queue grows beyond maxSize, invalid entries are evicted first
// (oldest first, within a bounded budget). If the queue is still over
// capacity afterwards, roughly the oldest third of the entries is dropped so
// that subsequent pushes have headroom again.
func (t *ToRule) Push(data []byte, valid bool) {
	t.cv.L.Lock()
	defer t.cv.L.Unlock()

	t.cache.PushBack(runResult{data: data, valid: valid})

	if t.cache.Len() > t.maxSize {
		t.evictInvalid()
	}
	if n := t.cache.Len(); n > t.maxSize {
		// Drop ceil(n/3) of the oldest entries.
		for drop := (n + 2) / 3; drop > 0; drop-- {
			t.cache.Remove(t.cache.Front())
		}
	}

	// logo.Infof("push to cache count : %d\n", t.cache.Len())
	t.cond = true
	t.cv.Signal()
}

// evictInvalid walks the queue from the oldest entry and removes invalid
// entries. The caller must hold t.cv.L.
//
// The budget counter advances by 2 for every removal and by 1 for every entry
// kept, and the walk stops once it reaches the current queue length, so at
// most about a third of the entries are removed in one call.
func (t *ToRule) evictInvalid() {
	e := t.cache.Front()
	for i := 0; e != nil && i < t.cache.Len(); {
		next := e.Next() // Capture before Remove clears the element's links.
		if e.Value.(runResult).valid {
			i++
		} else {
			t.cache.Remove(e)
			i += 2
		}
		e = next
	}
}

// Run connects to ipcAddr and forwards queued payloads until ctx is cancelled.
//
// Connection attempts are retried once per second. Payloads are taken from the
// queue in batches of up to eight and sent outside the queue lock so that Push
// is never blocked by IPC I/O. A payload whose Send fails is logged and
// dropped; it is not re-queued.
func (t *ToRule) Run(ctx context.Context, ipcAddr string) {
	var client deliver.Deliver
	for {
		var err error
		client, err = deliver.NewClientWithError(deliver.PushPull, ipcAddr)
		if err == nil {
			break
		}
		logo.Errorln("wait create to rule ipc", err)

		// Back off before retrying, but give up promptly on cancellation.
		select {
		case <-ctx.Done():
			return
		case <-time.After(time.Second):
		}
	}

	// sync.Cond cannot wait on a channel, so wake the waiter explicitly when
	// ctx is cancelled. Broadcasting under the lock guarantees the wake-up
	// cannot slip in between nextBatch's ctx check and its call to Wait.
	go func() {
		<-ctx.Done()
		t.cv.L.Lock()
		t.cv.Broadcast()
		t.cv.L.Unlock()
	}()

	for {
		batch, ok := t.nextBatch(ctx)
		if !ok {
			return
		}
		for _, d := range batch {
			if client == nil || d == nil {
				continue
			}
			if err := client.Send(d); err != nil {
				logo.Errorln("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err)
				continue
			}
			logo.Infoln("~~~~~~SEND TO RULE CORRECT")
			// Disabled debugging aid (needs the commented-out protomsg and
			// proto imports):
			// msg := protomsg.SdkMessage{}
			// if err := proto.Unmarshal(d, &msg); err != nil {
			// 	logo.Errorln(err, " msg handling error")
			// 	continue
			// }
			// for _, v := range msg.Tasklab.Sdkinfos {
			// 	logo.Infof("%d SDK DATA SEND TO RULE PROCESS CAMERA ID %s TASKID: %s, SKD %s, LEN: %d\n",
			// 		len(msg.Tasklab.Sdkinfos), msg.Cid, msg.Tasklab.Taskid, v.Sdktype, len(v.Sdkdata))
			// }
		}
	}
}

// nextBatch blocks until entries are queued or ctx is cancelled, then removes
// and returns up to eight payloads from the front of the queue. The boolean
// result is false when ctx was cancelled, in which case no entries are taken.
func (t *ToRule) nextBatch(ctx context.Context) ([][]byte, bool) {
	const maxBatch = 8

	t.cv.L.Lock()
	defer t.cv.L.Unlock()

	for !t.cond && ctx.Err() == nil {
		t.cv.Wait()
	}
	if ctx.Err() != nil {
		return nil, false
	}

	batch := make([][]byte, 0, maxBatch)
	for len(batch) < maxBatch {
		front := t.cache.Front()
		if front == nil {
			break
		}
		batch = append(batch, front.Value.(runResult).data)
		t.cache.Remove(front)
	}

	// Stay armed while entries remain so a backlog larger than one batch is
	// drained without waiting for the next Push.
	t.cond = t.cache.Len() > 0
	return batch, true
}