package work
|
|
import (
|
"analysis/logo"
|
"context"
|
|
"time"
|
|
"basic.com/valib/deliver.git"
|
|
"basic.com/pubsub/protomsg.git"
|
"github.com/gogo/protobuf/proto"
|
)
|
|
// Reciever recv from ipc
|
type Reciever struct {
|
ctx context.Context
|
ipcURL string
|
chMsg chan MsgRS
|
|
shm bool
|
}
|
|
// NewReciever new recv
|
func NewReciever(url string, chMsg chan MsgRS, shm bool) *Reciever {
|
return &Reciever{
|
ipcURL: url,
|
chMsg: chMsg,
|
shm: shm,
|
}
|
}
|
|
func (r *Reciever) unserilizeProto(ctx context.Context, data <-chan []byte) {
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
default:
|
d := <-data
|
if len(d) < 100 {
|
continue
|
}
|
// logo.Infoln(len(d), "reciver数据")
|
msg := protomsg.SdkMessage{}
|
if err := proto.Unmarshal(d, &msg); err != nil {
|
logo.Errorln(err, " msg 处理异常")
|
continue
|
}
|
// logo.Infoln("RECV MSG: ", msg.Cid, " TASK: ", msg.Tasklab.Taskid, " SDK: ", msg.Tasklab.Sdkinfos[msg.Tasklab.Index].Sdktype)
|
outMsg := MsgRS{Msg: msg}
|
r.chMsg <- outMsg
|
}
|
}
|
}
|
|
func (r *Reciever) run(ctx context.Context, i deliver.Deliver) {
|
dataChan := make(chan []byte)
|
|
go r.unserilizeProto(ctx, dataChan)
|
|
// t := time.Now()
|
// sc := 0
|
|
for {
|
select {
|
case <-ctx.Done():
|
i.Close()
|
return
|
default:
|
|
if r.shm {
|
if d, err := i.Recv(); err != nil {
|
i.Close()
|
logo.Infoln("ANALYSIS RECV ERROR: ", err)
|
|
c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
|
logo.Infoln("ANALYSIS CREATE FAILED : ", err)
|
}
|
i = c
|
logo.Infoln("ANALYSIS CREATE SHM")
|
} else {
|
if d != nil {
|
logo.Infoln("~~~shm recv image:", len(d))
|
dataChan <- d
|
}
|
}
|
} else {
|
if msg, err := i.Recv(); err != nil {
|
// logo.Errorln("recv error : ", err, " url: ", r.ipcURL)
|
} else {
|
logo.Infoln("~~~mangos recv image:", len(msg))
|
dataChan <- msg
|
}
|
}
|
|
// sc++
|
// if sc == 25 {
|
// logo.Infoln("SDK RECV 25 FRAME USE TIME: ", time.Since(t))
|
// sc = 0
|
// t = time.Now()
|
// }
|
|
}
|
}
|
}
|
|
// Run run a IPC client
|
func (r *Reciever) Run(ctx context.Context) {
|
if r.shm {
|
r.runShm(ctx)
|
} else {
|
r.run(ctx, deliver.NewClient(mode, r.ipcURL))
|
}
|
}
|
|
func (r *Reciever) runShm(ctx context.Context) {
|
c, err := deliver.NewClientWithError(deliver.Shm, r.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, r.ipcURL)
|
logo.Infoln("CLIENT CREATE FAILED : ", err)
|
}
|
r.run(ctx, c)
|
}
|
|
///////////////////////////////////////////////////
|
|
///////// test
|
|
///////////////////////////////////////////////////
|
|
// Run2 run a IPC server or client
|
func (r *Reciever) Run2(ctx context.Context, server bool) {
|
if server {
|
r.run(ctx, deliver.NewServer(deliver.PushPull, r.ipcURL))
|
} else {
|
r.run(ctx, deliver.NewClient(deliver.PushPull, r.ipcURL))
|
}
|
}
|