package work import ( "analysis/goconv" "analysis/logo" "context" "time" "basic.com/pubsub/protomsg.git" "basic.com/valib/deliver.git" "github.com/gogo/protobuf/proto" ) // Sender decoder ingo type Sender struct { ipcURL string chMsg <-chan MsgRS shm bool fn func([]byte, bool) } // ApplyCallbackFunc cb func (s *Sender) ApplyCallbackFunc(f func([]byte, bool)) { if s.fn == nil { s.fn = f } } // NewSender Sender func NewSender(ipcURL string, chMsg <-chan MsgRS, shm bool) *Sender { // logo.Infof("create ipc %s for decode : %s\n", ipcURL, ipcURL) return &Sender{ ipcURL: ipcURL, chMsg: chMsg, shm: shm, fn: nil, } } func unpackImage(msg MsgRS, fnName string) *protomsg.Image { // 解压获取传入的数据 bData, err := UnCompress(msg.Msg.Data) if err != nil { logo.Errorf("%s uncompress image failed\n", fnName) return nil } // 反序列化数据得到sdk入参 i := &protomsg.Image{} err = proto.Unmarshal(bData, i) if err != nil { logo.Errorf("%s protobuf decode CameraImage error : %s\n", fnName, err.Error()) return nil } if i.Data == nil { logo.Errorf("%s protomsg.Image data null\n", fnName) return nil } return i } func (s *Sender) serializeProto(ctx context.Context, data chan<- []byte) { for { select { case <-ctx.Done(): logo.Infoln("stop Sender") return case i := <-s.chMsg: d, err := proto.Marshal(&i.Msg) if err != nil { logo.Errorln("protobuf encode ipc sender error: ", err) continue } data <- d if int(i.Msg.Tasklab.Index+1) == len(i.Msg.Tasklab.Sdkinfos) { if s.fn != nil { imgInfo := unpackImage(i, "sender") if imgInfo.Data == nil { continue } imgProto := protomsg.Image{ Data: goconv.YUV2BGR(imgInfo.Data, int(imgInfo.Width), int(imgInfo.Height)), Width: int32(imgInfo.Width), Height: int32(imgInfo.Height), Timestamp: imgInfo.Timestamp, Id: imgInfo.Id, Cid: imgInfo.Cid, } var sendData []byte if b, err := proto.Marshal(&imgProto); err == nil { i.Msg.Data = b sendData, err = proto.Marshal(&i.Msg) if err != nil { continue } } sFlag := true for _, v := range i.Msg.Tasklab.Sdkinfos { if len(v.Sdkdata) < 2 { sFlag = false break } } s.fn(sendData, sFlag) } } } } } func (s *Sender) run(ctx context.Context, i deliver.Deliver) { // go ruleserver.TimeTicker() dataChan := make(chan []byte) go s.serializeProto(ctx, dataChan) for { select { case <-ctx.Done(): i.Close() return default: d := <-dataChan if s.shm { if err := i.Send(d); err != nil { i.Close() logo.Infoln("ANALYSIS SENDER ERROR: ", err) c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) for { if err == nil { break } time.Sleep(time.Second) c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) logo.Infoln("CLIENT CREATE FAILED : ", err) } i = c } else { } } else { err := i.Send(d) if err != nil { // logo.Errorln("error sender 2 pubsub: ", err) } else { logo.Infoln("mangos send to pubsub len: ", len(d)) } } } } } // Run run a IPC producer func (s *Sender) Run(ctx context.Context) { if s.shm { s.runShm(ctx) } else { i := deliver.NewClient(mode, s.ipcURL) if i == nil { logo.Errorln("sender 2 pubsub nng create error") return } s.run(ctx, i) } } func (s *Sender) runShm(ctx context.Context) { c, err := deliver.NewClientWithError(deliver.Shm, s.ipcURL) for { if err == nil { break } time.Sleep(1 * time.Second) c, err = deliver.NewClientWithError(deliver.Shm, s.ipcURL) logo.Infoln("CLIENT CREATE FAILED : ", err) } s.run(ctx, c) }