package rpc
|
|
import (
|
"context"
|
"time"
|
|
"basic.com/valib/deliver.git"
|
)
|
|
// Sender decoder ingo
|
type Sender struct {
|
ipcURL string
|
in <-chan []byte
|
shm bool
|
fn func([]byte, bool)
|
|
fnLogger func(...interface{})
|
}
|
|
// NewSender Sender
|
func NewSender(ipcURL string, in <-chan []byte, shm bool, fn func(...interface{})) *Sender {
|
return &Sender{
|
ipcURL: ipcURL,
|
in: in,
|
shm: shm,
|
fn: nil,
|
fnLogger: fn,
|
}
|
}
|
|
// Run run a IPC producer
|
func (s *Sender) Run(ctx context.Context) {
|
|
if s.shm {
|
s.runShm(ctx)
|
} else {
|
i := deliver.NewClient(mode, s.ipcURL)
|
if i == nil {
|
s.fnLogger("sender 2 pubsub nng create error")
|
return
|
}
|
s.run(ctx, i)
|
}
|
}
|
|
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
|
|
for {
|
select {
|
case <-ctx.Done():
|
i.Close()
|
return
|
case d := <-s.in:
|
|
if s.shm {
|
if err := i.Send(d); err != nil {
|
i.Close()
|
s.fnLogger("ANALYSIS SENDER ERROR: ", err)
|
|
c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
s.fnLogger("CLIENT CREATE FAILED : ", err)
|
}
|
i = c
|
} else {
|
|
}
|
} else {
|
err := i.Send(d)
|
if err != nil {
|
// logo.Errorln("error sender 2 pubsub: ", err)
|
} else {
|
s.fnLogger("mangos send to pubsub len: ", len(d))
|
}
|
}
|
default:
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
|
func (s *Sender) runShm(ctx context.Context) {
|
c, err := deliver.NewServerWithError(deliver.Shm, s.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
c, err = deliver.NewServerWithError(deliver.Shm, s.ipcURL)
|
s.fnLogger("CLIENT CREATE FAILED : ", err)
|
}
|
s.run(ctx, c)
|
}
|