From d9b98e947f7b5aae886ff02ec27bd92aa34feaaa Mon Sep 17 00:00:00 2001
From: zhangmeng <zhangmeng@aiotlink.com>
Date: 星期一, 24 二月 2020 18:06:43 +0800
Subject: [PATCH] debug send/recv and add logs

---
 run.go      |    6 +-
 rpc/recv.go |   41 +++++++++++++-------
 rpc/send.go |   42 ++++++++++++++-------
 3 files changed, 57 insertions(+), 32 deletions(-)

diff --git a/rpc/recv.go b/rpc/recv.go
index f2bfc60..aced578 100644
--- a/rpc/recv.go
+++ b/rpc/recv.go
@@ -8,8 +8,6 @@
 	"basic.com/valib/deliver.git"
 )
 
-const mode = deliver.PushPull
-
 // Reciever recv from ipc
 type Reciever struct {
 	ctx    context.Context
@@ -36,7 +34,7 @@
 	if r.shm {
 		r.runShm(ctx)
 	} else {
-		r.run(ctx, deliver.NewClient(mode, r.ipcURL))
+		r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
 	}
 }
 
@@ -54,19 +52,25 @@
 			if r.shm {
 				if d, err := i.Recv(); err != nil {
 					i.Close()
-					r.fnLogger("Reciever RECV ERROR: ", err)
+					r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
 
 					c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+				loopR:
 					for {
-						if err == nil {
-							break
+						select {
+						case <-ctx.Done():
+							return
+						default:
+							if err == nil {
+								break loopR
+							}
+							time.Sleep(time.Second)
+							c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+							r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
 						}
-						r.fnLogger("Reciever CREATE FAILED : ", err)
-						time.Sleep(time.Second)
-						c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
 					}
 					i = c
-					r.fnLogger("Reciever CREATE SHM")
+					r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
 				} else {
 					if d != nil {
 						count++
@@ -100,13 +104,20 @@
 
 func (r *Reciever) runShm(ctx context.Context) {
 	c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+loopRBegin:
 	for {
-		if err == nil {
-			break
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err == nil {
+				break loopRBegin
+			}
+			time.Sleep(1 * time.Second)
+			c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
+			r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
 		}
-		r.fnLogger("Reciever CLIENT CREATE FAILED : ", err)
-		time.Sleep(1 * time.Second)
-		c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
 	}
+
 	r.run(ctx, c)
 }
diff --git a/rpc/send.go b/rpc/send.go
index 7111a43..c0d4b5c 100644
--- a/rpc/send.go
+++ b/rpc/send.go
@@ -12,7 +12,6 @@
 	ipcURL string
 	in     <-chan []byte
 	shm    bool
-	fn     func([]byte, bool)
 
 	fnLogger func(...interface{})
 }
@@ -23,7 +22,6 @@
 		ipcURL:   ipcURL,
 		in:       in,
 		shm:      shm,
-		fn:       nil,
 		fnLogger: fn,
 	}
 }
@@ -34,7 +32,7 @@
 	if s.shm {
 		s.runShm(ctx)
 	} else {
-		i := deliver.NewClient(mode, s.ipcURL)
+		i := deliver.NewServer(deliver.PushPull, s.ipcURL)
 		if i == nil {
 			s.fnLogger("sender 2 pubsub nng create error")
 			return
@@ -55,18 +53,27 @@
 			if s.shm {
 				if err := i.Send(d); err != nil {
 					i.Close()
-					s.fnLogger("ANALYSIS SENDER ERROR: ", err)
+					s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
 
 					c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+				loopS:
 					for {
-						if err == nil {
-							break
+						select {
+						case <-ctx.Done():
+							return
+						default:
+							if err == nil {
+								break loopS
+							}
+							time.Sleep(time.Second)
+							c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+							s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
 						}
-						time.Sleep(time.Second)
-						c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-						s.fnLogger("CLIENT CREATE FAILED : ", err)
+
 					}
+
 					i = c
+					s.fnLogger("Sender Create Shm:", s.ipcURL)
 				} else {
 
 				}
@@ -86,13 +93,20 @@
 
 func (s *Sender) runShm(ctx context.Context) {
 	c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+loopSBegin:
 	for {
-		if err == nil {
-			break
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err == nil {
+				break loopSBegin
+			}
+			time.Sleep(1 * time.Second)
+			c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
+			s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
 		}
-		time.Sleep(1 * time.Second)
-		c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-		s.fnLogger("CLIENT CREATE FAILED : ", err)
 	}
+
 	s.run(ctx, c)
 }
diff --git a/run.go b/run.go
index cef375d..9e4ee02 100644
--- a/run.go
+++ b/run.go
@@ -154,7 +154,7 @@
 			}
 
 			sdkInfo := msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)]
-			s.fnLogger("reid !!!!!! Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata))
+			s.fnLogger("reid~~~~~~Recv From Humantrack SDK Result Length: ", len(sdkInfo.Sdkdata))
 
 			res := &protomsg.HumanTrackResult{}
 			if err := proto.Unmarshal(sdkInfo.Sdkdata, res); err != nil {
@@ -181,7 +181,7 @@
 			if len(res.Result) > 0 {
 				if out, err := proto.Marshal(res); err == nil {
 					msg.Tasklab.Sdkinfos[int(msg.Tasklab.Index)].Sdkdata = out
-					s.fnLogger("reid !!!!!! Send To Humantrack Result Length:", len(out))
+					s.fnLogger("reid~~~~~~Send To Humantrack Result Length:", len(out))
 				}
 			}
 
@@ -190,7 +190,7 @@
 					s.fnLogger("reid !!!!!! proto.Marshal Failed To Marshal proto.SdkMessage")
 					continue
 				}
-				// s.fnLogger("reid !!!!!! MSG Send Back To Humantrack Length:", len(data))
+				s.fnLogger("reid~~~~~~MSG Send Back To Humantrack Length:", len(data))
 
 				chSnd <- data
 			} else {

--
Gitblit v1.8.0