From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 15 一月 2020 09:23:17 +0800 Subject: [PATCH] copy from bgr-2-analysis --- common/recv.go | 46 ++++++++++++++++++++++++++++++++-------------- 1 files changed, 32 insertions(+), 14 deletions(-) diff --git a/common/recv.go b/common/recv.go index fb31433..21d9b5e 100644 --- a/common/recv.go +++ b/common/recv.go @@ -5,26 +5,50 @@ "time" + "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" + "github.com/gogo/protobuf/proto" ) // Reciever recv from ipc type Reciever struct { ctx context.Context ipcURL string - out chan<- []byte + chMsg chan<- MsgRS shm bool fnLogger func(...interface{}) } // NewReciever new recv -func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { +func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever { return &Reciever{ ipcURL: url, - out: out, + chMsg: chMsg, shm: shm, fnLogger: fn, + } +} + +func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) { + for { + select { + case <-ctx.Done(): + return + default: + d := <-data + if len(d) < 100 { + continue + } + // logo.Infoln(len(d), "reciver鏁版嵁") + msg := protomsg.SdkMessage{} + if err := proto.Unmarshal(d, &msg); err != nil { + r.fnLogger(err, " msg 澶勭悊寮傚父") + continue + } + outMsg := MsgRS{Msg: msg} + r.chMsg <- outMsg + } } } @@ -40,8 +64,9 @@ func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { - // t := time.Now() - // sc := 0 + dataChan := make(chan []byte, 3) + + go r.unserilizeProto(ctx, dataChan) count := 0 @@ -75,7 +100,7 @@ count = 0 r.fnLogger("~~~shm recv image:", len(d)) } - r.out <- d + dataChan <- d } } } else { @@ -87,16 +112,9 @@ count = 0 r.fnLogger("~~~mangos recv image:", len(msg)) } - r.out <- msg + dataChan <- msg } } - - // sc++ - // if sc == 25 { - // logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t)) - // sc = 0 - // t = time.Now() - // } } } -- Gitblit v1.8.0