package work
|
|
import (
|
"analysis/goconv"
|
"analysis/logo"
|
|
"context"
|
"time"
|
|
"basic.com/pubsub/protomsg.git"
|
"basic.com/valib/deliver.git"
|
|
"github.com/gogo/protobuf/proto"
|
)
|
|
// Sender decoder ingo
|
type Sender struct {
|
ipcURL string
|
chMsg <-chan MsgRS
|
shm bool
|
fn func([]byte, bool)
|
}
|
|
// ApplyCallbackFunc cb
|
func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) {
|
if s.fn == nil {
|
s.fn = f
|
}
|
}
|
|
// NewSender Sender
|
func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool) *Sender {
|
// logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL)
|
return &Sender{
|
ipcURL: ipcURL,
|
chMsg: chMsg,
|
shm: shm,
|
fn: nil,
|
}
|
}
|
|
func unpackImage(msg MsgRS, fnName string) *protomsg.Image {
|
// 解压获取传入的数据
|
bData, err := UnCompress(msg.Msg.Data)
|
if err != nil {
|
logo.Errorf("%s uncompress image failed\n", fnName)
|
return nil
|
}
|
// 反序列化数据得到sdk入参
|
i := &protomsg.Image{}
|
err = proto.Unmarshal(bData, i)
|
if err != nil {
|
logo.Errorf("%s protobuf decode CameraImage error : %s\n", fnName, err.Error())
|
return nil
|
}
|
if i.Data == nil {
|
logo.Errorf("%s protomsg.Image data null\n", fnName)
|
return nil
|
}
|
return i
|
}
|
|
func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) {
|
|
for {
|
select {
|
case <-ctx.Done():
|
logo.Infoln("stop Sender")
|
return
|
case i := <-s.chMsg:
|
|
d, err := proto.Marshal(&i.Msg)
|
|
if err != nil {
|
logo.Errorln("protobuf encode ipc sender error: ", err)
|
continue
|
}
|
|
data <- d
|
|
if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) {
|
if s.fn != nil {
|
imgInfo := unpackImage(i, "sender")
|
if imgInfo.Data == nil {
|
continue
|
}
|
imgProto := protomsg.Image{
|
Data: goconv.YUV2BGR(imgInfo.Data, int(imgInfo.Width), int(imgInfo.Height)),
|
Width: int32(imgInfo.Width),
|
Height: int32(imgInfo.Height),
|
Timestamp: imgInfo.Timestamp,
|
Id: imgInfo.Id,
|
Cid: imgInfo.Cid,
|
}
|
var sendData []byte
|
if b, err := proto.Marshal(&imgProto); err == nil {
|
i.Msg.Data = b
|
sendData, err = proto.Marshal(&i.Msg)
|
if err != nil {
|
continue
|
}
|
}
|
sFlag := true
|
for _, v := range i.Msg.Tasklab.Sdkinfos {
|
if len(v.Sdkdata) < 2 {
|
sFlag = false
|
break
|
}
|
}
|
s.fn(sendData, sFlag)
|
|
}
|
}
|
}
|
}
|
}
|
|
func (s *Sender) run(ctx context.Context, i deliver.Deliver) {
|
|
// go ruleserver.TimeTicker()
|
|
dataChan := make(chan []byte)
|
go s.serializeProto(ctx, dataChan)
|
|
for {
|
select {
|
case <-ctx.Done():
|
i.Close()
|
return
|
default:
|
|
d := <-dataChan
|
|
if s.shm {
|
if err := i.Send(d); err != nil {
|
i.Close()
|
logo.Infoln("ANALYSIS SENDER ERROR: ", err)
|
|
c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
logo.Infoln("CLIENT CREATE FAILED : ", err)
|
}
|
i = c
|
} else {
|
|
}
|
} else {
|
err := i.Send(d)
|
if err != nil {
|
// logo.Errorln("error sender 2 pubsub: ", err)
|
} else {
|
logo.Infoln("mangos send to pubsub len: ", len(d))
|
}
|
}
|
}
|
}
|
}
|
|
// Run run a IPC producer
|
func (s *Sender) Run(ctx context.Context) {
|
|
if s.shm {
|
s.runShm(ctx)
|
} else {
|
i := deliver.NewClient(mode, s.ipcURL)
|
if i == nil {
|
logo.Errorln("sender 2 pubsub nng create error")
|
return
|
}
|
s.run(ctx, i)
|
}
|
}
|
|
func (s *Sender) runShm(ctx context.Context) {
|
c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
for {
|
if err == nil {
|
break
|
}
|
time.Sleep(1 * time.Second)
|
c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL)
|
logo.Infoln("CLIENT CREATE FAILED : ", err)
|
}
|
s.run(ctx, c)
|
}
|