From f963cd025c1aa88ac8b211e24f46ceb0eb64c418 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 07 五月 2019 14:19:29 +0800 Subject: [PATCH] 多个解码单元同时运行,注意tcp地址 --- decoder/work/service/ipcsender.go | 36 +++++++++++++++++++++++++++--------- 1 files changed, 27 insertions(+), 9 deletions(-) diff --git a/decoder/work/service/ipcsender.go b/decoder/work/service/ipcsender.go index dc4734e..6c05f7a 100644 --- a/decoder/work/service/ipcsender.go +++ b/decoder/work/service/ipcsender.go @@ -7,6 +7,7 @@ "decoder/valib/ipc" "encoding/gob" "fmt" + "time" ) // Sender decoder ingo @@ -23,6 +24,8 @@ // NewSender Sender func NewSender(cameraID, streamURL, ipcURL string) *Sender { ctx, cancel := context.WithCancel(context.Background()) + + fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipcURL, streamURL, cameraID) return &Sender{ ctxIPC: &ContextIPC{ctx, cancel}, @@ -49,6 +52,7 @@ } b := buf.Bytes() data <- b + fmt.Printf("send %d length data ipc address %s\n", len(b), s.ipcURL) } } @@ -58,20 +62,30 @@ gf := goffmpeg.New() if gf != nil { s.ffmpeg = gf + } else { + fmt.Println("create decoder ffmpeg error") + return } imageChan := make(chan ImageInfo) dataChan := make(chan []byte) go s.serializeImageInfo(imageChan, dataChan) - gf.ActiveDecoder(func(d *[]byte, w, h *int) { - if *w > 0 && *h > 0 { - i := ImageInfo{*d, *w, *h, s.cameraID} - imageChan <- i - } else { - fmt.Println("decode jpg error") + gf.BuildDecoder() + + go func(f *goffmpeg.GoFFMPEG, ch chan<- ImageInfo) { + for { + data, wid, hei := f.GetPicDecoder() + if wid > 0 && hei > 0 { + img := ImageInfo{Data: data, Width: wid, Height: hei} + ch <- img + } else { + time.Sleep(time.Millisecond * time.Duration(10)) + } } - }) + + }(gf, imageChan) + gf.Run(s.streamURL) i.SendFromChannel(dataChan) @@ -79,13 +93,17 @@ // RunAsServer run a IPC server func (s *Sender) RunAsServer() { - s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)) + i := ipc.NewServer(s.ctxIPC.ctx, s.ipcURL) + fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID) + s.run(i) } // RunAsClient run as a IPC client func (s *Sender) RunAsClient() { - s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)) + i := ipc.NewClient(s.ctxIPC.ctx, s.ipcURL) + fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID) + s.run(i) } // Stop stop run decoder, must run in goroutine -- Gitblit v1.8.0