From 5459ba1d3f7f944aa97923ed9c09a5dbc7663928 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 14 一月 2020 18:01:24 +0800 Subject: [PATCH] update --- run.go | 9 go.sum | 2 common/recv.go | 116 ++++++++ common/torule.go | 128 +++++++++ common/helper.go | 174 ++++++++++++ go.mod | 3 common/send.go | 141 ++++++++++ common/flow.go | 124 ++++++++ common/lockList.go | 89 ++++++ 9 files changed, 778 insertions(+), 8 deletions(-) diff --git a/common/flow.go b/common/flow.go new file mode 100644 index 0000000..4dfb11c --- /dev/null +++ b/common/flow.go @@ -0,0 +1,124 @@ +package common + +import ( + "context" + "time" + + "basic.com/pubsub/protomsg.git" + + "basic.com/libgowrapper/sdkstruct.git" +) + +///////////////////////////////////////////////////////////////// + +// FlowCreate create flow +func FlowCreate(ctx context.Context, id string, shm bool, ipc2Rule string, ruleCacheSize int, fn func(...interface{})) (<-chan []byte, chan<- sdkstruct.MsgSDK) { + + const ( + postPull = `_1` + postPush = `_2` + ) + ipcRcv := GetIpcAddress(shm, id+postPull) + ipcSnd := GetIpcAddress(shm, id+postPush) + chRcv := make(chan []byte, 3) + chSnd := make(chan sdkstruct.MsgSDK, 3) + + rcver := NewReciever(ipcRcv, chRcv, shm, fn) + snder := NewSender(ipcSnd, chSnd, shm, fn) + torule := NewToRule(ipc2Rule, ruleCacheSize, fn) + + snder.ApplyCallbackFunc(torule.Push) + + go rcver.Run(ctx) + go snder.Run(ctx) + go torule.Run(ctx) + + return chRcv, chSnd +} + +// WorkFlowSimple work +func WorkFlowSimple(ctx context.Context, out chan<- sdkstruct.MsgSDK, typ string, + fnConsume func() []interface{}, fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), + fn func(...interface{})) { + + tm := time.Now() + sc := 0 + + for { + select { + case <-ctx.Done(): + return + default: + + elems := fnConsume() + if elems == nil || len(elems) == 0 { + time.Sleep(10 * time.Millisecond) + continue + } + + var msgs []protomsg.SdkMessage + for _, v := range elems { + msgs = append(msgs, v.(protomsg.SdkMessage)) + } + + fnRun(msgs, out, typ) + + sc++ + if sc == 25 { + fn(typ, " RUN 25 FRAME USE TIME: ", time.Since(tm)) + sc = 0 + tm = time.Now() + } + if time.Since(tm) > time.Second { + fn(typ, " RUN ", sc, " FRAME USE TIME: ", time.Since(tm)) + sc = 0 + tm = time.Now() + } + } + } +} + +// FlowSimple wrap +func FlowSimple(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string, + fnProduce func(interface{}), fnConsume func() []interface{}, + fnRun func(protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), + fnClose func(), fn func(...interface{})) { + + cb := func(msgs []protomsg.SdkMessage, ch chan<- sdkstruct.MsgSDK, typ string) { + fnRun(msgs[0], ch, typ) + } + + FlowBatch(ctx, in, out, typ, fnProduce, fnConsume, cb, fnClose, fn) + +} + +// FlowBatch batch +func FlowBatch(ctx context.Context, in <-chan []byte, out chan<- sdkstruct.MsgSDK, typ string, + fnProduce func(interface{}), fnConsume func() []interface{}, + fnRun func([]protomsg.SdkMessage, chan<- sdkstruct.MsgSDK, string), + fnClose func(), fn func(...interface{})) { + + chMsg := make(chan protomsg.SdkMessage, 3) + go UnserilizeProto(ctx, in, chMsg, fn) + + go WorkFlowSimple(ctx, out, typ, fnConsume, fnRun, fn) + + for { + select { + case <-ctx.Done(): + fnClose() + return + case rMsg := <-chMsg: + if !ValidRemoteMessage(rMsg, typ, fn) { + fn(typ, " validremotemessage invalid") + EjectResult(nil, rMsg, out) + continue + } + fnProduce(rMsg) + + default: + time.Sleep(10 * time.Millisecond) + } + } + +} diff --git a/common/helper.go b/common/helper.go new file mode 100644 index 0000000..0e016c3 --- /dev/null +++ b/common/helper.go @@ -0,0 +1,174 @@ +package common + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "strconv" + "time" + + "basic.com/libgowrapper/sdkstruct.git" + "basic.com/pubsub/protomsg.git" + "basic.com/valib/deliver.git" + "github.com/gogo/protobuf/proto" +) + +const mode = deliver.PushPull + +// GetIpcAddress get ipc +func GetIpcAddress(shm bool, id string) string { + if shm { + return id + } + return `ipc:///tmp/` + id + `.ipc` +} + +// SubConfig sub +type SubConfig struct { + SoFile string `json:"so_file_path"` + Env string `json:"runtime"` + Param map[string]string `json:"param"` +} + +// SdkConfig sdk +type SdkConfig struct { + SoFile string `json:"so_file_path"` + Env string `json:"runtime"` + Param map[string]string `json:"param"` + Sub *SubConfig `json:"sub"` +} + +// ReadConfig conf +func ReadConfig(file string) (SdkConfig, error) { + data, err := ioutil.ReadFile(file) + if err != nil { + return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file) + } + + //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮� + var v SdkConfig + err = json.Unmarshal(data, &v) + + return v, err +} + +// Atoi atoi +func Atoi(s string) int { + i, _ := strconv.Atoi(s) + return i +} + +// UnserilizeProto un +func UnserilizeProto(ctx context.Context, data <-chan []byte, out chan<- protomsg.SdkMessage, fn func(...interface{})) { + for { + select { + case <-ctx.Done(): + return + case d := <-data: + if len(d) < 100 { + continue + } + msg := protomsg.SdkMessage{} + if err := proto.Unmarshal(d, &msg); err != nil { + fn(err, " msg 澶勭悊寮傚父") + continue + } + + out <- msg + + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +// Msg2MsgSDK msg->msgsdk +func Msg2MsgSDK(msg protomsg.SdkMessage) *sdkstruct.MsgSDK { + + d, err := proto.Marshal(&msg) + if err != nil { + return nil + } + + index, count := int(msg.Tasklab.Index), len(msg.Tasklab.Sdkinfos) + if index >= count { + return &sdkstruct.MsgSDK{ + MsgData: d, + SdkCount: count, + SdkIndex: index, + SdkDataLen: 0, + } + } + + return &sdkstruct.MsgSDK{ + MsgData: d, + SdkCount: count, + SdkIndex: index, + SdkDataLen: len(d), + } +} + +// EjectResult eject +func EjectResult(res []byte, msg protomsg.SdkMessage, out chan<- sdkstruct.MsgSDK) { + + if res == nil { + s := Msg2MsgSDK(msg) + if s == nil { + return + } + out <- *s + return + } + + msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = res + + s := Msg2MsgSDK(msg) + if s == nil { + return + } + out <- *s +} + +///////////////////////////////////////////////////////////// + +// ValidRemoteMessage valid or not +func ValidRemoteMessage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) bool { + if msg.Tasklab == nil { + fn(fnName, " recieve msg nil") + return false + } + + sdkLen := len(msg.Tasklab.Sdkinfos) + if sdkLen == 0 { + fn(fnName, " has no sdk info") + return false + } + + curIndex := int(msg.Tasklab.Index) + if curIndex < 0 || curIndex >= sdkLen { + fn(fnName, " tasklab index ", curIndex, " error") + return false + } + if msg.Tasklab.Sdkinfos[curIndex].Sdktype != fnName { + fn(fnName, " is different from ", msg.Tasklab.Sdkinfos[curIndex].Sdktype) + return false + } + return true +} + +// UnpackImage unpack +func UnpackImage(msg protomsg.SdkMessage, fnName string, fn func(...interface{})) *protomsg.Image { + // 鍙嶅簭鍒楀寲鏁版嵁寰楀埌sdk鍏ュ弬 + i := &protomsg.Image{} + err := proto.Unmarshal(msg.Data, i) + if err != nil { + fn(fnName, " protobuf decode CameraImage error: ", err.Error()) + return nil + } + if i.Data == nil { + fn(fnName, " protomsg.Image data null") + return nil + } + return i +} diff --git a/common/lockList.go b/common/lockList.go new file mode 100644 index 0000000..f630a6a --- /dev/null +++ b/common/lockList.go @@ -0,0 +1,89 @@ +package common + +import ( + "container/list" + "sync" +) + +// LockList list +type LockList struct { + cache *list.List + cv *sync.Cond + cond bool + size int +} + +// NewLockList new +func NewLockList(size int) *LockList { + return &LockList{ + cache: list.New(), + cv: sync.NewCond(&sync.Mutex{}), + cond: false, + size: size, + } +} + +// Push push +func (l *LockList) Push(v interface{}) { + l.cv.L.Lock() + l.cache.PushBack(v) + + for l.cache.Len() > l.size { + l.cache.Remove(l.cache.Front()) + } + + l.cond = true + l.cv.Signal() + l.cv.L.Unlock() +} + +// Pop pop +func (l *LockList) Pop() []interface{} { + + var batch []interface{} + + l.cv.L.Lock() + + for !l.cond { + l.cv.Wait() + } + + elem := l.cache.Front() + if elem != nil { + batch = append(batch, elem.Value) + l.cache.Remove(l.cache.Front()) + } + + l.cond = false + l.cv.L.Unlock() + + return batch +} + +// Drain drain all element +func (l *LockList) Drain() []interface{} { + + var batch []interface{} + + l.cv.L.Lock() + + for !l.cond { + l.cv.Wait() + } + + for { + + elem := l.cache.Front() + if elem == nil { + break + } + + batch = append(batch, elem.Value) + l.cache.Remove(l.cache.Front()) + } + + l.cond = false + l.cv.L.Unlock() + + return batch +} diff --git a/common/recv.go b/common/recv.go new file mode 100644 index 0000000..fb31433 --- /dev/null +++ b/common/recv.go @@ -0,0 +1,116 @@ +package common + +import ( + "context" + + "time" + + "basic.com/valib/deliver.git" +) + +// Reciever recv from ipc +type Reciever struct { + ctx context.Context + ipcURL string + out chan<- []byte + + shm bool + fnLogger func(...interface{}) +} + +// NewReciever new recv +func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { + return &Reciever{ + ipcURL: url, + out: out, + shm: shm, + fnLogger: fn, + } +} + +// Run run a IPC client +func (r *Reciever) Run(ctx context.Context) { + + if r.shm { + r.runShm(ctx) + } else { + r.run(ctx, deliver.NewClient(mode, r.ipcURL)) + } +} + +func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { + + // t := time.Now() + // sc := 0 + + count := 0 + + for { + select { + case <-ctx.Done(): + i.Close() + return + default: + + if r.shm { + if d, err := i.Recv(); err != nil { + i.Close() + r.fnLogger("ANALYSIS RECV ERROR: ", err) + + c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) + for { + if err == nil { + break + } + time.Sleep(time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) + r.fnLogger("ANALYSIS CREATE FAILED : ", err) + } + i = c + r.fnLogger("ANALYSIS CREATE SHM") + } else { + if d != nil { + count++ + if count > 10 { + count = 0 + r.fnLogger("~~~shm recv image:", len(d)) + } + r.out <- d + } + } + } else { + if msg, err := i.Recv(); err != nil { + // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) + } else { + count++ + if count > 10 { + count = 0 + r.fnLogger("~~~mangos recv image:", len(msg)) + } + r.out <- msg + } + } + + // sc++ + // if sc == 25 { + // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) + // sc = 0 + // t = time.Now() + // } + + } + } +} + +func (r *Reciever) runShm(ctx context.Context) { + c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL) + r.fnLogger("CLIENT CREATE FAILED : ", err) + } + r.run(ctx, c) +} diff --git a/common/send.go b/common/send.go new file mode 100644 index 0000000..a0e7cc4 --- /dev/null +++ b/common/send.go @@ -0,0 +1,141 @@ +package common + +import ( + "context" + "time" + + "basic.com/libgowrapper/sdkstruct.git" + "basic.com/valib/deliver.git" +) + +// Sender decoder ingo +type Sender struct { + ipcURL string + chMsg <-chan sdkstruct.MsgSDK + shm bool + fn func([]byte, bool) + + fnLogger func(...interface{}) +} + +// ApplyCallbackFunc cb +func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) { + + if s.fn == nil { + s.fn = f + } +} + +// NewSender Sender +func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender { + // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) + return &Sender{ + ipcURL: ipcURL, + chMsg: chMsg, + shm: shm, + fn: nil, + fnLogger: fn, + } +} + +// Run run a IPC producer +func (s *Sender) Run(ctx context.Context) { + + if s.shm { + s.runShm(ctx) + } else { + i := deliver.NewClient(mode, s.ipcURL) + if i == nil { + s.fnLogger("sender 2 pubsub nng create error") + return + } + s.run(ctx, i) + } +} + +func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) { + + for { + select { + case <-ctx.Done(): + s.fnLogger("stop Sender") + return + case i := <-s.chMsg: + + data <- i.MsgData + + if int(i.SdkIndex+1) == i.SdkCount { + if s.fn != nil { + + sFlag := true + if i.SdkDataLen < 2 { + sFlag = false + } + s.fn(i.MsgData, sFlag) + + } + } + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +func (s *Sender) run(ctx context.Context, i deliver.Deliver) { + + // go ruleserver.TimeTicker() + + dataChan := make(chan []byte, 3) + go s.serializeProto(ctx, dataChan) + + for { + select { + case <-ctx.Done(): + i.Close() + return + case d := <-dataChan: + + if s.shm { + if err := i.Send(d); err != nil { + i.Close() + s.fnLogger("ANALYSIS SENDER ERROR: ", err) + + c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) + for { + if err == nil { + break + } + time.Sleep(time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) + s.fnLogger("CLIENT CREATE FAILED : ", err) + } + i = c + } else { + + } + } else { + err := i.Send(d) + if err != nil { + // logo.Errorln("error sender 2 pubsub: ", err) + } else { + s.fnLogger("mangos send to pubsub len: ", len(d)) + } + } + default: + time.Sleep(10 * time.Millisecond) + } + } +} + +func (s *Sender) runShm(ctx context.Context) { + c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) + for { + if err == nil { + break + } + time.Sleep(1 * time.Second) + c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) + s.fnLogger("CLIENT CREATE FAILED : ", err) + } + s.run(ctx, c) +} diff --git a/common/torule.go b/common/torule.go new file mode 100644 index 0000000..c8be086 --- /dev/null +++ b/common/torule.go @@ -0,0 +1,128 @@ +package common + +import ( + "container/list" + "context" + "sync" + "time" + + "basic.com/valib/deliver.git" + // "basic.com/pubsub/protomsg.git" + // "github.com/gogo/protobuf/proto" +) + +type runResult struct { + data []byte + valid bool +} + +// ToRule ipc +type ToRule struct { + ipcURL string + maxSize int + cache *list.List + cv *sync.Cond + cond bool + fnLogger func(...interface{}) +} + +// NewToRule send to ruleprocess +func NewToRule(ipcURL string, maxSize int, fn func(...interface{})) *ToRule { + return &ToRule{ + ipcURL: ipcURL, + maxSize: maxSize, + cache: list.New(), + cv: sync.NewCond(&sync.Mutex{}), + cond: false, + fnLogger: fn, + } +} + +// Push data +func (t *ToRule) Push(data []byte, valid bool) { + + t.cv.L.Lock() + result := runResult{data, valid} + t.cache.PushBack(result) + if t.cache.Len() > t.maxSize { + for i := 0; i < t.cache.Len(); { + d := t.cache.Front().Value.(runResult) + if d.valid == false { + t.cache.Remove(t.cache.Front()) + i = i + 2 + } else { + i = i + 1 + } + } + } + if t.cache.Len() > t.maxSize { + for i := 0; i < t.cache.Len(); { + t.cache.Remove(t.cache.Front()) + i = i + 2 + } + } + // logo.Infof("push to cache count : %d\n", t.cache.Len()) + t.cond = true + t.cv.Signal() + t.cv.L.Unlock() +} + +// Run forever +func (t *ToRule) Run(ctx context.Context) { + + var i deliver.Deliver + var err error + + for { + i, err = deliver.NewClientWithError(deliver.PushPull, t.ipcURL) + if err != nil { + time.Sleep(time.Second) + t.fnLogger("wait create to rule ipc", err) + continue + } + break + } + + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + + var d []byte + t.cv.L.Lock() + + for !t.cond { + t.cv.Wait() + } + + for j := 0; j < 8; j++ { + if t.cache.Len() <= 0 { + break + } + + d = t.cache.Front().Value.(runResult).data + if i != nil && d != nil { + + err := i.Send(d) + if err != nil { + t.fnLogger("!!!!!!!!!!!!!!!!!!!!!!!!!!!", err) + } else { + count++ + if count > 5 { + count = 0 + t.fnLogger("~~~~~~SEND TO RULE CORRECT") + } + } + } + t.cache.Remove(t.cache.Front()) + } + + t.cond = false + t.cv.L.Unlock() + + } + } +} diff --git a/go.mod b/go.mod index 5a3e3ff..ee5ce65 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,7 @@ go 1.12 require ( - basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b - basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c // indirect + basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 basic.com/valib/deliver.git v0.0.0-20190927081905-2d390df9ede3 basic.com/valib/godraw.git v0.0.0-20191122082247-26e9987cd183 diff --git a/go.sum b/go.sum index 5217141..99a54be 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b h1:CWneZPtnrpSv1SDYtB/D+kna3njXNWLExUfQ4xM3Lxc= -basic.com/libgowrapper/sdkhelper.git v0.0.0-20200114085924-c24f06e4cc8b/go.mod h1:eBHanxa92Srb5c/OmupgcAZmLC3Et5HMp9JsebHAIC4= basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c h1:maWYZw9iSQUuN1jbDxgi9IvKrCD97tiTCv8PkLArZ/I= basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c/go.mod h1:bNdkzVVGY+oQEcaYN9VlyIK/03WB3NQNQApjiPJjIag= basic.com/pubsub/protomsg.git v0.0.0-20200108123410-063a805b2573 h1:0CkBMLT0OrdC6PIWnpkHxPBMPGayyjIN45NeEOeL+ZA= diff --git a/run.go b/run.go index 31d5a28..fd70be9 100644 --- a/run.go +++ b/run.go @@ -10,7 +10,8 @@ "time" "unsafe" - "basic.com/libgowrapper/sdkhelper.git" + "reid/common" + "basic.com/valib/gogpu.git" "basic.com/pubsub/protomsg.git" @@ -35,7 +36,7 @@ // Create Reid func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]interface{}) interface{} { - cfg, err := sdkhelper.ReadConfig(config) + cfg, err := common.ReadConfig(config) if err != nil { fn("Reid SDK Create Error When Read Config: ", err) return nil @@ -55,7 +56,7 @@ } } - gpuM := sdkhelper.Atoi(cfg.Param[sGPU]) + gpuM := common.Atoi(cfg.Param[sGPU]) rGPU := gpu @@ -105,7 +106,7 @@ if p >= 0 { file := s.ipc[p+len(string(suf)):] os.Remove(file) - s.fnLogger("remove:", file) + s.fnLogger("remove:", file) } s.fnLogger("can't listen on rep socket: ", err) time.Sleep(5 * time.Millisecond) -- Gitblit v1.8.0