From 495ffcdad0027be02d5fc82825e08f36b6a53b90 Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期二, 24 三月 2020 15:11:24 +0800
Subject: [PATCH] 整理代码缩小收发缓存大小节省内存

---
 rpc/recv.go |  107 +++++++++++++++++++++++++++++------------------------
 1 files changed, 58 insertions(+), 49 deletions(-)

diff --git a/rpc/recv.go b/rpc/recv.go
index a6bb22e..b43bac7 100644
--- a/rpc/recv.go
+++ b/rpc/recv.go
@@ -12,17 +12,20 @@
 type Reciever struct {
     ctx    context.Context
     ipcURL string
-    out    chan<- []byte
+    out    chan []byte
+    chCap  int
 
     shm      bool
     fnLogger func(...interface{})
 }
 
 // NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
     return &Reciever{
-        ipcURL:   url,
-        out:      out,
+        ipcURL: url,
+        out:    out,
+        chCap:  cap(out),
+
         shm:      shm,
         fnLogger: fn,
     }
@@ -31,16 +34,9 @@
 // Run run a IPC client
 func (r *Reciever) Run(ctx context.Context) {
 
-    if r.shm {
-        r.runShm(ctx)
-    } else {
-        r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
-    }
-}
-
-func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
-
     count := 0
+
+    i := r.createIPC(ctx, 50*time.Millisecond, 200)
 
     for {
         select {
@@ -50,39 +46,33 @@
         default:
 
             if r.shm {
+                if i == nil {
+                    r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
+                    i = r.createIPC(ctx, 50*time.Millisecond, 10)
+                }
+                if i == nil {
+                    r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
+                    continue
+                }
                 if d, err := i.Recv(); err != nil {
-                    i.Close()
-                    r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
 
-                    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-                loopR:
-                    for {
-                        select {
-                        case <-ctx.Done():
-                            return
-                        default:
-                            if err == nil {
-                                break loopR
-                            }
-                            time.Sleep(time.Second)
-                            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-                            r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
-                        }
-                    }
-                    i = c
-                    r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
+                    i.Close()
+                    r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
+
+                    i = r.createIPC(ctx, time.Hour, -1)
+                    r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
                 } else {
                     if d != nil {
-                        count++
-                        if count > 10 {
-                            count = 0
-                            r.fnLogger("~~~shm recv image:", len(d))
+                        if len(r.out) > r.chCap/2 {
+                            for i := 0; i < r.chCap/2; i++ {
+                                <-r.out
+                            }
                         }
-                        if len(d) > 2 {
-                            r.out <- d
-                        }
+                        r.out <- d
+                        r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d))
                     }
                 }
+
             } else {
                 if msg, err := i.Recv(); err != nil {
                     // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
@@ -93,6 +83,11 @@
                         r.fnLogger("~~~mangos recv image:", len(msg))
                     }
                     if len(msg) > 2 {
+                        if len(r.out) > r.chCap/2 {
+                            for i := 0; i < r.chCap/2; i++ {
+                                <-r.out
+                            }
+                        }
                         r.out <- msg
                     }
                 }
@@ -102,22 +97,36 @@
     }
 }
 
-func (r *Reciever) runShm(ctx context.Context) {
-    c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-loopRBegin:
+func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
+
+    mode := deliver.PushPull
+    if r.shm {
+        mode = deliver.Shm
+    }
+
+    try := 0
+
+    c, err := deliver.NewClientWithError(mode, r.ipcURL)
+loopR:
     for {
         select {
         case <-ctx.Done():
-            return
+            return nil
         default:
             if err == nil {
-                break loopRBegin
+                break loopR
             }
-            time.Sleep(1 * time.Second)
-            c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-            r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
+            if loop > 0 {
+                try++
+                if try > loop {
+                    return nil
+                }
+                time.Sleep(wait)
+            } else {
+                time.Sleep(time.Second)
+            }
+            c, err = deliver.NewClientWithError(mode, r.ipcURL)
         }
     }
-
-    r.run(ctx, c)
+    return c
 }

--
Gitblit v1.8.0