package rpc import ( "context" "time" "basic.com/valib/deliver.git" ) // Sender decoder ingo type Sender struct { ipcURL string in <-chan []byte chCap int shm bool fnLogger func(...interface{}) } // NewSender Sender func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { return &Sender{ ipcURL: ipcURL, in: in, chCap: cap(in), shm: shm, fnLogger: fn, } } // Run run a IPC producer func (s *Sender) Run(ctx context.Context) { i := s.createIPC(ctx, 50*time.Millisecond, 200) for { select { case <-ctx.Done(): i.Close() return case d := <-s.in: if len(s.in) > s.chCap/2 { for i := 0; i < s.chCap/2; i++ { <-s.in } } if len(s.in) > 0 { d = <-s.in } if s.shm { if i == nil { s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL) i = s.createIPC(ctx, 50*time.Millisecond, 10) } if i == nil { s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL) continue } t := time.Now() if err := i.Send(d); err != nil { i.Close() s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) i = s.createIPC(ctx, 50*time.Millisecond, 20) s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL) } else { s.fnLogger("~~~~~~ shm send to reid len: ", len(d)) } s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t)) } else { err := i.Send(d) if err != nil { // logo.Errorln("error sender 2 pubsub: ", err) } else { s.fnLogger("mangos send to pubsub len: ", len(d)) } } default: time.Sleep(10 * time.Millisecond) } } } func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { mode := deliver.PushPull if s.shm { mode = deliver.Shm } try := 0 c, err := deliver.NewClientWithError(mode, s.ipcURL) loopR: for { select { case <-ctx.Done(): return nil default: if err == nil { break loopR } if loop > 0 { try++ if try > loop { return nil } time.Sleep(wait) } else { time.Sleep(time.Second) } c, err = deliver.NewClientWithError(mode, s.ipcURL) } } return c }