From 4eb8b9581a0f24d030f9c7b8e7652e72e60ed92f Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 06 五月 2019 13:27:40 +0800 Subject: [PATCH] 整理代码 --- /dev/null | 76 ------------------- decoder/main.go | 11 +- decoder/work/service/ipcsender.go | 95 +++++++++++++++++++++++ 3 files changed, 102 insertions(+), 80 deletions(-) diff --git a/decoder/demo/simpleIPC.go b/decoder/demo/simpleIPC.go deleted file mode 100644 index aee52c8..0000000 --- a/decoder/demo/simpleIPC.go +++ /dev/null @@ -1,11 +0,0 @@ -package demo - -import ( - srv "decoder/work/service" -) - -// SendByIPC send pic by ipc -func SendByIPC(rtspURL, cameraID, ipcURL string, asServer bool) { - d := srv.NewDecoder(cameraID, rtspURL, ipcURL) - d.Run(asServer) -} diff --git a/decoder/main.go b/decoder/main.go index fa513b3..a9d8816 100644 --- a/decoder/main.go +++ b/decoder/main.go @@ -2,14 +2,13 @@ import ( "context" - "decoder/demo" "decoder/valib/ipc" + srv "decoder/work/service" "encoding/json" "flag" "fmt" "strconv" "strings" - // "videoServer/demo" ) var ( @@ -98,17 +97,21 @@ url := strings.TrimSpace(c.URL) id := strings.TrimSpace(c.ID) i := strings.TrimSpace(ipc) - go demo.SendByIPC(url, id, i, false) + go runSender(id, url, i) } } } cancel() } +func runSender(cameraID, rtspURL, ipcLabel string) { + d := srv.NewSender(cameraID, rtspURL, ipcLabel) + d.RunAsClient() +} func test() { fmt.Println("start test") fmt.Println(picFolder) - demo.SendByIPC(streamURL, "camera1", ipcURL, false) + runSender("cameraid", streamURL, ipcURL) } diff --git a/decoder/work/service/decoder.go b/decoder/work/service/decoder.go deleted file mode 100644 index c284325..0000000 --- a/decoder/work/service/decoder.go +++ /dev/null @@ -1,93 +0,0 @@ -package service - -import ( - "bytes" - "context" - "decoder/valib/goffmpeg" - "decoder/valib/ipc" - "encoding/gob" - "fmt" -) - -// Decoder decoder ingo -type Decoder struct { - ctxIPC *ContextIPC - - cameraID string - streamURL string - ipcURL string - - ffmpeg *goffmpeg.GoFFMPEG -} - -// NewDecoder decoder -func NewDecoder(cameraID, streamURL, ipcURL string) *Decoder { - ctx, cancel := context.WithCancel(context.Background()) - - return &Decoder{ - ctxIPC: &ContextIPC{ctx, cancel}, - - cameraID: cameraID, - streamURL: streamURL, - ipcURL: ipcURL, - } -} - -func (dec *Decoder) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) { - for { - select { - case <-dec.ctxIPC.ctx.Done(): - fmt.Println("stop decoder") - return - case i := <-img: - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - - if err := enc.Encode(i); err != nil { - fmt.Println("gob encode camera image error", err) - continue - } - b := buf.Bytes() - data <- b - } - - } -} - -// Run run a decoder -func (dec *Decoder) Run(asServer bool) { - var i *ipc.IPC - if asServer { - i = ipc.NewServer(dec.ctxIPC.ctx, dec.ipcURL) - } else { - i = ipc.NewClient(dec.ctxIPC.ctx, dec.ipcURL) - } - - gf := goffmpeg.New() - if gf != nil { - dec.ffmpeg = gf - } - imageChan := make(chan ImageInfo) - dataChan := make(chan []byte) - - go dec.serializeImageInfo(imageChan, dataChan) - - // str := "hello" - gf.ActiveDecoder(func(d *[]byte, w, h *int) { - if *w > 0 && *h > 0 { - i := ImageInfo{*d, *w, *h, dec.cameraID} - imageChan <- i - } else { - fmt.Println("decode jpg error") - } - }) - gf.Run(dec.streamURL) - - i.SendFromChannel(dataChan) -} - -// Stop stop run decoder, must run in goroutine -func (dec *Decoder) Stop() { - dec.ffmpeg.Free() - dec.ctxIPC.cancel() -} diff --git a/decoder/work/service/ipcsender.go b/decoder/work/service/ipcsender.go new file mode 100644 index 0000000..dc4734e --- /dev/null +++ b/decoder/work/service/ipcsender.go @@ -0,0 +1,95 @@ +package service + +import ( + "bytes" + "context" + "decoder/valib/goffmpeg" + "decoder/valib/ipc" + "encoding/gob" + "fmt" +) + +// Sender decoder ingo +type Sender struct { + ctxIPC *ContextIPC + + cameraID string + streamURL string + ipcURL string + + ffmpeg *goffmpeg.GoFFMPEG +} + +// NewSender Sender +func NewSender(cameraID, streamURL, ipcURL string) *Sender { + ctx, cancel := context.WithCancel(context.Background()) + + return &Sender{ + ctxIPC: &ContextIPC{ctx, cancel}, + + cameraID: cameraID, + streamURL: streamURL, + ipcURL: ipcURL, + } +} + +func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) { + for { + select { + case <-s.ctxIPC.ctx.Done(): + fmt.Println("stop Sender") + return + case i := <-img: + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + + if err := enc.Encode(i); err != nil { + fmt.Println("gob encode camera image error", err) + continue + } + b := buf.Bytes() + data <- b + } + + } +} + +func (s *Sender) run(i *ipc.IPC) { + gf := goffmpeg.New() + if gf != nil { + s.ffmpeg = gf + } + imageChan := make(chan ImageInfo) + dataChan := make(chan []byte) + + go s.serializeImageInfo(imageChan, dataChan) + + gf.ActiveDecoder(func(d *[]byte, w, h *int) { + if *w > 0 && *h > 0 { + i := ImageInfo{*d, *w, *h, s.cameraID} + imageChan <- i + } else { + fmt.Println("decode jpg error") + } + }) + gf.Run(s.streamURL) + + i.SendFromChannel(dataChan) +} + +// RunAsServer run a IPC server +func (s *Sender) RunAsServer() { + s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)) + +} + +// RunAsClient run as a IPC client +func (s *Sender) RunAsClient() { + s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)) +} + +// Stop stop run decoder, must run in goroutine +func (s *Sender) Stop() { + s.ffmpeg.Free() + s.ctxIPC.cancel() +} diff --git a/decoder/work/service/reciever.go b/decoder/work/service/reciever.go deleted file mode 100644 index 8c83c14..0000000 --- a/decoder/work/service/reciever.go +++ /dev/null @@ -1,76 +0,0 @@ -package service - -import ( - "bytes" - "context" - "decoder/valib/ipc" - "encoding/gob" - "fmt" -) - -// Reciever recv from ipc -type Reciever struct { - ctxIPC *ContextIPC - ipcURL string - chImage chan<- ImageInfo -} - -func unserilizeImageInfo(data <-chan []byte, img chan<- ImageInfo) { - for { - d := <-data - - var buf bytes.Buffer - buf.Write(d) - - dec := gob.NewDecoder(&buf) - - var i ImageInfo - if err := dec.Decode(&i); err != nil { - fmt.Println("gob decode CameraImage error", err) - continue - } - img <- i - } -} - -// NewReciever new recv -func NewReciever(url string, ch chan<- ImageInfo) *Reciever { - ctx, cancel := context.WithCancel(context.Background()) - - return &Reciever{ - ctxIPC: &ContextIPC{ctx, cancel}, - ipcURL: url, - chImage: ch, - } -} - -// NewRecieverWithContext new recver with context -func NewRecieverWithContext(ctx context.Context, url string, ch chan<- ImageInfo) *Reciever { - return &Reciever{ - ctxIPC: &ContextIPC{ctx, nil}, - ipcURL: url, - chImage: ch, - } -} - -// Run run task from ipc data -func (r *Reciever) Run(asServer bool) { - var i *ipc.IPC - if asServer { - i = ipc.NewServer(r.ctxIPC.ctx, r.ipcURL) - } else { - i = ipc.NewClient(r.ctxIPC.ctx, r.ipcURL) - } - - dataChan := make(chan []byte) - go unserilizeImageInfo(dataChan, r.chImage) - - i.RecvToChannel(dataChan) -} - -// Stop stop reciever, run in goroutine -func (r *Reciever) Stop() { - if r.ctxIPC.cancel != nil { - r.ctxIPC.cancel() - } -} -- Gitblit v1.8.0