视频分析2.0 多进程拆分仓库
zhangmeng
2019-04-30 4058b9d3145c9f2c01ad07f0004948636bcbaf7e
init submodule
15个文件已添加
1个文件已修改
298 ■■■■■ 已修改文件
.gitignore 6 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitmodules 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/valib/gosdk @ e3a215 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/valib/ipc @ 66d8e6 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/.gitignore 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/.vscode/settings.json 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/demo/simpleIPC.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/go.mod 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/go.sum 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/main.go 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/runtime/libcffmpeg.so 补丁 | 查看 | 原始文档 | blame | 历史
decoder/valib/goffmpeg @ 7f8e31 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/valib/ipc @ 66d8e6 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/common.go 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/decoder.go 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/reciever.go 71 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -5,17 +5,19 @@
*.obj
# Compiled Dynamic libraries
*.so
#*.so
*.dylib
*.dll
# Compiled Static libraries
*.lai
*.la
*.a
#*.a
*.lib
# Executables
*.exe
*.out
*.app
build/
.gitmodules
New file
@@ -0,0 +1,12 @@
[submodule "analysis/valib/gosdk"]
    path = analysis/valib/gosdk
    url = git://192.168.1.226/valib/gosdk.git
[submodule "analysis/valib/ipc"]
    path = analysis/valib/ipc
    url = git://192.168.1.226/valib/ipc.git
[submodule "decoder/valib/ipc"]
    path = decoder/valib/ipc
    url = git://192.168.1.226/valib/ipc.git
[submodule "decoder/valib/goffmpeg"]
    path = decoder/valib/goffmpeg
    url = git://192.168.1.226/valib/goffmpeg.git
analysis/valib/gosdk
New file
@@ -1 +1 @@
Subproject commit 0000000000000000000000000000000000000000
Subproject commit e3a2151968276a71a617d88965281e279ad68236
analysis/valib/ipc
New file
@@ -1 +1 @@
Subproject commit 0000000000000000000000000000000000000000
Subproject commit 66d8e6695ada4090bdb81279d594f323be875bc7
decoder/.gitignore
New file
@@ -0,0 +1 @@
decoder
decoder/.vscode/settings.json
New file
@@ -0,0 +1,3 @@
{
    "go.formatTool": "goimports"
}
decoder/demo/simpleIPC.go
New file
@@ -0,0 +1,11 @@
package demo
import (
    srv "decoder/work/service"
)
// SendByIPC send pic by ipc
func SendByIPC(rtspURL, cameraID, ipcURL string) {
    d := srv.NewDecoder(cameraID, rtspURL, ipcURL)
    d.Run()
}
decoder/go.mod
New file
@@ -0,0 +1,5 @@
module decoder
go 1.12
require nanomsg.org/go-mangos v1.4.0
decoder/go.sum
New file
@@ -0,0 +1,2 @@
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
decoder/main.go
New file
@@ -0,0 +1,71 @@
package main
import (
    "bytes"
    "context"
    "decoder/demo"
    "decoder/valib/ipc"
    "encoding/gob"
    "flag"
    "fmt"
    // "videoServer/demo"
)
var (
    streamURL string
    picFolder string
    ipcURL string
    proc   string
)
func init() {
    flag.StringVar(&streamURL, "i", "rtsp://192.168.1.203:8554/16.mkv", "input url")
    flag.StringVar(&picFolder, "f", ".", "test pic folder")
    flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
}
func test() {
    fmt.Println("start test")
    fmt.Println(picFolder)
    demo.SendByIPC(streamURL, "camera1", ipcURL)
}
type cameraInfo struct {
    cameraID string
    videoURL string
}
func recvFromIPC(ctx context.Context, url string) (cameraID, rtspURL string) {
    ipc := ipc.NewClient(ctx, url)
    for {
        msg := ipc.Recv()
        var buf bytes.Buffer
        buf.Write(msg)
        dec := gob.NewDecoder(&buf)
        var i cameraInfo
        if err := dec.Decode(&i); err != nil {
            fmt.Println("gob decode CameraImage error", err)
            continue
        }
        return i.cameraID, i.videoURL
    }
}
func main() {
    flag.Parse()
    ctx, cancel := context.WithCancel(context.Background())
    recvFromIPC(ctx, "tcp://192.168.1.156:7000")
    cancel()
    // test()
}
decoder/runtime/libcffmpeg.so
Binary files differ
decoder/valib/goffmpeg
New file
@@ -1 +1 @@
Subproject commit 0000000000000000000000000000000000000000
Subproject commit 7f8e310acc232c67ba8e6f72028d677010d1051a
decoder/valib/ipc
New file
@@ -1 +1 @@
Subproject commit 0000000000000000000000000000000000000000
Subproject commit 66d8e6695ada4090bdb81279d594f323be875bc7
decoder/work/service/common.go
New file
@@ -0,0 +1,20 @@
package service
import (
    "context"
)
// ImageInfo deocded image data
type ImageInfo struct {
    Data   []byte
    Width  int
    Height int
    CameraID string
}
// ContextIPC server context
type ContextIPC struct {
    ctx    context.Context
    cancel context.CancelFunc
}
decoder/work/service/decoder.go
New file
@@ -0,0 +1,88 @@
package service
import (
    "bytes"
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
)
// Decoder decoder ingo
type Decoder struct {
    ctxIPC *ContextIPC
    cameraID  string
    streamURL string
    ipcURL    string
    ffmpeg *goffmpeg.GoFFMPEG
}
// NewDecoder decoder
func NewDecoder(cameraID, streamURL, ipcURL string) *Decoder {
    ctx, cancel := context.WithCancel(context.Background())
    return &Decoder{
        ctxIPC: &ContextIPC{ctx, cancel},
        cameraID:  cameraID,
        streamURL: streamURL,
        ipcURL:    ipcURL,
    }
}
func (dec *Decoder) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-dec.ctxIPC.ctx.Done():
            fmt.Println("stop decoder")
            return
        case i := <-img:
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
            if err := enc.Encode(i); err != nil {
                fmt.Println("gob encode camera image error", err)
                continue
            }
            b := buf.Bytes()
            data <- b
        }
    }
}
// Run run a decoder
func (dec *Decoder) Run() {
    ipc := ipc.NewServer(dec.ctxIPC.ctx, dec.ipcURL)
    gf := goffmpeg.New()
    if gf != nil {
        dec.ffmpeg = gf
    }
    imageChan := make(chan ImageInfo)
    dataChan := make(chan []byte)
    go dec.serializeImageInfo(imageChan, dataChan)
    gf.ActiveDecoder(func(d *[]byte, w, h *int) {
        if *w > 0 && *h > 0 {
            i := ImageInfo{*d, *w, *h, dec.cameraID}
            imageChan <- i
        } else {
            fmt.Println("decode jpg error")
        }
    })
    gf.Run(dec.streamURL)
    ipc.SendFromChannel(dataChan)
}
// Stop stop run decoder, must run in goroutine
func (dec *Decoder) Stop() {
    dec.ffmpeg.Free()
    dec.ctxIPC.cancel()
}
decoder/work/service/reciever.go
New file
@@ -0,0 +1,71 @@
package service
import (
    "bytes"
    "context"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
)
// Reciever recv from ipc
type Reciever struct {
    ctxIPC  *ContextIPC
    ipcURL  string
    chImage chan<- ImageInfo
}
func unserilizeImageInfo(data <-chan []byte, img chan<- ImageInfo) {
    for {
        d := <-data
        var buf bytes.Buffer
        buf.Write(d)
        dec := gob.NewDecoder(&buf)
        var i ImageInfo
        if err := dec.Decode(&i); err != nil {
            fmt.Println("gob decode CameraImage error", err)
            continue
        }
        img <- i
    }
}
// NewReciever new recv
func NewReciever(url string, ch chan<- ImageInfo) *Reciever {
    ctx, cancel := context.WithCancel(context.Background())
    return &Reciever{
        ctxIPC:  &ContextIPC{ctx, cancel},
        ipcURL:  url,
        chImage: ch,
    }
}
// NewRecieverWithContext new recver with context
func NewRecieverWithContext(ctx context.Context, url string, ch chan<- ImageInfo) *Reciever {
    return &Reciever{
        ctxIPC:  &ContextIPC{ctx, nil},
        ipcURL:  url,
        chImage: ch,
    }
}
// Run run task from ipc data
func (r *Reciever) Run() {
    ipc := ipc.NewClient(r.ctxIPC.ctx, r.ipcURL)
    dataChan := make(chan []byte)
    go unserilizeImageInfo(dataChan, r.chImage)
    ipc.RecvToChannel(dataChan)
}
// Stop stop reciever, run in goroutine
func (r *Reciever) Stop() {
    if r.ctxIPC.cancel != nil {
        r.ctxIPC.cancel()
    }
}