package rpc
|
|
import (
|
"context"
|
|
"time"
|
|
"basic.com/valib/deliver.git"
|
)
|
|
// Reciever recv from ipc
|
type Reciever struct {
|
ctx context.Context
|
ipcURL string
|
out chan []byte
|
chCap int
|
|
shm bool
|
fnLogger func(...interface{})
|
}
|
|
// NewReciever new recv
|
func NewReciever(url string, out chan []byte, shm bool, fn func(...interface{})) *Reciever {
|
return &Reciever{
|
ipcURL: url,
|
out: out,
|
chCap: cap(out),
|
|
shm: shm,
|
fnLogger: fn,
|
}
|
}
|
|
// Run run a IPC client
|
func (r *Reciever) Run(ctx context.Context) {
|
|
count := 0
|
|
i := r.createIPC(ctx, 50*time.Millisecond, 200)
|
|
for {
|
select {
|
case <-ctx.Done():
|
i.Close()
|
return
|
default:
|
|
if r.shm {
|
if i == nil {
|
r.fnLogger("!!!!!!SDK Recv createIPC not ready error:", r.ipcURL)
|
i = r.createIPC(ctx, 50*time.Millisecond, 10)
|
}
|
if i == nil {
|
r.fnLogger("!!!!!!SDK Recv createIPC error:", r.ipcURL)
|
continue
|
}
|
if d, err := i.Recv(); err != nil {
|
|
i.Close()
|
r.fnLogger("SDK RECV:", r.ipcURL, "ERROR:", err)
|
|
i = r.createIPC(ctx, 50*time.Millisecond, 20)
|
r.fnLogger("To Reid Recver CREATE SHM:", r.ipcURL)
|
} else {
|
if d != nil {
|
if len(r.out) > r.chCap/2 {
|
for i := 0; i < r.chCap/2; i++ {
|
<-r.out
|
}
|
}
|
r.out <- d
|
r.fnLogger("~~~shm recv from:", r.ipcURL, "image:", len(d))
|
}
|
}
|
|
} else {
|
if msg, err := i.Recv(); err != nil {
|
// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
|
} else {
|
count++
|
if count > 10 {
|
count = 0
|
r.fnLogger("~~~mangos recv image:", len(msg))
|
}
|
if len(msg) > 2 {
|
if len(r.out) > r.chCap/2 {
|
for i := 0; i < r.chCap/2; i++ {
|
<-r.out
|
}
|
}
|
r.out <- msg
|
}
|
}
|
}
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
|
func (r *Reciever) createIPC(ctx context.Context, wait time.Duration, loop int) deliver.Deliver {
|
|
mode := deliver.PushPull
|
if r.shm {
|
mode = deliver.Shm
|
}
|
|
try := 0
|
|
c, err := deliver.NewClientWithError(mode, r.ipcURL)
|
loopR:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
if err == nil {
|
break loopR
|
}
|
if loop > 0 {
|
try++
|
if try > loop {
|
return nil
|
}
|
time.Sleep(wait)
|
} else {
|
time.Sleep(time.Second)
|
}
|
c, err = deliver.NewClientWithError(mode, r.ipcURL)
|
}
|
}
|
return c
|
}
|