| | |
| | | package rpc |
| | | |
| | | import ( |
| | | "context" |
| | | "time" |
| | | "context" |
| | | "time" |
| | | |
| | | "basic.com/valib/deliver.git" |
| | | "basic.com/valib/deliver.git" |
| | | ) |
| | | |
| | | // Sender decoder ingo |
| | | type Sender struct { |
| | | ipcURL string |
| | | in <-chan []byte |
| | | shm bool |
| | | ipcURL string |
| | | in <-chan []byte |
| | | shm bool |
| | | |
| | | fnLogger func(...interface{}) |
| | | fnLogger func(...interface{}) |
| | | } |
| | | |
| | | // NewSender Sender |
| | | func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { |
| | | return &Sender{ |
| | | ipcURL: ipcURL, |
| | | in: in, |
| | | shm: shm, |
| | | fnLogger: fn, |
| | | } |
| | | return &Sender{ |
| | | ipcURL: ipcURL, |
| | | in: in, |
| | | shm: shm, |
| | | fnLogger: fn, |
| | | } |
| | | } |
| | | |
| | | // Run run a IPC producer |
| | | func (s *Sender) Run(ctx context.Context) { |
| | | |
| | | if s.shm { |
| | | s.runShm(ctx) |
| | | } else { |
| | | i := deliver.NewServer(deliver.PushPull, s.ipcURL) |
| | | if i == nil { |
| | | s.fnLogger("sender 2 pubsub nng create error") |
| | | return |
| | | } |
| | | s.run(ctx, i) |
| | | } |
| | | if s.shm { |
| | | s.runShm(ctx) |
| | | } else { |
| | | i := deliver.NewServer(deliver.PushPull, s.ipcURL) |
| | | if i == nil { |
| | | s.fnLogger("sender 2 pubsub nng create error") |
| | | return |
| | | } |
| | | s.run(ctx, i) |
| | | } |
| | | } |
| | | |
| | | func (s *Sender) run(ctx context.Context, i deliver.Deliver) { |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | i.Close() |
| | | return |
| | | case d := <-s.in: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | i.Close() |
| | | return |
| | | case d := <-s.in: |
| | | |
| | | if s.shm { |
| | | if err := i.Send(d); err != nil { |
| | | i.Close() |
| | | s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) |
| | | if s.shm { |
| | | if err := i.Send(d); err != nil { |
| | | i.Close() |
| | | s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) |
| | | |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | loopS: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopS |
| | | } |
| | | time.Sleep(time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) |
| | | } |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | loopS: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopS |
| | | } |
| | | time.Sleep(time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) |
| | | } |
| | | |
| | | } |
| | | } |
| | | |
| | | i = c |
| | | s.fnLogger("Sender Create Shm:", s.ipcURL) |
| | | } else { |
| | | i = c |
| | | s.fnLogger("Sender Create Shm:", s.ipcURL) |
| | | } else { |
| | | |
| | | } |
| | | } else { |
| | | err := i.Send(d) |
| | | if err != nil { |
| | | // logo.Errorln("error sender 2 pubsub: ", err) |
| | | } else { |
| | | s.fnLogger("mangos send to pubsub len: ", len(d)) |
| | | } |
| | | } |
| | | default: |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | err := i.Send(d) |
| | | if err != nil { |
| | | // logo.Errorln("error sender 2 pubsub: ", err) |
| | | } else { |
| | | s.fnLogger("mangos send to pubsub len: ", len(d)) |
| | | } |
| | | } |
| | | default: |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (s *Sender) runShm(ctx context.Context) { |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | loopSBegin: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopSBegin |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) |
| | | } |
| | | } |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopSBegin |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) |
| | | } |
| | | } |
| | | |
| | | s.run(ctx, c) |
| | | s.run(ctx, c) |
| | | } |