| | |
| | | type Reciever struct { |
| | | ctx context.Context |
| | | ipcURL string |
| | | out chan<- []byte |
| | | out chan []byte |
| | | chCap int |
| | | |
| | | shm bool |
| | | fnLogger func(...interface{}) |
| | | } |
| | | |
| | | // NewReciever new recv |
| | | func NewReciever(url string, out chan<- []byte, shm bool, fn func(...interface{})) *Reciever { |
| | | func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever { |
| | | return &Reciever{ |
| | | ipcURL: url, |
| | | out: out, |
| | | ipcURL: url, |
| | | out: out, |
| | | chCap: cap(out), |
| | | |
| | | shm: shm, |
| | | fnLogger: fn, |
| | | } |
| | |
| | | // Run run a IPC client |
| | | func (r *Reciever) Run(ctx context.Context) { |
| | | |
| | | if r.shm { |
| | | r.runShm(ctx) |
| | | } else { |
| | | r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL)) |
| | | } |
| | | } |
| | | |
| | | func (r *Reciever) run(ctx context.Context, i deliver.Deliver) { |
| | | |
| | | count := 0 |
| | | |
| | | i := r.createIPC(ctx, 50*time.Millisecond, 200) |
| | | |
| | | for { |
| | | select { |
| | |
| | | default: |
| | | |
| | | if r.shm { |
| | | if i == nil { |
| | | r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL) |
| | | i = r.createIPC(ctx, 50*time.Millisecond, 10) |
| | | } |
| | | if i == nil { |
| | | r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL) |
| | | continue |
| | | } |
| | | if d, err := i.Recv(); err != nil { |
| | | i.Close() |
| | | r.fnLogger("Reciever RECV From:", r.ipcURL, " ERROR: ", err) |
| | | |
| | | c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) |
| | | loopR: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopR |
| | | } |
| | | time.Sleep(time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) |
| | | r.fnLogger("Recver ANALYSIS CREATE:", r.ipcURL, " FAILED : ", err) |
| | | } |
| | | } |
| | | i = c |
| | | r.fnLogger("Reciever CREATE SHM:", r.ipcURL) |
| | | i.Close() |
| | | r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err) |
| | | |
| | | i = r.createIPC(ctx, time.Hour, -1) |
| | | r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL) |
| | | } else { |
| | | if d != nil { |
| | | count++ |
| | | if count > 10 { |
| | | count = 0 |
| | | r.fnLogger("~~~shm recv image:", len(d)) |
| | | if len(r.out) > r.chCap/2 { |
| | | for i := 0; i < r.chCap/2; i++ { |
| | | <-r.out |
| | | } |
| | | } |
| | | if len(d) > 2 { |
| | | r.out <- d |
| | | } |
| | | r.out <- d |
| | | r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d)) |
| | | } |
| | | } |
| | | |
| | | } else { |
| | | if msg, err := i.Recv(); err != nil { |
| | | // logo.Errorln("recv error : ", err, " url: ", r.ipcURL) |
| | |
| | | r.fnLogger("~~~mangos recv image:", len(msg)) |
| | | } |
| | | if len(msg) > 2 { |
| | | if len(r.out) > r.chCap/2 { |
| | | for i := 0; i < r.chCap/2; i++ { |
| | | <-r.out |
| | | } |
| | | } |
| | | r.out <- msg |
| | | } |
| | | } |
| | |
| | | } |
| | | } |
| | | |
| | | func (r *Reciever) runShm(ctx context.Context) { |
| | | c, err := deliver.NewServerWithError(deliver.Shm, r.ipcURL) |
| | | loopRBegin: |
| | | func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { |
| | | |
| | | mode := deliver.PushPull |
| | | if r.shm { |
| | | mode = deliver.Shm |
| | | } |
| | | |
| | | try := 0 |
| | | |
| | | c, err := deliver.NewClientWithError(mode, r.ipcURL) |
| | | loopR: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | return nil |
| | | default: |
| | | if err == nil { |
| | | break loopRBegin |
| | | break loopR |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, r.ipcURL) |
| | | r.fnLogger("Recver CLIENT CREATE", r.ipcURL, "FAILED : ", err) |
| | | if loop > 0 { |
| | | try++ |
| | | if try > loop { |
| | | return nil |
| | | } |
| | | time.Sleep(wait) |
| | | } else { |
| | | time.Sleep(time.Second) |
| | | } |
| | | c, err = deliver.NewClientWithError(mode, r.ipcURL) |
| | | } |
| | | } |
| | | |
| | | r.run(ctx, c) |
| | | return c |
| | | } |
| | |
| | | type Sender struct { |
| | | ipcURL string |
| | | in <-chan []byte |
| | | shm bool |
| | | chCap int |
| | | |
| | | shm bool |
| | | fnLogger func(...interface{}) |
| | | } |
| | | |
| | | // NewSender Sender |
| | | func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender { |
| | | return &Sender{ |
| | | ipcURL: ipcURL, |
| | | in: in, |
| | | ipcURL: ipcURL, |
| | | in: in, |
| | | chCap: cap(in), |
| | | |
| | | shm: shm, |
| | | fnLogger: fn, |
| | | } |
| | |
| | | // Run run a IPC producer |
| | | func (s *Sender) Run(ctx context.Context) { |
| | | |
| | | if s.shm { |
| | | s.runShm(ctx) |
| | | } else { |
| | | i := deliver.NewServer(deliver.PushPull, s.ipcURL) |
| | | if i == nil { |
| | | s.fnLogger("sender 2 pubsub nng create error") |
| | | return |
| | | } |
| | | s.run(ctx, i) |
| | | } |
| | | } |
| | | |
| | | func (s *Sender) run(ctx context.Context, i deliver.Deliver) { |
| | | i := s.createIPC(ctx, 50*time.Millisecond, 200) |
| | | |
| | | for { |
| | | select { |
| | |
| | | return |
| | | case d := <-s.in: |
| | | |
| | | if len(s.in) > s.chCap/2 { |
| | | for i := 0; i < s.chCap/2; i++ { |
| | | <-s.in |
| | | } |
| | | } |
| | | if len(s.in) > 0 { |
| | | d = <-s.in |
| | | } |
| | | |
| | | if s.shm { |
| | | if i == nil { |
| | | s.fnLogger("!!!!!!SDK Send createIPC not ready error:", s.ipcURL) |
| | | i = s.createIPC(ctx, 50*time.Millisecond, 10) |
| | | } |
| | | if i == nil { |
| | | s.fnLogger("!!!!!!SDK Send createIPC error:", s.ipcURL) |
| | | continue |
| | | } |
| | | |
| | | t := time.Now() |
| | | |
| | | if err := i.Send(d); err != nil { |
| | | i.Close() |
| | | s.fnLogger("SENDER To:", s.ipcURL, " ERROR: ", err) |
| | | |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | loopS: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | default: |
| | | if err == nil { |
| | | break loopS |
| | | } |
| | | time.Sleep(time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, " FAILED : ", err) |
| | | } |
| | | |
| | | } |
| | | |
| | | i = c |
| | | s.fnLogger("Sender Create Shm:", s.ipcURL) |
| | | i = s.createIPC(ctx, time.Hour, -1) |
| | | s.fnLogger("ANALYSIS SENDER CREATE SHM:", s.ipcURL) |
| | | } else { |
| | | |
| | | s.fnLogger("~~~~~~ shm send to reid len: ", len(d)) |
| | | } |
| | | s.fnLogger("&&&&&&Sender------>Reid One Time:", time.Since(t)) |
| | | |
| | | } else { |
| | | err := i.Send(d) |
| | | if err != nil { |
| | |
| | | } |
| | | } |
| | | |
| | | func (s *Sender) runShm(ctx context.Context) { |
| | | c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | loopSBegin: |
| | | func (s *Sender) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver { |
| | | |
| | | mode := deliver.PushPull |
| | | if s.shm { |
| | | mode = deliver.Shm |
| | | } |
| | | |
| | | try := 0 |
| | | |
| | | c, err := deliver.NewClientWithError(mode, s.ipcURL) |
| | | loopR: |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | return nil |
| | | default: |
| | | if err == nil { |
| | | break loopSBegin |
| | | break loopR |
| | | } |
| | | time.Sleep(1 * time.Second) |
| | | c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL) |
| | | s.fnLogger("Sender CLIENT CREATE:", s.ipcURL, "FAILED : ", err) |
| | | if loop > 0 { |
| | | try++ |
| | | if try > loop { |
| | | return nil |
| | | } |
| | | time.Sleep(wait) |
| | | } else { |
| | | time.Sleep(time.Second) |
| | | } |
| | | c, err = deliver.NewClientWithError(mode, s.ipcURL) |
| | | } |
| | | } |
| | | |
| | | s.run(ctx, c) |
| | | return c |
| | | } |