From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 一月 2020 09:23:17 +0800
Subject: [PATCH] copy from bgr-2-analysis

---
 common/recv.go |   46 ++++++++++++++++++++++++++++++++--------------
 1 files changed, 32 insertions(+), 14 deletions(-)

diff --git a/common/recv.go b/common/recv.go
index fb31433..21d9b5e 100644
--- a/common/recv.go
+++ b/common/recv.go
@@ -5,26 +5,50 @@
 
 	"time"
 
+	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/deliver.git"
+	"github.com/gogo/protobuf/proto"
 )
 
 // Reciever recv from ipc
 type Reciever struct {
 	ctx    context.Context
 	ipcURL string
-	out    chan<- []byte
+	chMsg  chan<- MsgRS
 
 	shm      bool
 	fnLogger func(...interface{})
 }
 
 // NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+func NewReciever(url string, chMsg chan<- MsgRS, shm bool, fn func(...interface{})) *Reciever {
 	return &Reciever{
 		ipcURL:   url,
-		out:      out,
+		chMsg:    chMsg,
 		shm:      shm,
 		fnLogger: fn,
+	}
+}
+
+func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			d := <-data
+			if len(d) < 100 {
+				continue
+			}
+			// logo.Infoln(len(d), "reciver鏁版嵁")
+			msg := protomsg.SdkMessage{}
+			if err := proto.Unmarshal(d, &msg); err != nil {
+				r.fnLogger(err, " msg 澶勭悊寮傚父")
+				continue
+			}
+			outMsg := MsgRS{Msg: msg}
+			r.chMsg <- outMsg
+		}
 	}
 }
 
@@ -40,8 +64,9 @@
 
 func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
 
-	// t := time.Now()
-	// sc := 0
+	dataChan := make(chan []byte, 3)
+
+	go r.unserilizeProto(ctx, dataChan)
 
 	count := 0
 
@@ -75,7 +100,7 @@
 							count = 0
 							r.fnLogger("~~~shm recv image:", len(d))
 						}
-						r.out <- d
+						dataChan <- d
 					}
 				}
 			} else {
@@ -87,16 +112,9 @@
 						count = 0
 						r.fnLogger("~~~mangos recv image:", len(msg))
 					}
-					r.out <- msg
+					dataChan <- msg
 				}
 			}
-
-			// sc++
-			// if sc == 25 {
-			// 	logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
-			// 	sc = 0
-			// 	t = time.Now()
-			// }
 
 		}
 	}

--
Gitblit v1.8.0