From 495ffcdad0027be02d5fc82825e08f36b6a53b90 Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期二, 24 三月 2020 15:11:24 +0800 Subject: [PATCH] 整理代码缩小收发缓存大小节省内存 --- rpc/send.go | 102 ++++++++++++++++++++++++++++----------------------- 1 files changed, 56 insertions(+), 46 deletions(-) diff --git a/rpc/send.go b/rpc/send.go index d9f7b00..3950476 100644 --- a/rpc/send.go +++ b/rpc/send.go @@ -11,16 +11,19 @@ type Sender struct { ipcURL string in <-chan []byte - shm bool + chCap int + shm bool fnLogger func(...interface{}) } // NewSender Sender func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { return &Sender{ - ipcURL: ipcURL, - in: in, + ipcURL: ipcURL, + in: in, + chCap: cap(in), + shm: shm, fnLogger: fn, } @@ -29,19 +32,7 @@ // Run run a IPC producer func (s *Sender) Run(ctx context.Context) { - if s.shm { - s.runShm(ctx) - } else { - i := deliver.NewServer(deliver.PushPull, s.ipcURL) - if i == nil { - s.fnLogger("sender 2 pubsub nng create error") - return - } - s.run(ctx, i) - } -} - -func (s *Sender) run(ctx context.Context, i deliver.Deliver) { + i := s.createIPC(ctx, 50*time.Millisecond, 200) for { select { @@ -50,33 +41,38 @@ return case d := <-s.in: + if len(s.in) > s.chCap/2 { + for i := 0; i < s.chCap/2; i++ { + <-s.in + } + } + if len(s.in) > 0 { + d = <-s.in + } + if s.shm { + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL) + i = s.createIPC(ctx, 50*time.Millisecond, 10) + } + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL) + continue + } + + t := time.Now() + if err := i.Send(d); err != nil { i.Close() s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) - loopS: - for { - select { - case <-ctx.Done(): - return - default: - if err == nil { - break loopS - } - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) - } - - } - - i = c - s.fnLogger("Sender Create Shm:", s.ipcURL) + i = s.createIPC(ctx, time.Hour, -1) + s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL) } else { - + s.fnLogger("~~~~~~ shm send to reid len: ", len(d)) } + s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t)) + } else { err := i.Send(d) if err != nil { @@ -91,22 +87,36 @@ } } -func (s *Sender) runShm(ctx context.Context) { - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) -loopSBegin: +func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { + + mode := deliver.PushPull + if s.shm { + mode = deliver.Shm + } + + try := 0 + + c, err := deliver.NewClientWithError(mode, s.ipcURL) +loopR: for { select { case <-ctx.Done(): - return + return nil default: if err == nil { - break loopSBegin + break loopR } - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) + if loop > 0 { + try++ + if try > loop { + return nil + } + time.Sleep(wait) + } else { + time.Sleep(time.Second) + } + c, err = deliver.NewClientWithError(mode, s.ipcURL) } } - - s.run(ctx, c) + return c } -- Gitblit v1.8.0