From 495ffcdad0027be02d5fc82825e08f36b6a53b90 Mon Sep 17 00:00:00 2001
From: 554325746@qq.com <554325746@qq.com>
Date: 星期二, 24 三月 2020 15:11:24 +0800
Subject: [PATCH] 整理代码缩小收发缓存大小节省内存
---
rpc/recv.go | 107 ++++++++++++++------------
rpc/send.go | 102 ++++++++++++++-----------
2 files changed, 114 insertions(+), 95 deletions(-)
diff --git a/rpc/recv.go b/rpc/recv.go
index a6bb22e..b43bac7 100644
--- a/rpc/recv.go
+++ b/rpc/recv.go
@@ -12,17 +12,20 @@
type Reciever struct {
ctx context.Context
ipcURL string
- out chan<- []byte
+ out chan []byte
+ chCap int
shm bool
fnLogger func(...interface{})
}
// NewReciever new recv
-func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever {
+func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
return &Reciever{
- ipcURL: url,
- out: out,
+ ipcURL: url,
+ out: out,
+ chCap: cap(out),
+
shm: shm,
fnLogger: fn,
}
@@ -31,16 +34,9 @@
// Run run a IPC client
func (r *Reciever) Run(ctx context.Context) {
- if r.shm {
- r.runShm(ctx)
- } else {
- r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
- }
-}
-
-func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
-
count := 0
+
+ i := r.createIPC(ctx, 50*time.Millisecond, 200)
for {
select {
@@ -50,39 +46,33 @@
default:
if r.shm {
+ if i == nil {
+ r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
+ i = r.createIPC(ctx, 50*time.Millisecond, 10)
+ }
+ if i == nil {
+ r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
+ continue
+ }
if d, err := i.Recv(); err != nil {
- i.Close()
- r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err)
- c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
- loopR:
- for {
- select {
- case <-ctx.Done():
- return
- default:
- if err == nil {
- break loopR
- }
- time.Sleep(time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
- r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, " FAILED : ", err)
- }
- }
- i = c
- r.fnLogger("Reciever CREATE SHM:", r.ipcURL)
+ i.Close()
+ r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
+
+ i = r.createIPC(ctx, time.Hour, -1)
+ r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
} else {
if d != nil {
- count++
- if count > 10 {
- count = 0
- r.fnLogger("~~~shm recv image:", len(d))
+ if len(r.out) > r.chCap/2 {
+ for i := 0; i < r.chCap/2; i++ {
+ <-r.out
+ }
}
- if len(d) > 2 {
- r.out <- d
- }
+ r.out <- d
+ r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d))
}
}
+
} else {
if msg, err := i.Recv(); err != nil {
// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
@@ -93,6 +83,11 @@
r.fnLogger("~~~mangos recv image:", len(msg))
}
if len(msg) > 2 {
+ if len(r.out) > r.chCap/2 {
+ for i := 0; i < r.chCap/2; i++ {
+ <-r.out
+ }
+ }
r.out <- msg
}
}
@@ -102,22 +97,36 @@
}
}
-func (r *Reciever) runShm(ctx context.Context) {
- c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL)
-loopRBegin:
+func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
+
+ mode := deliver.PushPull
+ if r.shm {
+ mode = deliver.Shm
+ }
+
+ try := 0
+
+ c, err := deliver.NewClientWithError(mode, r.ipcURL)
+loopR:
for {
select {
case <-ctx.Done():
- return
+ return nil
default:
if err == nil {
- break loopRBegin
+ break loopR
}
- time.Sleep(1 * time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL)
- r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err)
+ if loop > 0 {
+ try++
+ if try > loop {
+ return nil
+ }
+ time.Sleep(wait)
+ } else {
+ time.Sleep(time.Second)
+ }
+ c, err = deliver.NewClientWithError(mode, r.ipcURL)
}
}
-
- r.run(ctx, c)
+ return c
}
diff --git a/rpc/send.go b/rpc/send.go
index d9f7b00..3950476 100644
--- a/rpc/send.go
+++ b/rpc/send.go
@@ -11,16 +11,19 @@
type Sender struct {
ipcURL string
in <-chan []byte
- shm bool
+ chCap int
+ shm bool
fnLogger func(...interface{})
}
// NewSender Sender
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
return &Sender{
- ipcURL: ipcURL,
- in: in,
+ ipcURL: ipcURL,
+ in: in,
+ chCap: cap(in),
+
shm: shm,
fnLogger: fn,
}
@@ -29,19 +32,7 @@
// Run run a IPC producer
func (s *Sender) Run(ctx context.Context) {
- if s.shm {
- s.runShm(ctx)
- } else {
- i := deliver.NewServer(deliver.PushPull, s.ipcURL)
- if i == nil {
- s.fnLogger("sender 2 pubsub nng create error")
- return
- }
- s.run(ctx, i)
- }
-}
-
-func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
+ i := s.createIPC(ctx, 50*time.Millisecond, 200)
for {
select {
@@ -50,33 +41,38 @@
return
case d := <-s.in:
+ if len(s.in) > s.chCap/2 {
+ for i := 0; i < s.chCap/2; i++ {
+ <-s.in
+ }
+ }
+ if len(s.in) > 0 {
+ d = <-s.in
+ }
+
if s.shm {
+ if i == nil {
+ s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL)
+ i = s.createIPC(ctx, 50*time.Millisecond, 10)
+ }
+ if i == nil {
+ s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL)
+ continue
+ }
+
+ t := time.Now()
+
if err := i.Send(d); err != nil {
i.Close()
s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err)
- c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
- loopS:
- for {
- select {
- case <-ctx.Done():
- return
- default:
- if err == nil {
- break loopS
- }
- time.Sleep(time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)
- }
-
- }
-
- i = c
- s.fnLogger("Sender Create Shm:", s.ipcURL)
+ i = s.createIPC(ctx, time.Hour, -1)
+ s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL)
} else {
-
+ s.fnLogger("~~~~~~ shm send to reid len: ", len(d))
}
+ s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t))
+
} else {
err := i.Send(d)
if err != nil {
@@ -91,22 +87,36 @@
}
}
-func (s *Sender) runShm(ctx context.Context) {
- c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
-loopSBegin:
+func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
+
+ mode := deliver.PushPull
+ if s.shm {
+ mode = deliver.Shm
+ }
+
+ try := 0
+
+ c, err := deliver.NewClientWithError(mode, s.ipcURL)
+loopR:
for {
select {
case <-ctx.Done():
- return
+ return nil
default:
if err == nil {
- break loopSBegin
+ break loopR
}
- time.Sleep(1 * time.Second)
- c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
- s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err)
+ if loop > 0 {
+ try++
+ if try > loop {
+ return nil
+ }
+ time.Sleep(wait)
+ } else {
+ time.Sleep(time.Second)
+ }
+ c, err = deliver.NewClientWithError(mode, s.ipcURL)
}
}
-
- s.run(ctx, c)
+ return c
}
--
Gitblit v1.8.0