From b2500a8eb6665ce6efe0a7d954b6f101af83d7ec Mon Sep 17 00:00:00 2001 From: 554325746@qq.com <554325746@qq.com> Date: 星期三, 25 三月 2020 11:31:41 +0800 Subject: [PATCH] debug --- rpc/recv.go | 196 +++++++++++++++++++++++++++---------------------- 1 files changed, 108 insertions(+), 88 deletions(-) diff --git a/rpc/recv.go b/rpc/recv.go index f2bfc60..8106513 100644 --- a/rpc/recv.go +++ b/rpc/recv.go @@ -1,112 +1,132 @@ package rpc import ( - "context" + "context" - "time" + "time" - "basic.com/valib/deliver.git" + "basic.com/valib/deliver.git" ) - -const mode = deliver.PushPull // Reciever recv from ipc type Reciever struct { - ctx context.Context - ipcURL string - out chan<- []byte + ctx context.Context + ipcURL string + out chan []byte + chCap int - shm bool - fnLogger func(...interface{}) + shm bool + fnLogger func(...interface{}) } // NewReciever new recv -func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { - return &Reciever{ - ipcURL: url, - out: out, - shm: shm, - fnLogger: fn, - } +func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever { + return &Reciever{ + ipcURL: url, + out: out, + chCap: cap(out), + + shm: shm, + fnLogger: fn, + } } // Run run a IPC client func (r *Reciever) Run(ctx context.Context) { - if r.shm { - r.runShm(ctx) - } else { - r.run(ctx, deliver.NewClient(mode, r.ipcURL)) - } + count := 0 + + i := r.createIPC(ctx, 50*time.Millisecond, 200) + + for { + select { + case <-ctx.Done(): + i.Close() + return + default: + + if r.shm { + if i == nil { + r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL) + i = r.createIPC(ctx, 50*time.Millisecond, 10) + } + if i == nil { + r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL) + continue + } + if d, err := i.Recv(); err != nil { + + i.Close() + r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err) + + i = r.createIPC(ctx, 50*time.Millisecond, 20) + r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL) + } else { + if d != nil { + if len(r.out) > r.chCap/2 { + for i := 0; i < r.chCap/2; i++ { + <-r.out + } + } + r.out <- d + r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d)) + } + } + + } else { + if msg, err := i.Recv(); err != nil { + // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) + } else { + count++ + if count > 10 { + count = 0 + r.fnLogger("~~~mangos recv image:", len(msg)) + } + if len(msg) > 2 { + if len(r.out) > r.chCap/2 { + for i := 0; i < r.chCap/2; i++ { + <-r.out + } + } + r.out <- msg + } + } + } + time.Sleep(10 * time.Millisecond) + } + } } -func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { +func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { - count := 0 + mode := deliver.PushPull + if r.shm { + mode = deliver.Shm + } - for { - select { - case <-ctx.Done(): - i.Close() - return - default: + try := 0 - if r.shm { - if d, err := i.Recv(); err != nil { - i.Close() - r.fnLogger("Reciever RECV ERROR: ", err) - - c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - r.fnLogger("Reciever CREATE FAILED : ", err) - time.Sleep(time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) - } - i = c - r.fnLogger("Reciever CREATE SHM") - } else { - if d != nil { - count++ - if count > 10 { - count = 0 - r.fnLogger("~~~shm recv image:", len(d)) - } - if len(d) > 2 { - r.out <- d - } - } - } - } else { - if msg, err := i.Recv(); err != nil { - // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) - } else { - count++ - if count > 10 { - count = 0 - r.fnLogger("~~~mangos recv image:", len(msg)) - } - if len(msg) > 2 { - r.out <- msg - } - } - } - time.Sleep(10 * time.Millisecond) - } - } -} - -func (r *Reciever) runShm(ctx context.Context) { - c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) - for { - if err == nil { - break - } - r.fnLogger("Reciever CLIENT CREATE FAILED : ", err) - time.Sleep(1 * time.Second) - c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) - } - r.run(ctx, c) + c, err := deliver.NewClientWithError(mode, r.ipcURL) +loopR: + for { + select { + case <-ctx.Done(): + return nil + default: + if err == nil { + break loopR + } + if loop > 0 { + try++ + if try > loop { + return nil + } + time.Sleep(wait) + } else { + time.Sleep(time.Second) + } + c, err = deliver.NewClientWithError(mode, r.ipcURL) + } + } + return c } -- Gitblit v1.8.0