视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-08 19445d14e73f4bb96e218a65e126ae526f89537d
context使用
3个文件已修改
47 ■■■■■ 已修改文件
decoder/main.go 18 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/common.go 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/ipcsender.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/main.go
@@ -84,11 +84,11 @@
    goffmpeg.InitFFmpeg()
    if testIt {
        test()
    }
    ctx, cancel := context.WithCancel(context.Background())
    if testIt {
        test(ctx)
    }
    ch := make(chan CameraInfo)
@@ -119,7 +119,7 @@
                url := strings.TrimSpace(c.URL)
                id := strings.TrimSpace(c.ID)
                addr := strings.TrimSpace(ipcAddr)
                go runSender(id, url, addr)
                go runSender(ctx, id, url, addr)
            }
        }
    }
@@ -128,18 +128,18 @@
    cancel()
}
func runSender(cameraID, rtspURL, ipcLabel string) {
    d := srv.NewSender(cameraID, rtspURL, ipcLabel)
func runSender(ctx context.Context, cameraID, rtspURL, ipcLabel string) {
    d := srv.NewSender(ctx, cameraID, rtspURL, ipcLabel)
    if asServer {
        d.RunAsServer()
    }
    d.RunAsClient()
}
func test() {
func test(ctx context.Context) {
    fmt.Println("start test")
    fmt.Println(picFolder)
    runSender("cameraid", streamURL, ipcURL)
    runSender(ctx, "cameraid", streamURL, ipcURL)
}
decoder/work/service/common.go
@@ -1,9 +1,5 @@
package service
import (
    "context"
)
// ImageInfo deocded image data
type ImageInfo struct {
    Data   []byte
@@ -11,10 +7,4 @@
    Height int
    // CameraID string
}
// ContextIPC server context
type ContextIPC struct {
    ctx    context.Context
    cancel context.CancelFunc
}
decoder/work/service/ipcsender.go
@@ -12,7 +12,7 @@
// Sender decoder ingo
type Sender struct {
    ctxIPC *ContextIPC
    ctx context.Context
    cameraID  string
    streamURL string
@@ -22,13 +22,12 @@
}
// NewSender Sender
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
    ctx, cancel := context.WithCancel(context.Background())
func NewSender(ctx context.Context, cameraID, streamURL, ipcURL string) *Sender {
    fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipcURL, streamURL, cameraID)
    return &Sender{
        ctxIPC: &ContextIPC{ctx, cancel},
        ctx: ctx,
        cameraID:  cameraID,
        streamURL: streamURL,
@@ -39,7 +38,7 @@
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-s.ctxIPC.ctx.Done():
        case <-s.ctx.Done():
            fmt.Println("stop Sender")
            return
        case i := <-img:
@@ -92,7 +91,7 @@
// RunAsServer run a IPC server
func (s *Sender) RunAsServer() {
    i := ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)
    i := ipc.NewServer(s.ctx, s.ipcURL)
    fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
    s.run(i)
@@ -100,13 +99,7 @@
// RunAsClient run as a IPC client
func (s *Sender) RunAsClient() {
    i := ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)
    i := ipc.NewClient(s.ctx, s.ipcURL)
    fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
    s.run(i)
}
// Stop stop run decoder, must run in goroutine
func (s *Sender) Stop() {
    s.ffmpeg.Free()
    s.ctxIPC.cancel()
}