package service

import (
	"bytes"
	"context"
	"encoding/gob"
	"fmt"
	"time"

	"decoder/valib/goffmpeg"
	"decoder/valib/ipc"
)

// Sender carries the settings for decoding one camera stream and handing the
// decoded frames, gob-encoded, to an IPC endpoint.
type Sender struct {
	// ctx stops the background goroutines started by run when it is cancelled.
	// NOTE(review): storing a Context in a struct is discouraged in Go, but it
	// is part of NewSender's existing contract and is kept for compatibility.
	ctx       context.Context
	cameraID  string
	streamURL string
	ipcURL    string
	// ffmpeg is assigned by run once the decoder instance has been created.
	ffmpeg *goffmpeg.GoFFMPEG
}

// NewSender returns a Sender for the given camera, stream URL and IPC URL.
// It only records its arguments (and logs them); nothing is started until
// RunAsServer or RunAsClient is called.
func NewSender(ctx context.Context, cameraID, streamURL, ipcURL string) *Sender {
	fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipcURL, streamURL, cameraID)
	return &Sender{
		ctx:       ctx,
		cameraID:  cameraID,
		streamURL: streamURL,
		ipcURL:    ipcURL,
	}
}

// serializeImageInfo gob-encodes every ImageInfo received on img and forwards
// the resulting bytes on data. It returns when s.ctx is cancelled, including
// while it is blocked waiting for a receiver on data. A frame that fails to
// encode is logged and dropped.
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
	for {
		select {
		case <-s.ctx.Done():
			fmt.Println("stop Sender")
			return
		case i := <-img:
			// A fresh buffer and encoder per frame keeps every payload
			// self-contained (gob type information is included each time).
			var buf bytes.Buffer
			if err := gob.NewEncoder(&buf).Encode(i); err != nil {
				fmt.Println("gob encode camera image error", err)
				continue
			}
			b := buf.Bytes()

			// Guard the send with the context so cancellation cannot leave
			// this goroutine parked forever on an unread channel.
			select {
			case data <- b:
			case <-s.ctx.Done():
				fmt.Println("stop Sender")
				return
			}
			fmt.Printf("send %d length data ipc address %s\n", len(b), s.ipcURL)
		}
	}
}

// run wires the pipeline together: it creates the ffmpeg decoder, starts the
// serializer goroutine and a goroutine that polls the decoder for frames,
// then calls gf.Run on the stream URL and finally hands the encoded-frame
// channel to i.SendFromChannel. Both goroutines stop when s.ctx is cancelled.
//
// NOTE(review): whether gf.Run and i.SendFromChannel block is not visible
// from this file — confirm the intended ordering against their packages.
func (s *Sender) run(i *ipc.IPC) {
	// idleWait is how long the poller backs off when no frame is available.
	const idleWait = 20 * time.Millisecond

	gf := goffmpeg.New()
	if gf == nil {
		fmt.Println("create decoder ffmpeg error")
		return
	}
	s.ffmpeg = gf

	imageChan := make(chan ImageInfo)
	dataChan := make(chan []byte)
	go s.serializeImageInfo(imageChan, dataChan)

	gf.BuildDecoder()

	go func(ctx context.Context, f *goffmpeg.GoFFMPEG, ch chan<- ImageInfo) {
		for {
			// Stop polling as soon as the sender has been cancelled.
			if ctx.Err() != nil {
				return
			}

			data, wid, hei := f.GetPicDecoder()
			if wid > 0 && hei > 0 {
				// The serializer exits on cancellation, so an unguarded send
				// here would block forever; select on ctx as well.
				select {
				case ch <- ImageInfo{Data: data, Width: wid, Height: hei}:
				case <-ctx.Done():
					return
				}
				continue
			}

			// No usable frame yet: back off briefly, but wake on cancel.
			select {
			case <-ctx.Done():
				return
			case <-time.After(idleWait):
			}
		}
	}(s.ctx, gf, imageChan)

	gf.Run(s.streamURL)
	i.SendFromChannel(dataChan)
}

// RunAsServer creates an IPC server on the sender's IPC URL and runs the
// decode-and-send pipeline over it.
func (s *Sender) RunAsServer() {
	i := ipc.NewServer(s.ctx, s.ipcURL)
	fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
	s.run(i)
}

// RunAsClient creates an IPC client for the sender's IPC URL and runs the
// decode-and-send pipeline over it.
func (s *Sender) RunAsClient() {
	i := ipc.NewClient(s.ctx, s.ipcURL)
	fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
	s.run(i)
}