From 400044583627cc72a1a3071f1a389e18953cbba0 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 15 一月 2020 09:42:30 +0800 Subject: [PATCH] update --- /dev/null | 128 ------------------------- common/helper.go | 131 -------------------------- 2 files changed, 0 insertions(+), 259 deletions(-) diff --git a/common/flow.go b/common/flow.go deleted file mode 100644 index 4dfb11c..0000000 --- a/common/flow.go +++ /dev/null @@ -1,124 +0,0 @@ -package common - -import ( - "context" - "time" - - "basic.com/pubsub/protomsg.git" - - "basic.com/libgowrapper/sdkstruct.git" -) - -///////////////////////////////////////////////////////////////// - -// FlowCreate create flow -func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) { - - const ( - postPull = `_1` - postPush = `_2` - ) - ipcRcv := GetIpcAddress(shm, id+postPull) - ipcSnd := GetIpcAddress(shm, id+postPush) - chRcv := make(chan []byte, 3) - chSnd := make(chan sdkstruct.MsgSDK, 3) - - rcver := NewReciever(ipcRcv, chRcv, shm, fn) - snder := NewSender(ipcSnd, chSnd, shm, fn) - torule := NewToRule(ipc2Rule, ruleCacheSize, fn) - - snder.ApplyCallbackFunc(torule.Push) - - go rcver.Run(ctx) - go snder.Run(ctx) - go torule.Run(ctx) - - return chRcv, chSnd -} - -// WorkFlowSimple work -func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string, - fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), - fn func(...interface{})) { - - tm := time.Now() - sc := 0 - - for { - select { - case <-ctx.Done(): - return - default: - - elems := fnConsume() - if elems == nil || len(elems) == 0 { - time.Sleep(10 * time.Millisecond) - continue - } - - var msgs []protomsg.SdkMessage - for _, v := range elems { - msgs = append(msgs, v.(protomsg.SdkMessage)) - } - - fnRun(msgs, out, typ) - - sc++ - if sc == 25 { - fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) - sc = 0 - tm = time.Now() - } - if time.Since(tm) > time.Second { - fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm)) - sc = 0 - tm = time.Now() - } - } - } -} - -// FlowSimple wrap -func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string, - fnProduce func(interface{}), fnConsume func() []interface{}, - fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), - fnClose func(), fn func(...interface{})) { - - cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) { - fnRun(msgs[0], ch, typ) - } - - FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn) - -} - -// FlowBatch batch -func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string, - fnProduce func(interface{}), fnConsume func() []interface{}, - fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), - fnClose func(), fn func(...interface{})) { - - chMsg := make(chan protomsg.SdkMessage, 3) - go UnserilizeProto(ctx, in, chMsg, fn) - - go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn) - - for { - select { - case <-ctx.Done(): - fnClose() - return - case rMsg := <-chMsg: - if !ValidRemoteMessage(rMsg, typ, fn) { - fn(typ, " validremotemessage invalid") - EjectResult(nil, rMsg, out) - continue - } - fnProduce(rMsg) - - default: - time.Sleep(10 * time.Millisecond) - } - } - -} diff --git a/common/helper.go b/common/helper.go index 0e016c3..dc6f7b0 100644 --- a/common/helper.go +++ b/common/helper.go @@ -1,28 +1,11 @@ package common import ( - "context" "encoding/json" "fmt" "io/ioutil" "strconv" - "time" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/pubsub/protomsg.git" - "basic.com/valib/deliver.git" - "github.com/gogo/protobuf/proto" ) - -const mode = deliver.PushPull - -// GetIpcAddress get ipc -func GetIpcAddress(shm bool, id string) string { - if shm { - return id - } - return `ipc:///tmp/` + id + `.ipc` -} // SubConfig sub type SubConfig struct { @@ -56,119 +39,5 @@ // Atoi atoi func Atoi(s string) int { i, _ := strconv.Atoi(s) - return i -} - -// UnserilizeProto un -func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) { - for { - select { - case <-ctx.Done(): - return - case d := <-data: - if len(d) < 100 { - continue - } - msg := protomsg.SdkMessage{} - if err := proto.Unmarshal(d, &msg); err != nil { - fn(err, " msg 澶勭悊寮傚父") - continue - } - - out <- msg - - default: - time.Sleep(10 * time.Millisecond) - } - } -} - -// Msg2MsgSDK msg->msgsdk -func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK { - - d, err := proto.Marshal(&msg) - if err != nil { - return nil - } - - index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos) - if index >= count { - return &sdkstruct.MsgSDK{ - MsgData: d, - SdkCount: count, - SdkIndex: index, - SdkDataLen: 0, - } - } - - return &sdkstruct.MsgSDK{ - MsgData: d, - SdkCount: count, - SdkIndex: index, - SdkDataLen: len(d), - } -} - -// EjectResult eject -func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) { - - if res == nil { - s := Msg2MsgSDK(msg) - if s == nil { - return - } - out <- *s - return - } - - msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res - - s := Msg2MsgSDK(msg) - if s == nil { - return - } - out <- *s -} - -///////////////////////////////////////////////////////////// - -// ValidRemoteMessage valid or not -func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool { - if msg.Tasklab == nil { - fn(fnName, " recieve msg nil") - return false - } - - sdkLen := len(msg.Tasklab.Sdkinfos) - if sdkLen == 0 { - fn(fnName, " has no sdk info") - return false - } - - curIndex := int(msg.Tasklab.Index) - if curIndex < 0 || curIndex >= sdkLen { - fn(fnName, " tasklab index ", curIndex, " error") - return false - } - if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName { - fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype) - return false - } - return true -} - -// UnpackImage unpack -func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image { - // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬 - i := &protomsg.Image{} - err := proto.Unmarshal(msg.Data, i) - if err != nil { - fn(fnName, " protobuf decode CameraImage error: ", err.Error()) - return nil - } - if i.Data == nil { - fn(fnName, " protomsg.Image data null") - return nil - } return i } diff --git a/common/lockList.go b/common/lockList.go deleted file mode 100644 index f630a6a..0000000 --- a/common/lockList.go +++ /dev/null @@ -1,89 +0,0 @@ -package common - -import ( - "container/list" - "sync" -) - -// LockList list -type LockList struct { - cache *list.List - cv *sync.Cond - cond bool - size int -} - -// NewLockList new -func NewLockList(size int) *LockList { - return &LockList{ - cache: list.New(), - cv: sync.NewCond(&sync.Mutex{}), - cond: false, - size: size, - } -} - -// Push push -func (l *LockList) Push(v interface{}) { - l.cv.L.Lock() - l.cache.PushBack(v) - - for l.cache.Len() > l.size { - l.cache.Remove(l.cache.Front()) - } - - l.cond = true - l.cv.Signal() - l.cv.L.Unlock() -} - -// Pop pop -func (l *LockList) Pop() []interface{} { - - var batch []interface{} - - l.cv.L.Lock() - - for !l.cond { - l.cv.Wait() - } - - elem := l.cache.Front() - if elem != nil { - batch = append(batch, elem.Value) - l.cache.Remove(l.cache.Front()) - } - - l.cond = false - l.cv.L.Unlock() - - return batch -} - -// Drain drain all element -func (l *LockList) Drain() []interface{} { - - var batch []interface{} - - l.cv.L.Lock() - - for !l.cond { - l.cv.Wait() - } - - for { - - elem := l.cache.Front() - if elem == nil { - break - } - - batch = append(batch, elem.Value) - l.cache.Remove(l.cache.Front()) - } - - l.cond = false - l.cv.L.Unlock() - - return batch -} diff --git a/common/recv.go b/common/recv.go deleted file mode 100644 index fb31433..0000000 --- a/common/recv.go +++ /dev/null @@ -1,116 +0,0 @@ -package common - -import ( - "context" - - "time" - - "basic.com/valib/deliver.git" -) - -// Reciever recv from ipc -type Reciever struct { - ctx context.Context - ipcURL string - out chan<- []byte - - shm bool - fnLogger func(...interface{}) -} - -// NewReciever new recv -func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { - return &Reciever{ - ipcURL: url, - out: out, - shm: shm, - fnLogger: fn, - } -} - -// Run run a IPC client -func (r *Reciever) Run(ctx context.Context) { - - if r.shm { - r.runShm(ctx) - } else { - r.run(ctx, deliver.NewClient(mode, r.ipcURL)) - } -} - -func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { - - // t := time.Now() - // sc := 0 - - count := 0 - - for { - select { - case <-ctx.Done(): - i.Close() - return - default: - - if r.shm { - if d, err := i.Recv(); err != nil { - i.Close() - r.fnLogger("ANALYSIS RECV ERROR: ", err) - - c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - time.Sleep(time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) - r.fnLogger("ANALYSIS CREATE FAILED : ", err) - } - i = c - r.fnLogger("ANALYSIS CREATE SHM") - } else { - if d != nil { - count++ - if count > 10 { - count = 0 - r.fnLogger("~~~shm recv image:", len(d)) - } - r.out <- d - } - } - } else { - if msg, err := i.Recv(); err != nil { - // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) - } else { - count++ - if count > 10 { - count = 0 - r.fnLogger("~~~mangos recv image:", len(msg)) - } - r.out <- msg - } - } - - // sc++ - // if sc == 25 { - // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) - // sc = 0 - // t = time.Now() - // } - - } - } -} - -func (r *Reciever) runShm(ctx context.Context) { - c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) - r.fnLogger("CLIENT CREATE FAILED : ", err) - } - r.run(ctx, c) -} diff --git a/common/send.go b/common/send.go deleted file mode 100644 index a0e7cc4..0000000 --- a/common/send.go +++ /dev/null @@ -1,141 +0,0 @@ -package common - -import ( - "context" - "time" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/valib/deliver.git" -) - -// Sender decoder ingo -type Sender struct { - ipcURL string - chMsg <-chan sdkstruct.MsgSDK - shm bool - fn func([]byte, bool) - - fnLogger func(...interface{}) -} - -// ApplyCallbackFunc cb -func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) { - - if s.fn == nil { - s.fn = f - } -} - -// NewSender Sender -func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender { - // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) - return &Sender{ - ipcURL: ipcURL, - chMsg: chMsg, - shm: shm, - fn: nil, - fnLogger: fn, - } -} - -// Run run a IPC producer -func (s *Sender) Run(ctx context.Context) { - - if s.shm { - s.runShm(ctx) - } else { - i := deliver.NewClient(mode, s.ipcURL) - if i == nil { - s.fnLogger("sender 2 pubsub nng create error") - return - } - s.run(ctx, i) - } -} - -func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) { - - for { - select { - case <-ctx.Done(): - s.fnLogger("stop Sender") - return - case i := <-s.chMsg: - - data <- i.MsgData - - if int(i.SdkIndex+1) == i.SdkCount { - if s.fn != nil { - - sFlag := true - if i.SdkDataLen < 2 { - sFlag = false - } - s.fn(i.MsgData, sFlag) - - } - } - default: - time.Sleep(10 * time.Millisecond) - } - } -} - -func (s *Sender) run(ctx context.Context, i deliver.Deliver) { - - // go ruleserver.TimeTicker() - - dataChan := make(chan []byte, 3) - go s.serializeProto(ctx, dataChan) - - for { - select { - case <-ctx.Done(): - i.Close() - return - case d := <-dataChan: - - if s.shm { - if err := i.Send(d); err != nil { - i.Close() - s.fnLogger("ANALYSIS SENDER ERROR: ", err) - - c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) - for { - if err == nil { - break - } - time.Sleep(time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) - s.fnLogger("CLIENT CREATE FAILED : ", err) - } - i = c - } else { - - } - } else { - err := i.Send(d) - if err != nil { - // logo.Errorln("error sender 2 pubsub: ", err) - } else { - s.fnLogger("mangos send to pubsub len: ", len(d)) - } - } - default: - time.Sleep(10 * time.Millisecond) - } - } -} - -func (s *Sender) runShm(ctx context.Context) { - c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) - for { - if err == nil { - break - } - time.Sleep(1 * time.Second) - c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) - s.fnLogger("CLIENT CREATE FAILED : ", err) - } - s.run(ctx, c) -} diff --git a/common/torule.go b/common/torule.go deleted file mode 100644 index c8be086..0000000 --- a/common/torule.go +++ /dev/null @@ -1,128 +0,0 @@ -package common - -import ( - "container/list" - "context" - "sync" - "time" - - "basic.com/valib/deliver.git" - // "basic.com/pubsub/protomsg.git" - // "github.com/gogo/protobuf/proto" -) - -type runResult struct { - data []byte - valid bool -} - -// ToRule ipc -type ToRule struct { - ipcURL string - maxSize int - cache *list.List - cv *sync.Cond - cond bool - fnLogger func(...interface{}) -} - -// NewToRule send to ruleprocess -func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule { - return &ToRule{ - ipcURL: ipcURL, - maxSize: maxSize, - cache: list.New(), - cv: sync.NewCond(&sync.Mutex{}), - cond: false, - fnLogger: fn, - } -} - -// Push data -func (t *ToRule) Push(data []byte, valid bool) { - - t.cv.L.Lock() - result := runResult{data, valid} - t.cache.PushBack(result) - if t.cache.Len() > t.maxSize { - for i := 0; i < t.cache.Len(); { - d := t.cache.Front().Value.(runResult) - if d.valid == false { - t.cache.Remove(t.cache.Front()) - i = i + 2 - } else { - i = i + 1 - } - } - } - if t.cache.Len() > t.maxSize { - for i := 0; i < t.cache.Len(); { - t.cache.Remove(t.cache.Front()) - i = i + 2 - } - } - // logo.Infof("push to cache count : %d\n", t.cache.Len()) - t.cond = true - t.cv.Signal() - t.cv.L.Unlock() -} - -// Run forever -func (t *ToRule) Run(ctx context.Context) { - - var i deliver.Deliver - var err error - - for { - i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL) - if err != nil { - time.Sleep(time.Second) - t.fnLogger("wait create to rule ipc", err) - continue - } - break - } - - count := 0 - - for { - select { - case <-ctx.Done(): - return - default: - - var d []byte - t.cv.L.Lock() - - for !t.cond { - t.cv.Wait() - } - - for j := 0; j < 8; j++ { - if t.cache.Len() <= 0 { - break - } - - d = t.cache.Front().Value.(runResult).data - if i != nil && d != nil { - - err := i.Send(d) - if err != nil { - t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err) - } else { - count++ - if count > 5 { - count = 0 - t.fnLogger("~~~~~~SEND TO RULE CORRECT") - } - } - } - t.cache.Remove(t.cache.Front()) - } - - t.cond = false - t.cv.L.Unlock() - - } - } -} -- Gitblit v1.8.0