package service
|
|
import (
|
"bytes"
|
"context"
|
"decoder/valib/goffmpeg"
|
"decoder/valib/ipc"
|
"encoding/gob"
|
"fmt"
|
)
|
|
// Sender decoder ingo
|
type Sender struct {
|
ctxIPC *ContextIPC
|
|
cameraID string
|
streamURL string
|
ipcURL string
|
|
ffmpeg *goffmpeg.GoFFMPEG
|
}
|
|
// NewSender Sender
|
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
|
ctx, cancel := context.WithCancel(context.Background())
|
|
return &Sender{
|
ctxIPC: &ContextIPC{ctx, cancel},
|
|
cameraID: cameraID,
|
streamURL: streamURL,
|
ipcURL: ipcURL,
|
}
|
}
|
|
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
|
for {
|
select {
|
case <-s.ctxIPC.ctx.Done():
|
fmt.Println("stop Sender")
|
return
|
case i := <-img:
|
var buf bytes.Buffer
|
enc := gob.NewEncoder(&buf)
|
|
if err := enc.Encode(i); err != nil {
|
fmt.Println("gob encode camera image error", err)
|
continue
|
}
|
b := buf.Bytes()
|
data <- b
|
}
|
|
}
|
}
|
|
func (s *Sender) run(i *ipc.IPC) {
|
gf := goffmpeg.New()
|
if gf != nil {
|
s.ffmpeg = gf
|
}
|
imageChan := make(chan ImageInfo)
|
dataChan := make(chan []byte)
|
|
go s.serializeImageInfo(imageChan, dataChan)
|
|
gf.ActiveDecoder(func(d *[]byte, w, h *int) {
|
if *w > 0 && *h > 0 {
|
i := ImageInfo{*d, *w, *h, s.cameraID}
|
imageChan <- i
|
} else {
|
fmt.Println("decode jpg error")
|
}
|
})
|
gf.Run(s.streamURL)
|
|
i.SendFromChannel(dataChan)
|
}
|
|
// RunAsServer run a IPC server
|
func (s *Sender) RunAsServer() {
|
s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL))
|
|
}
|
|
// RunAsClient run as a IPC client
|
func (s *Sender) RunAsClient() {
|
s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL))
|
}
|
|
// Stop stop run decoder, must run in goroutine
|
func (s *Sender) Stop() {
|
s.ffmpeg.Free()
|
s.ctxIPC.cancel()
|
}
|