From 495ffcdad0027be02d5fc82825e08f36b6a53b90 Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期二, 24 三月 2020 15:11:24 +0800
Subject: [PATCH] 整理代码缩小收发缓存大小节省内存

---
 rpc/send.go |  102 ++++++++++++++++++++++++++++-----------------------
 1 files changed, 56 insertions(+), 46 deletions(-)

diff --git a/rpc/send.go b/rpc/send.go
index d9f7b00..3950476 100644
--- a/rpc/send.go
+++ b/rpc/send.go
@@ -11,16 +11,19 @@
 type Sender struct {
     ipcURL string
     in     <-chan []byte
-    shm    bool
+    chCap  int
 
+    shm      bool
     fnLogger func(...interface{})
 }
 
 // NewSender Sender
 func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
     return &Sender{
-        ipcURL:   ipcURL,
-        in:       in,
+        ipcURL: ipcURL,
+        in:     in,
+        chCap:  cap(in),
+
         shm:      shm,
         fnLogger: fn,
     }
@@ -29,19 +32,7 @@
 // Run run a IPC producer
 func (s *Sender) Run(ctx context.Context) {
 
-    if s.shm {
-        s.runShm(ctx)
-    } else {
-        i := deliver.NewServer(deliver.PushPull, s.ipcURL)
-        if i == nil {
-            s.fnLogger("sender 2 pubsub nng create error")
-            return
-        }
-        s.run(ctx, i)
-    }
-}
-
-func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+    i := s.createIPC(ctx, 50*time.Millisecond, 200)
 
     for {
         select {
@@ -50,33 +41,38 @@
             return
         case d := <-s.in:
 
+            if len(s.in) > s.chCap/2 {
+                for i := 0; i < s.chCap/2; i++ {
+                    <-s.in
+                }
+            }
+            if len(s.in) > 0 {
+                d = <-s.in
+            }
+
             if s.shm {
+                if i == nil {
+                    s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL)
+                    i = s.createIPC(ctx, 50*time.Millisecond, 10)
+                }
+                if i == nil {
+                    s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL)
+                    continue
+                }
+
+                t := time.Now()
+
                 if err := i.Send(d); err != nil {
                     i.Close()
                     s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
 
-                    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-                loopS:
-                    for {
-                        select {
-                        case <-ctx.Done():
-                            return
-                        default:
-                            if err == nil {
-                                break loopS
-                            }
-                            time.Sleep(time.Second)
-                            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-                            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
-                        }
-
-                    }
-
-                    i = c
-                    s.fnLogger("Sender Create Shm:", s.ipcURL)
+                    i = s.createIPC(ctx, time.Hour, -1)
+                    s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL)
                 } else {
-
+                    s.fnLogger("~~~~~~ shm send to reid len: ", len(d))
                 }
+                s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t))
+
             } else {
                 err := i.Send(d)
                 if err != nil {
@@ -91,22 +87,36 @@
     }
 }
 
-func (s *Sender) runShm(ctx context.Context) {
-    c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-loopSBegin:
+func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
+
+    mode := deliver.PushPull
+    if s.shm {
+        mode = deliver.Shm
+    }
+
+    try := 0
+
+    c, err := deliver.NewClientWithError(mode, s.ipcURL)
+loopR:
     for {
         select {
         case <-ctx.Done():
-            return
+            return nil
         default:
             if err == nil {
-                break loopSBegin
+                break loopR
             }
-            time.Sleep(1 * time.Second)
-            c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-            s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
+            if loop > 0 {
+                try++
+                if try > loop {
+                    return nil
+                }
+                time.Sleep(wait)
+            } else {
+                time.Sleep(time.Second)
+            }
+            c, err = deliver.NewClientWithError(mode, s.ipcURL)
         }
     }
-
-    s.run(ctx, c)
+    return c
 }

--
Gitblit v1.8.0