package rpc import ( "context" "time" "basic.com/valib/deliver.git" ) const mode = deliver.PushPull // Reciever recv from ipc type Reciever struct { ctx context.Context ipcURL string out chan<- []byte shm bool fnLogger func(...interface{}) } // NewReciever new recv func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { return &Reciever{ ipcURL: url, out: out, shm: shm, fnLogger: fn, } } // Run run a IPC client func (r *Reciever) Run(ctx context.Context) { if r.shm { r.runShm(ctx) } else { r.run(ctx, deliver.NewClient(mode, r.ipcURL)) } } func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { count := 0 for { select { case <-ctx.Done(): i.Close() return default: if r.shm { if d, err := i.Recv(); err != nil { i.Close() r.fnLogger("Reciever RECV ERROR: ", err) c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) for { if err == nil { break } r.fnLogger("Reciever CREATE FAILED : ", err) time.Sleep(time.Second) c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) } i = c r.fnLogger("Reciever CREATE SHM") } else { if d != nil { count++ if count > 10 { count = 0 r.fnLogger("~~~shm recv image:", len(d)) } if len(d) > 2 { r.out <- d } } } } else { if msg, err := i.Recv(); err != nil { // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) } else { count++ if count > 10 { count = 0 r.fnLogger("~~~mangos recv image:", len(msg)) } if len(d) > 2 { r.out <- msg } } } time.Sleep(10 * time.Millisecond) } } } func (r *Reciever) runShm(ctx context.Context) { c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) for { if err == nil { break } r.fnLogger("Reciever CLIENT CREATE FAILED : ", err) time.Sleep(1 * time.Second) c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) } r.run(ctx, c) }