// Package rpc contains Sender, which forwards byte payloads from a channel to
// an endpoint created through the deliver package.
package rpc

import (
	"context"
	"time"

	"basic.com/valib/deliver.git"
)

// senderShmRetryInterval is the pause between attempts to create the
// shared-memory endpoint.
const senderShmRetryInterval = time.Second

// Sender reads payloads from a channel and sends each one to a deliver
// endpoint identified by ipcURL.
type Sender struct {
	// ipcURL is the address handed to deliver.NewServer / NewServerWithError.
	ipcURL string
	// in is the source of payloads to send.
	in <-chan []byte
	// shm selects the deliver.Shm mode instead of deliver.PushPull.
	shm bool
	// fnLogger receives diagnostic messages; it may be nil.
	fnLogger func(...interface{})
}

// NewSender returns a Sender that forwards payloads received on in to ipcURL.
// When shm is true the endpoint is created in deliver.Shm mode, otherwise in
// deliver.PushPull mode. fn is used for diagnostics; a nil fn disables
// logging.
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
	return &Sender{
		ipcURL:   ipcURL,
		in:       in,
		shm:      shm,
		fnLogger: fn,
	}
}

// Run creates the endpoint and forwards payloads until ctx is cancelled or the
// input channel is closed. In non-shm mode it returns immediately (after
// logging) if the endpoint cannot be created; in shm mode it keeps retrying.
func (s *Sender) Run(ctx context.Context) {
	if s.shm {
		s.runShm(ctx)
		return
	}

	srv := deliver.NewServer(deliver.PushPull, s.ipcURL)
	if srv == nil {
		s.report("sender 2 pubsub nng create error")
		return
	}
	s.run(ctx, srv)
}

// run forwards payloads from s.in to i. It closes i and returns when ctx is
// cancelled or s.in is closed.
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
	for {
		select {
		case <-ctx.Done():
			i.Close()
			return

		case d, ok := <-s.in:
			if !ok {
				// A closed channel would otherwise yield nil payloads forever.
				i.Close()
				return
			}

			err := i.Send(d)

			if !s.shm {
				if err != nil {
					s.report("error sender 2 pubsub: ", err)
				} else {
					s.report("mangos send to pubsub len: ", len(d))
				}
				continue
			}

			if err == nil {
				continue
			}

			// Shm send failed: drop the broken endpoint and build a new one.
			// The payload that failed is not resent.
			i.Close()
			s.report("SENDER To:", s.ipcURL, " ERROR: ", err)

			c, ok := s.dialShm(ctx)
			if !ok {
				// Cancelled while reconnecting; the old endpoint is already closed.
				return
			}
			i = c
			s.report("Sender Create Shm:", s.ipcURL)
		}
	}
}

// runShm creates the shared-memory endpoint, retrying until it succeeds or ctx
// is cancelled, and then forwards payloads through it.
func (s *Sender) runShm(ctx context.Context) {
	c, ok := s.dialShm(ctx)
	if !ok {
		return
	}
	s.run(ctx, c)
}

// dialShm tries to create a deliver.Shm endpoint for s.ipcURL, waiting
// senderShmRetryInterval between attempts. It returns false if ctx is
// cancelled before an attempt succeeds.
func (s *Sender) dialShm(ctx context.Context) (deliver.Deliver, bool) {
	for {
		c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
		if err == nil {
			// Always hand a live endpoint to the caller so that it gets closed.
			return c, true
		}
		s.report("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err)

		t := time.NewTimer(senderShmRetryInterval)
		select {
		case <-ctx.Done():
			t.Stop()
			return nil, false
		case <-t.C:
		}
	}
}

// report passes args to the configured logger, if there is one.
func (s *Sender) report(args ...interface{}) {
	if s.fnLogger != nil {
		s.fnLogger(args...)
	}
}