From 495ffcdad0027be02d5fc82825e08f36b6a53b90 Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期二, 24 三月 2020 15:11:24 +0800 Subject: [PATCH] 整理代码缩小收发缓存大小节省内存 --- rpc/recv.go | 107 ++++++++++++++------------ rpc/send.go | 102 ++++++++++++++----------- 2 files changed, 114 insertions(+), 95 deletions(-) diff --git a/rpc/recv.go b/rpc/recv.go index a6bb22e..b43bac7 100644 --- a/rpc/recv.go +++ b/rpc/recv.go @@ -12,17 +12,20 @@ type Reciever struct { ctx context.Context ipcURL string - out chan<- []byte + out chan []byte + chCap int shm bool fnLogger func(...interface{}) } // NewReciever new recv -func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { +func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever { return &Reciever{ - ipcURL: url, - out: out, + ipcURL: url, + out: out, + chCap: cap(out), + shm: shm, fnLogger: fn, } @@ -31,16 +34,9 @@ // Run run a IPC client func (r *Reciever) Run(ctx context.Context) { - if r.shm { - r.runShm(ctx) - } else { - r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL)) - } -} - -func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { - count := 0 + + i := r.createIPC(ctx, 50*time.Millisecond, 200) for { select { @@ -50,39 +46,33 @@ default: if r.shm { + if i == nil { + r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL) + i = r.createIPC(ctx, 50*time.Millisecond, 10) + } + if i == nil { + r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL) + continue + } if d, err := i.Recv(); err != nil { - i.Close() - r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err) - c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) - loopR: - for { - select { - case <-ctx.Done(): - return - default: - if err == nil { - break loopR - } - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) - r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, " FAILED : ", err) - } - } - i = c - r.fnLogger("Reciever CREATE SHM:", r.ipcURL) + i.Close() + r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err) + + i = r.createIPC(ctx, time.Hour, -1) + r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL) } else { if d != nil { - count++ - if count > 10 { - count = 0 - r.fnLogger("~~~shm recv image:", len(d)) + if len(r.out) > r.chCap/2 { + for i := 0; i < r.chCap/2; i++ { + <-r.out + } } - if len(d) > 2 { - r.out <- d - } + r.out <- d + r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d)) } } + } else { if msg, err := i.Recv(); err != nil { // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) @@ -93,6 +83,11 @@ r.fnLogger("~~~mangos recv image:", len(msg)) } if len(msg) > 2 { + if len(r.out) > r.chCap/2 { + for i := 0; i < r.chCap/2; i++ { + <-r.out + } + } r.out <- msg } } @@ -102,22 +97,36 @@ } } -func (r *Reciever) runShm(ctx context.Context) { - c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) -loopRBegin: +func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { + + mode := deliver.PushPull + if r.shm { + mode = deliver.Shm + } + + try := 0 + + c, err := deliver.NewClientWithError(mode, r.ipcURL) +loopR: for { select { case <-ctx.Done(): - return + return nil default: if err == nil { - break loopRBegin + break loopR } - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) - r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err) + if loop > 0 { + try++ + if try > loop { + return nil + } + time.Sleep(wait) + } else { + time.Sleep(time.Second) + } + c, err = deliver.NewClientWithError(mode, r.ipcURL) } } - - r.run(ctx, c) + return c } diff --git a/rpc/send.go b/rpc/send.go index d9f7b00..3950476 100644 --- a/rpc/send.go +++ b/rpc/send.go @@ -11,16 +11,19 @@ type Sender struct { ipcURL string in <-chan []byte - shm bool + chCap int + shm bool fnLogger func(...interface{}) } // NewSender Sender func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { return &Sender{ - ipcURL: ipcURL, - in: in, + ipcURL: ipcURL, + in: in, + chCap: cap(in), + shm: shm, fnLogger: fn, } @@ -29,19 +32,7 @@ // Run run a IPC producer func (s *Sender) Run(ctx context.Context) { - if s.shm { - s.runShm(ctx) - } else { - i := deliver.NewServer(deliver.PushPull, s.ipcURL) - if i == nil { - s.fnLogger("sender 2 pubsub nng create error") - return - } - s.run(ctx, i) - } -} - -func (s *Sender) run(ctx context.Context, i deliver.Deliver) { + i := s.createIPC(ctx, 50*time.Millisecond, 200) for { select { @@ -50,33 +41,38 @@ return case d := <-s.in: + if len(s.in) > s.chCap/2 { + for i := 0; i < s.chCap/2; i++ { + <-s.in + } + } + if len(s.in) > 0 { + d = <-s.in + } + if s.shm { + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL) + i = s.createIPC(ctx, 50*time.Millisecond, 10) + } + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL) + continue + } + + t := time.Now() + if err := i.Send(d); err != nil { i.Close() s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) - loopS: - for { - select { - case <-ctx.Done(): - return - default: - if err == nil { - break loopS - } - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) - } - - } - - i = c - s.fnLogger("Sender Create Shm:", s.ipcURL) + i = s.createIPC(ctx, time.Hour, -1) + s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL) } else { - + s.fnLogger("~~~~~~ shm send to reid len: ", len(d)) } + s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t)) + } else { err := i.Send(d) if err != nil { @@ -91,22 +87,36 @@ } } -func (s *Sender) runShm(ctx context.Context) { - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) -loopSBegin: +func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { + + mode := deliver.PushPull + if s.shm { + mode = deliver.Shm + } + + try := 0 + + c, err := deliver.NewClientWithError(mode, s.ipcURL) +loopR: for { select { case <-ctx.Done(): - return + return nil default: if err == nil { - break loopSBegin + break loopR } - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) + if loop > 0 { + try++ + if try > loop { + return nil + } + time.Sleep(wait) + } else { + time.Sleep(time.Second) + } + c, err = deliver.NewClientWithError(mode, s.ipcURL) } } - - s.run(ctx, c) + return c } -- Gitblit v1.8.0