From d9b98e947f7b5aae886ff02ec27bd92aa34feaaa Mon Sep 17 00:00:00 2001
From: zhangmeng <zhangmeng@aiotlink.com>
Date: 星期一, 24 二月 2020 18:06:43 +0800
Subject: [PATCH] debug send/recv and add logs
---
run.go | 6 +-
rpc/recv.go | 41 +++++++++++++-------
rpc/send.go | 42 ++++++++++++++-------
3 files changed, 57 insertions(+), 32 deletions(-)
diff --git a/rpc/recv.go b/rpc/recv.go
index f2bfc60..aced578 100644
--- a/rpc/recv.go
+++ b/rpc/recv.go
@@ -8,8 +8,6 @@
"basic.com/valib/deliver.git"
)
-const mode = deliver.PushPull
-
// Reciever recv from ipc
type Reciever struct {
ctx context.Context
@@ -36,7 +34,7 @@
if r.shm {
r.runShm(ctx)
} else {
- r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+ r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
}
}
@@ -54,19 +52,25 @@
if r.shm {
if d, err := i.Recv(); err != nil {
i.Close()
- r.fnLogger("Reciever RECV ERROR: ", err)
+ r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+ loopR:
for {
- if err == nil {
- break
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err == nil {
+ break loopR
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, " FAILED : ", err)
}
- r.fnLogger("Reciever CREATE FAILED : ", err)
- time.Sleep(time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
}
i = c
- r.fnLogger("Reciever CREATE SHM")
+ r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
} else {
if d != nil {
count++
@@ -100,13 +104,20 @@
func (r *Reciever) runShm(ctx context.Context) {
c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+loopRBegin:
for {
- if err == nil {
- break
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err == nil {
+ break loopRBegin
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+ r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
}
- r.fnLogger("Reciever CLIENT CREATE FAILED : ", err)
- time.Sleep(1 * time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
}
+
r.run(ctx, c)
}
diff --git a/rpc/send.go b/rpc/send.go
index 7111a43..c0d4b5c 100644
--- a/rpc/send.go
+++ b/rpc/send.go
@@ -12,7 +12,6 @@
ipcURL string
in <-chan []byte
shm bool
- fn func([]byte, bool)
fnLogger func(...interface{})
}
@@ -23,7 +22,6 @@
ipcURL: ipcURL,
in: in,
shm: shm,
- fn: nil,
fnLogger: fn,
}
}
@@ -34,7 +32,7 @@
if s.shm {
s.runShm(ctx)
} else {
- i := deliver.NewClient(mode, s.ipcURL)
+ i := deliver.NewServer(deliver.PushPull, s.ipcURL)
if i == nil {
s.fnLogger("sender 2 pubsub nng create error")
return
@@ -55,18 +53,27 @@
if s.shm {
if err := i.Send(d); err != nil {
i.Close()
- s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+ s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+ loopS:
for {
- if err == nil {
- break
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err == nil {
+ break loopS
+ }
+ time.Sleep(time.Second)
+ c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
}
- time.Sleep(time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("CLIENT CREATE FAILED : ", err)
+
}
+
i = c
+ s.fnLogger("Sender Create Shm:", s.ipcURL)
} else {
}
@@ -86,13 +93,20 @@
func (s *Sender) runShm(ctx context.Context) {
c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+loopSBegin:
for {
- if err == nil {
- break
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err == nil {
+ break loopSBegin
+ }
+ time.Sleep(1 * time.Second)
+ c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+ s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
}
- time.Sleep(1 * time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("CLIENT CREATE FAILED : ", err)
}
+
s.run(ctx, c)
}
diff --git a/run.go b/run.go
index cef375d..9e4ee02 100644
--- a/run.go
+++ b/run.go
@@ -154,7 +154,7 @@
}
sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)]
- s.fnLogger("reid !!!!!! Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata))
+ s.fnLogger("reid~~~~~~Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata))
res := &protomsg.HumanTrackResult{}
if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil {
@@ -181,7 +181,7 @@
if len(res.Result) > 0 {
if out, err := proto.Marshal(res); err == nil {
msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out
- s.fnLogger("reid !!!!!! Send To Humantrack Result Length:", len(out))
+ s.fnLogger("reid~~~~~~Send To Humantrack Result Length:", len(out))
}
}
@@ -190,7 +190,7 @@
s.fnLogger("reid !!!!!! proto.Marshal Failed To Marshal proto.SdkMessage")
continue
}
- // s.fnLogger("reid !!!!!! MSG Send Back To Humantrack Length:", len(data))
+ s.fnLogger("reid~~~~~~MSG Send Back To Humantrack Length:", len(data))
chSnd <- data
} else {
--
Gitblit v1.8.0