视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-06 4eb8b9581a0f24d030f9c7b8e7652e72e60ed92f
整理代码
3个文件已删除
1个文件已添加
1个文件已修改
286 ■■■■■ 已修改文件
decoder/demo/simpleIPC.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/main.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/decoder.go 93 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/ipcsender.go 95 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/reciever.go 76 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/demo/simpleIPC.go
File was deleted
decoder/main.go
@@ -2,14 +2,13 @@
import (
    "context"
    "decoder/demo"
    "decoder/valib/ipc"
    srv "decoder/work/service"
    "encoding/json"
    "flag"
    "fmt"
    "strconv"
    "strings"
    // "videoServer/demo"
)
var (
@@ -98,17 +97,21 @@
                url := strings.TrimSpace(c.URL)
                id := strings.TrimSpace(c.ID)
                i := strings.TrimSpace(ipc)
                go demo.SendByIPC(url, id, i, false)
                go runSender(id, url, i)
            }
        }
    }
    cancel()
}
func runSender(cameraID, rtspURL, ipcLabel string) {
    d := srv.NewSender(cameraID, rtspURL, ipcLabel)
    d.RunAsClient()
}
func test() {
    fmt.Println("start test")
    fmt.Println(picFolder)
    demo.SendByIPC(streamURL, "camera1", ipcURL, false)
    runSender("cameraid", streamURL, ipcURL)
}
decoder/work/service/decoder.go
File was deleted
decoder/work/service/ipcsender.go
New file
@@ -0,0 +1,95 @@
package service
import (
    "bytes"
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
)
// Sender decoder ingo
type Sender struct {
    ctxIPC *ContextIPC
    cameraID  string
    streamURL string
    ipcURL    string
    ffmpeg *goffmpeg.GoFFMPEG
}
// NewSender Sender
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
    ctx, cancel := context.WithCancel(context.Background())
    return &Sender{
        ctxIPC: &ContextIPC{ctx, cancel},
        cameraID:  cameraID,
        streamURL: streamURL,
        ipcURL:    ipcURL,
    }
}
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-s.ctxIPC.ctx.Done():
            fmt.Println("stop Sender")
            return
        case i := <-img:
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            if err := enc.Encode(i); err != nil {
                fmt.Println("gob encode camera image error", err)
                continue
            }
            b := buf.Bytes()
            data <- b
        }
    }
}
func (s *Sender) run(i *ipc.IPC) {
    gf := goffmpeg.New()
    if gf != nil {
        s.ffmpeg = gf
    }
    imageChan := make(chan ImageInfo)
    dataChan := make(chan []byte)
    go s.serializeImageInfo(imageChan, dataChan)
    gf.ActiveDecoder(func(d *[]byte, w, h *int) {
        if *w > 0 && *h > 0 {
            i := ImageInfo{*d, *w, *h, s.cameraID}
            imageChan <- i
        } else {
            fmt.Println("decode jpg error")
        }
    })
    gf.Run(s.streamURL)
    i.SendFromChannel(dataChan)
}
// RunAsServer run a IPC server
func (s *Sender) RunAsServer() {
    s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL))
}
// RunAsClient run as a IPC client
func (s *Sender) RunAsClient() {
    s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL))
}
// Stop stop run decoder, must run in goroutine
func (s *Sender) Stop() {
    s.ffmpeg.Free()
    s.ctxIPC.cancel()
}
decoder/work/service/reciever.go
File was deleted