From b2500a8eb6665ce6efe0a7d954b6f101af83d7ec Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期三, 25 三月 2020 11:31:41 +0800
Subject: [PATCH] debug

---
 rpc/recv.go |  207 +++++++++++++++++++++++++++------------------------
 1 files changed, 108 insertions(+), 99 deletions(-)

diff --git a/rpc/recv.go b/rpc/recv.go
index aced578..8106513 100644
--- a/rpc/recv.go
+++ b/rpc/recv.go
@@ -1,123 +1,132 @@
 package rpc
 
 import (
-	"context"
+    "context"
 
-	"time"
+    "time"
 
-	"basic.com/valib/deliver.git"
+    "basic.com/valib/deliver.git"
 )
 
 // Reciever recv from ipc
 type Reciever struct {
-	ctx    context.Context
-	ipcURL string
-	out    chan<- []byte
+    ctx    context.Context
+    ipcURL string
+    out    chan []byte
+    chCap  int
 
-	shm      bool
-	fnLogger func(...interface{})
+    shm      bool
+    fnLogger func(...interface{})
 }
 
 // NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
-	return &Reciever{
-		ipcURL:   url,
-		out:      out,
-		shm:      shm,
-		fnLogger: fn,
-	}
+func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
+    return &Reciever{
+        ipcURL: url,
+        out:    out,
+        chCap:  cap(out),
+
+        shm:      shm,
+        fnLogger: fn,
+    }
 }
 
 // Run run a IPC client
 func (r *Reciever) Run(ctx context.Context) {
 
-	if r.shm {
-		r.runShm(ctx)
-	} else {
-		r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
-	}
+    count := 0
+
+    i := r.createIPC(ctx, 50*time.Millisecond, 200)
+
+    for {
+        select {
+        case <-ctx.Done():
+            i.Close()
+            return
+        default:
+
+            if r.shm {
+                if i == nil {
+                    r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
+                    i = r.createIPC(ctx, 50*time.Millisecond, 10)
+                }
+                if i == nil {
+                    r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
+                    continue
+                }
+                if d, err := i.Recv(); err != nil {
+
+                    i.Close()
+                    r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
+
+                    i = r.createIPC(ctx, 50*time.Millisecond, 20)
+                    r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
+                } else {
+                    if d != nil {
+                        if len(r.out) > r.chCap/2 {
+                            for i := 0; i < r.chCap/2; i++ {
+                                <-r.out
+                            }
+                        }
+                        r.out <- d
+                        r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d))
+                    }
+                }
+
+            } else {
+                if msg, err := i.Recv(); err != nil {
+                    // logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
+                } else {
+                    count++
+                    if count > 10 {
+                        count = 0
+                        r.fnLogger("~~~mangos recv image:", len(msg))
+                    }
+                    if len(msg) > 2 {
+                        if len(r.out) > r.chCap/2 {
+                            for i := 0; i < r.chCap/2; i++ {
+                                <-r.out
+                            }
+                        }
+                        r.out <- msg
+                    }
+                }
+            }
+            time.Sleep(10 * time.Millisecond)
+        }
+    }
 }
 
-func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
+func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
 
-	count := 0
+    mode := deliver.PushPull
+    if r.shm {
+        mode = deliver.Shm
+    }
 
-	for {
-		select {
-		case <-ctx.Done():
-			i.Close()
-			return
-		default:
+    try := 0
 
-			if r.shm {
-				if d, err := i.Recv(); err != nil {
-					i.Close()
-					r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
-
-					c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-				loopR:
-					for {
-						select {
-						case <-ctx.Done():
-							return
-						default:
-							if err == nil {
-								break loopR
-							}
-							time.Sleep(time.Second)
-							c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-							r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, "  FAILED : ", err)
-						}
-					}
-					i = c
-					r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
-				} else {
-					if d != nil {
-						count++
-						if count > 10 {
-							count = 0
-							r.fnLogger("~~~shm recv image:", len(d))
-						}
-						if len(d) > 2 {
-							r.out <- d
-						}
-					}
-				}
-			} else {
-				if msg, err := i.Recv(); err != nil {
-					// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
-				} else {
-					count++
-					if count > 10 {
-						count = 0
-						r.fnLogger("~~~mangos recv image:", len(msg))
-					}
-					if len(msg) > 2 {
-						r.out <- msg
-					}
-				}
-			}
-			time.Sleep(10 * time.Millisecond)
-		}
-	}
-}
-
-func (r *Reciever) runShm(ctx context.Context) {
-	c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-loopRBegin:
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-			if err == nil {
-				break loopRBegin
-			}
-			time.Sleep(1 * time.Second)
-			c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-			r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
-		}
-	}
-
-	r.run(ctx, c)
+    c, err := deliver.NewClientWithError(mode, r.ipcURL)
+loopR:
+    for {
+        select {
+        case <-ctx.Done():
+            return nil
+        default:
+            if err == nil {
+                break loopR
+            }
+            if loop > 0 {
+                try++
+                if try > loop {
+                    return nil
+                }
+                time.Sleep(wait)
+            } else {
+                time.Sleep(time.Second)
+            }
+            c, err = deliver.NewClientWithError(mode, r.ipcURL)
+        }
+    }
+    return c
 }

--
Gitblit v1.8.0