package service import ( "bytes" "context" "decoder/valib/goffmpeg" "decoder/valib/ipc" "encoding/gob" "fmt" ) // Sender decoder ingo type Sender struct { ctxIPC *ContextIPC cameraID string streamURL string ipcURL string ffmpeg *goffmpeg.GoFFMPEG } // NewSender Sender func NewSender(cameraID, streamURL, ipcURL string) *Sender { ctx, cancel := context.WithCancel(context.Background()) return &Sender{ ctxIPC: &ContextIPC{ctx, cancel}, cameraID: cameraID, streamURL: streamURL, ipcURL: ipcURL, } } func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) { for { select { case <-s.ctxIPC.ctx.Done(): fmt.Println("stop Sender") return case i := <-img: var buf bytes.Buffer enc := gob.NewEncoder(&buf) if err := enc.Encode(i); err != nil { fmt.Println("gob encode camera image error", err) continue } b := buf.Bytes() data <- b } } } func (s *Sender) run(i *ipc.IPC) { gf := goffmpeg.New() if gf != nil { s.ffmpeg = gf } imageChan := make(chan ImageInfo) dataChan := make(chan []byte) go s.serializeImageInfo(imageChan, dataChan) gf.ActiveDecoder(func(d *[]byte, w, h *int) { if *w > 0 && *h > 0 { i := ImageInfo{*d, *w, *h, s.cameraID} imageChan <- i } else { fmt.Println("decode jpg error") } }) gf.Run(s.streamURL) i.SendFromChannel(dataChan) } // RunAsServer run a IPC server func (s *Sender) RunAsServer() { s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)) } // RunAsClient run as a IPC client func (s *Sender) RunAsClient() { s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)) } // Stop stop run decoder, must run in goroutine func (s *Sender) Stop() { s.ffmpeg.Free() s.ctxIPC.cancel() }