package rpc

import (
	"context"
	"time"

	"basic.com/valib/deliver.git"
)

// Reciever receives messages from an IPC endpoint and forwards them to an
// output channel. The exported name keeps its historical spelling so existing
// callers continue to compile.
type Reciever struct {
	// ctx is neither set nor read in this file; Run and createIPC use the
	// context passed to them. It is kept for compatibility with any other
	// code in the package that may reference it.
	ctx context.Context
	// ipcURL is the endpoint address handed to deliver.NewClientWithError.
	ipcURL string
	// out is the channel received payloads are forwarded to.
	out chan []byte
	// chCap caches cap(out) as observed at construction time.
	chCap int
	// shm selects deliver.Shm when true and deliver.PushPull otherwise.
	shm bool
	// fnLogger is the logging callback; NewReciever guarantees it is non-nil.
	fnLogger func(...interface{})
}

// NewReciever builds a Reciever that reads from url and forwards payloads to
// out. shm selects the shared-memory mode instead of push/pull. fn is used for
// logging; a nil fn is replaced by a no-op so that logging never panics.
func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
	if fn == nil {
		fn = func(...interface{}) {}
	}
	return &Reciever{
		ipcURL:   url,
		out:      out,
		chCap:    cap(out),
		shm:      shm,
		fnLogger: fn,
	}
}

// Run connects to the IPC endpoint and forwards every received payload to the
// output channel until ctx is cancelled. It blocks for its whole lifetime, so
// callers normally start it in its own goroutine.
//
// When the connection cannot be established, or (in shm mode) a receive fails,
// Run keeps retrying. The current connection, if any, is closed on return.
func (r *Reciever) Run(ctx context.Context) {
	const (
		retryWait = 50 * time.Millisecond
		pollWait  = 10 * time.Millisecond
	)

	count := 0
	c := r.createIPC(ctx, retryWait, 200)
	// createIPC may return nil, so guard the Close; the closure reads c at
	// return time and therefore sees any reconnect performed below.
	defer func() {
		if c != nil {
			c.Close()
		}
	}()

	for {
		if ctx.Err() != nil {
			return
		}

		// Reconnect in both modes: calling Recv on a missing connection
		// would panic.
		if c == nil {
			r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
			if c = r.createIPC(ctx, retryWait, 10); c == nil {
				r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
				continue
			}
		}

		msg, err := c.Recv()
		switch {
		case r.shm && err != nil:
			// A failed receive in shm mode drops the connection and
			// builds a new one.
			c.Close()
			r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
			c = r.createIPC(ctx, retryWait, 20)
			r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
		case r.shm:
			if msg != nil {
				if !r.push(ctx, msg) {
					return
				}
				r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(msg))
			}
		case err != nil:
			// Push/pull receive errors were never reported by this loop;
			// stay quiet and simply poll again.
		default:
			// Only log every 11th message to keep the output readable.
			count++
			if count > 10 {
				count = 0
				r.fnLogger("~~~mangos recv image:", len(msg))
			}
			// Payloads of two bytes or fewer are not forwarded.
			if len(msg) > 2 {
				if !r.push(ctx, msg) {
					return
				}
			}
		}

		if !r.waitOrDone(ctx, pollWait) {
			return
		}
	}
}

// push forwards msg to the output channel. When the channel is more than half
// full it first discards up to chCap/2 of the oldest queued payloads. It
// returns false if ctx was cancelled before msg could be delivered.
func (r *Reciever) push(ctx context.Context, msg []byte) bool {
	if len(r.out) > r.chCap/2 {
		// Non-blocking receives: a consumer may empty the channel while we
		// are discarding, and a plain receive would then block forever.
	drain:
		for n := 0; n < r.chCap/2; n++ {
			select {
			case <-r.out:
			default:
				break drain
			}
		}
	}

	select {
	case r.out <- msg:
		return true
	case <-ctx.Done():
		return false
	}
}

// waitOrDone pauses for d and reports true, or reports false as soon as ctx is
// cancelled.
func (r *Reciever) waitOrDone(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()

	select {
	case <-t.C:
		return true
	case <-ctx.Done():
		return false
	}
}

// createIPC creates a client for r.ipcURL, retrying on failure.
//
// With loop > 0 it makes one initial attempt plus at most loop retries, pausing
// wait between attempts. With loop <= 0 it retries without limit, pausing one
// second between attempts. It returns nil when the attempts are exhausted or
// ctx is cancelled.
func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
	mode := deliver.PushPull
	if r.shm {
		mode = deliver.Shm
	}

	pause := wait
	if loop <= 0 {
		pause = time.Second
	}

	try := 0
	for {
		c, err := deliver.NewClientWithError(mode, r.ipcURL)
		if err == nil {
			// Callers treat cancellation as "no connection", so release a
			// client that arrived too late instead of dropping it unclosed.
			if ctx.Err() != nil {
				c.Close()
				return nil
			}
			return c
		}

		if ctx.Err() != nil {
			return nil
		}
		if loop > 0 {
			try++
			if try > loop {
				return nil
			}
		}
		if !r.waitOrDone(ctx, pause) {
			return nil
		}
	}
}