From d9b98e947f7b5aae886ff02ec27bd92aa34feaaa Mon Sep 17 00:00:00 2001 From: zhangmeng <zhangmeng@aiotlink.com> Date: 星期一, 24 二月 2020 18:06:43 +0800 Subject: [PATCH] debug send/recv and add logs --- run.go | 6 +- rpc/recv.go | 41 +++++++++++++------- rpc/send.go | 42 ++++++++++++++------- 3 files changed, 57 insertions(+), 32 deletions(-) diff --git a/rpc/recv.go b/rpc/recv.go index f2bfc60..aced578 100644 --- a/rpc/recv.go +++ b/rpc/recv.go @@ -8,8 +8,6 @@ "basic.com/valib/deliver.git" ) -const mode = deliver.PushPull - // Reciever recv from ipc type Reciever struct { ctx context.Context @@ -36,7 +34,7 @@ if r.shm { r.runShm(ctx) } else { - r.run(ctx, deliver.NewClient(mode, r.ipcURL)) + r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL)) } } @@ -54,19 +52,25 @@ if r.shm { if d, err := i.Recv(); err != nil { i.Close() - r.fnLogger("Reciever RECV ERROR: ", err) + r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err) c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) + loopR: for { - if err == nil { - break + select { + case <-ctx.Done(): + return + default: + if err == nil { + break loopR + } + time.Sleep(time.Second) + c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) + r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, " FAILED : ", err) } - r.fnLogger("Reciever CREATE FAILED : ", err) - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) } i = c - r.fnLogger("Reciever CREATE SHM") + r.fnLogger("Reciever CREATE SHM:", r.ipcURL) } else { if d != nil { count++ @@ -100,13 +104,20 @@ func (r *Reciever) runShm(ctx context.Context) { c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) +loopRBegin: for { - if err == nil { - break + select { + case <-ctx.Done(): + return + default: + if err == nil { + break loopRBegin + } + time.Sleep(1 * time.Second) + c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) + r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err) } - r.fnLogger("Reciever CLIENT CREATE FAILED : ", err) - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) } + r.run(ctx, c) } diff --git a/rpc/send.go b/rpc/send.go index 7111a43..c0d4b5c 100644 --- a/rpc/send.go +++ b/rpc/send.go @@ -12,7 +12,6 @@ ipcURL string in <-chan []byte shm bool - fn func([]byte, bool) fnLogger func(...interface{}) } @@ -23,7 +22,6 @@ ipcURL: ipcURL, in: in, shm: shm, - fn: nil, fnLogger: fn, } } @@ -34,7 +32,7 @@ if s.shm { s.runShm(ctx) } else { - i := deliver.NewClient(mode, s.ipcURL) + i := deliver.NewServer(deliver.PushPull, s.ipcURL) if i == nil { s.fnLogger("sender 2 pubsub nng create error") return @@ -55,18 +53,27 @@ if s.shm { if err := i.Send(d); err != nil { i.Close() - s.fnLogger("ANALYSIS SENDER ERROR: ", err) + s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) + loopS: for { - if err == nil { - break + select { + case <-ctx.Done(): + return + default: + if err == nil { + break loopS + } + time.Sleep(time.Second) + c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) + s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) } - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("CLIENT CREATE FAILED : ", err) + } + i = c + s.fnLogger("Sender Create Shm:", s.ipcURL) } else { } @@ -86,13 +93,20 @@ func (s *Sender) runShm(ctx context.Context) { c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) +loopSBegin: for { - if err == nil { - break + select { + case <-ctx.Done(): + return + default: + if err == nil { + break loopSBegin + } + time.Sleep(1 * time.Second) + c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) + s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) } - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("CLIENT CREATE FAILED : ", err) } + s.run(ctx, c) } diff --git a/run.go b/run.go index cef375d..9e4ee02 100644 --- a/run.go +++ b/run.go @@ -154,7 +154,7 @@ } sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)] - s.fnLogger("reid !!!!!! Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata)) + s.fnLogger("reid~~~~~~Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata)) res := &protomsg.HumanTrackResult{} if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil { @@ -181,7 +181,7 @@ if len(res.Result) > 0 { if out, err := proto.Marshal(res); err == nil { msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out - s.fnLogger("reid !!!!!! Send To Humantrack Result Length:", len(out)) + s.fnLogger("reid~~~~~~Send To Humantrack Result Length:", len(out)) } } @@ -190,7 +190,7 @@ s.fnLogger("reid !!!!!! proto.Marshal Failed To Marshal proto.SdkMessage") continue } - // s.fnLogger("reid !!!!!! MSG Send Back To Humantrack Length:", len(data)) + s.fnLogger("reid~~~~~~MSG Send Back To Humantrack Length:", len(data)) chSnd <- data } else { -- Gitblit v1.8.0