From b2500a8eb6665ce6efe0a7d954b6f101af83d7ec Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期三, 25 三月 2020 11:31:41 +0800 Subject: [PATCH] debug --- rpc/send.go | 186 ++++++++++++++++++++++++---------------------- 1 files changed, 98 insertions(+), 88 deletions(-) diff --git a/rpc/send.go b/rpc/send.go index c0d4b5c..d79c0b9 100644 --- a/rpc/send.go +++ b/rpc/send.go @@ -1,112 +1,122 @@ package rpc import ( - "context" - "time" + "context" + "time" - "basic.com/valib/deliver.git" + "basic.com/valib/deliver.git" ) // Sender decoder ingo type Sender struct { - ipcURL string - in <-chan []byte - shm bool + ipcURL string + in <-chan []byte + chCap int - fnLogger func(...interface{}) + shm bool + fnLogger func(...interface{}) } // NewSender Sender func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { - return &Sender{ - ipcURL: ipcURL, - in: in, - shm: shm, - fnLogger: fn, - } + return &Sender{ + ipcURL: ipcURL, + in: in, + chCap: cap(in), + + shm: shm, + fnLogger: fn, + } } // Run run a IPC producer func (s *Sender) Run(ctx context.Context) { - if s.shm { - s.runShm(ctx) - } else { - i := deliver.NewServer(deliver.PushPull, s.ipcURL) - if i == nil { - s.fnLogger("sender 2 pubsub nng create error") - return - } - s.run(ctx, i) - } + i := s.createIPC(ctx, 50*time.Millisecond, 200) + + for { + select { + case <-ctx.Done(): + i.Close() + return + case d := <-s.in: + + if len(s.in) > s.chCap/2 { + for i := 0; i < s.chCap/2; i++ { + <-s.in + } + } + if len(s.in) > 0 { + d = <-s.in + } + + if s.shm { + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL) + i = s.createIPC(ctx, 50*time.Millisecond, 10) + } + if i == nil { + s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL) + continue + } + + t := time.Now() + + if err := i.Send(d); err != nil { + i.Close() + s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) + + i = s.createIPC(ctx, 50*time.Millisecond, 20) + s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL) + } else { + s.fnLogger("~~~~~~ shm send to reid len: ", len(d)) + } + s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t)) + + } else { + err := i.Send(d) + if err != nil { + // logo.Errorln("error sender 2 pubsub: ", err) + } else { + s.fnLogger("mangos send to pubsub len: ", len(d)) + } + } + default: + time.Sleep(10 * time.Millisecond) + } + } } -func (s *Sender) run(ctx context.Context, i deliver.Deliver) { +func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { - for { - select { - case <-ctx.Done(): - i.Close() - return - case d := <-s.in: + mode := deliver.PushPull + if s.shm { + mode = deliver.Shm + } - if s.shm { - if err := i.Send(d); err != nil { - i.Close() - s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) + try := 0 - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) - loopS: - for { - select { - case <-ctx.Done(): - return - default: - if err == nil { - break loopS - } - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) - } - - } - - i = c - s.fnLogger("Sender Create Shm:", s.ipcURL) - } else { - - } - } else { - err := i.Send(d) - if err != nil { - // logo.Errorln("error sender 2 pubsub: ", err) - } else { - s.fnLogger("mangos send to pubsub len: ", len(d)) - } - } - default: - time.Sleep(10 * time.Millisecond) - } - } -} - -func (s *Sender) runShm(ctx context.Context) { - c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) -loopSBegin: - for { - select { - case <-ctx.Done(): - return - default: - if err == nil { - break loopSBegin - } - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) - s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) - } - } - - s.run(ctx, c) + c, err := deliver.NewClientWithError(mode, s.ipcURL) +loopR: + for { + select { + case <-ctx.Done(): + return nil + default: + if err == nil { + break loopR + } + if loop > 0 { + try++ + if try > loop { + return nil + } + time.Sleep(wait) + } else { + time.Sleep(time.Second) + } + c, err = deliver.NewClientWithError(mode, s.ipcURL) + } + } + return c } -- Gitblit v1.8.0