From d85f3edab0d8c495cecd7a81f31a9ead1eb001ac Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 15 一月 2020 09:23:17 +0800 Subject: [PATCH] copy from bgr-2-analysis --- common/send.go | 31 +++++++++++++++++++------------ 1 files changed, 19 insertions(+), 12 deletions(-) diff --git a/common/send.go b/common/send.go index a0e7cc4..4b0a0e6 100644 --- a/common/send.go +++ b/common/send.go @@ -4,14 +4,14 @@ "context" "time" - "basic.com/libgowrapper/sdkstruct.git" "basic.com/valib/deliver.git" + "github.com/gogo/protobuf/proto" ) // Sender decoder ingo type Sender struct { ipcURL string - chMsg <-chan sdkstruct.MsgSDK + chMsg <-chan MsgRS shm bool fn func([]byte, bool) @@ -27,7 +27,7 @@ } // NewSender Sender -func NewSender(ipcURL string, chMsg <-chan sdkstruct.MsgSDK, shm bool, fn func(...interface{})) *Sender { +func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool, fn func(...interface{})) *Sender { // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) return &Sender{ ipcURL: ipcURL, @@ -62,19 +62,28 @@ return case i := <-s.chMsg: - data <- i.MsgData + d, err := proto.Marshal(&i.Msg) - if int(i.SdkIndex+1) == i.SdkCount { + if err != nil { + s.fnLogger("protobuf encode ipc sender error: ", err) + continue + } + + data <- d + + if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) { if s.fn != nil { - sFlag := true - if i.SdkDataLen < 2 { - sFlag = false + for _, v := range i.Msg.Tasklab.Sdkinfos { + if len(v.Sdkdata) < 2 { + sFlag = false + break + } } - s.fn(i.MsgData, sFlag) - + s.fn(d, sFlag) } } + default: time.Sleep(10 * time.Millisecond) } @@ -82,8 +91,6 @@ } func (s *Sender) run(ctx context.Context, i deliver.Deliver) { - - // go ruleserver.TimeTicker() dataChan := make(chan []byte, 3) go s.serializeProto(ctx, dataChan) -- Gitblit v1.8.0