From b2500a8eb6665ce6efe0a7d954b6f101af83d7ec Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期三, 25 三月 2020 11:31:41 +0800
Subject: [PATCH] debug

---
 rpc/send.go |  186 ++++++++++++++++++++++++----------------------
 1 files changed, 98 insertions(+), 88 deletions(-)

diff --git a/rpc/send.go b/rpc/send.go
index c0d4b5c..d79c0b9 100644
--- a/rpc/send.go
+++ b/rpc/send.go
@@ -1,112 +1,122 @@
 package rpc
 
 import (
-	"context"
-	"time"
+    "context"
+    "time"
 
-	"basic.com/valib/deliver.git"
+    "basic.com/valib/deliver.git"
 )
 
 // Sender decoder ingo
 type Sender struct {
-	ipcURL string
-	in     <-chan []byte
-	shm    bool
+    ipcURL string
+    in     <-chan []byte
+    chCap  int
 
-	fnLogger func(...interface{})
+    shm      bool
+    fnLogger func(...interface{})
 }
 
 // NewSender Sender
 func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
-	return &Sender{
-		ipcURL:   ipcURL,
-		in:       in,
-		shm:      shm,
-		fnLogger: fn,
-	}
+    return &Sender{
+        ipcURL: ipcURL,
+        in:     in,
+        chCap:  cap(in),
+
+        shm:      shm,
+        fnLogger: fn,
+    }
 }
 
 // Run run a IPC producer
 func (s *Sender) Run(ctx context.Context) {
 
-	if s.shm {
-		s.runShm(ctx)
-	} else {
-		i := deliver.NewServer(deliver.PushPull, s.ipcURL)
-		if i == nil {
-			s.fnLogger("sender 2 pubsub nng create error")
-			return
-		}
-		s.run(ctx, i)
-	}
+    i := s.createIPC(ctx, 50*time.Millisecond, 200)
+
+    for {
+        select {
+        case <-ctx.Done():
+            i.Close()
+            return
+        case d := <-s.in:
+
+            if len(s.in) > s.chCap/2 {
+                for i := 0; i < s.chCap/2; i++ {
+                    <-s.in
+                }
+            }
+            if len(s.in) > 0 {
+                d = <-s.in
+            }
+
+            if s.shm {
+                if i == nil {
+                    s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL)
+                    i = s.createIPC(ctx, 50*time.Millisecond, 10)
+                }
+                if i == nil {
+                    s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL)
+                    continue
+                }
+
+                t := time.Now()
+
+                if err := i.Send(d); err != nil {
+                    i.Close()
+                    s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
+
+                    i = s.createIPC(ctx, 50*time.Millisecond, 20)
+                    s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL)
+                } else {
+                    s.fnLogger("~~~~~~ shm send to reid len: ", len(d))
+                }
+                s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t))
+
+            } else {
+                err := i.Send(d)
+                if err != nil {
+                    // logo.Errorln("error sender 2 pubsub: ", err)
+                } else {
+                    s.fnLogger("mangos send to pubsub len: ", len(d))
+                }
+            }
+        default:
+            time.Sleep(10 * time.Millisecond)
+        }
+    }
 }
 
-func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
 
-	for {
-		select {
-		case <-ctx.Done():
-			i.Close()
-			return
-		case d := <-s.in:
+    mode := deliver.PushPull
+    if s.shm {
+        mode = deliver.Shm
+    }
 
-			if s.shm {
-				if err := i.Send(d); err != nil {
-					i.Close()
-					s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
+    try := 0
 
-					c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-				loopS:
-					for {
-						select {
-						case <-ctx.Done():
-							return
-						default:
-							if err == nil {
-								break loopS
-							}
-							time.Sleep(time.Second)
-							c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-							s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
-						}
-
-					}
-
-					i = c
-					s.fnLogger("Sender Create Shm:", s.ipcURL)
-				} else {
-
-				}
-			} else {
-				err := i.Send(d)
-				if err != nil {
-					// logo.Errorln("error sender 2 pubsub: ", err)
-				} else {
-					s.fnLogger("mangos send to pubsub len: ", len(d))
-				}
-			}
-		default:
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-func (s *Sender) runShm(ctx context.Context) {
-	c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-loopSBegin:
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			if err == nil {
-				break loopSBegin
-			}
-			time.Sleep(1 * time.Second)
-			c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-			s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
-		}
-	}
-
-	s.run(ctx, c)
+    c, err := deliver.NewClientWithError(mode, s.ipcURL)
+loopR:
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        default:
+            if err == nil {
+                break loopR
+            }
+            if loop > 0 {
+                try++
+                if try > loop {
+                    return nil
+                }
+                time.Sleep(wait)
+            } else {
+                time.Sleep(time.Second)
+            }
+            c, err = deliver.NewClientWithError(mode, s.ipcURL)
+        }
+    }
+    return c
 }

--
Gitblit v1.8.0