视频分析2.0 多进程拆分仓库
zhangmeng
2019-04-30 d3618c4e9ceab273a52813c1f2c462912fb81e59
add ipc run as server or not
5个文件已修改
118 ■■■■■ 已修改文件
decoder/demo/simpleIPC.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/main.go 88 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/valib/ipc @ 8841f9 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/decoder.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/work/service/reciever.go 11 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
decoder/demo/simpleIPC.go
@@ -5,7 +5,7 @@
)
// SendByIPC send pic by ipc
func SendByIPC(rtspURL, cameraID, ipcURL string) {
func SendByIPC(rtspURL, cameraID, ipcURL string, asServer bool) {
    d := srv.NewDecoder(cameraID, rtspURL, ipcURL)
    d.Run()
    d.Run(asServer)
}
decoder/main.go
@@ -1,11 +1,10 @@
package main
import (
    "bytes"
    "context"
    "decoder/demo"
    "decoder/valib/ipc"
    "encoding/gob"
    "encoding/json"
    "flag"
    "fmt"
    // "videoServer/demo"
@@ -26,46 +25,81 @@
    flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
}
func test() {
    fmt.Println("start test")
    fmt.Println(picFolder)
    demo.SendByIPC(streamURL, "camera1", ipcURL)
// CameraInfo camera info
type CameraInfo struct {
    ID  string `json:"Cameraid"`
    URL string `json:"Rtsp"`
}
type cameraInfo struct {
    cameraID string
    videoURL string
// MsgIPC msg for ipc
type MsgIPC struct {
    CMD  string `json:"Command"`
    Port int    `jsong:"PortIpc"`
}
func recvFromIPC(ctx context.Context, url string) (cameraID, rtspURL string) {
var (
    mapCI = make(map[string]CameraInfo)
    port  = 7001
)
func recvFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
    ipc := ipc.NewClient(ctx, url)
    for {
        msg := ipc.Recv()
        var buf bytes.Buffer
        buf.Write(msg)
        dec := gob.NewDecoder(&buf)
        var i cameraInfo
        if err := dec.Decode(&i); err != nil {
            fmt.Println("gob decode CameraImage error", err)
            continue
        if msg != nil {
            fmt.Println(string(msg))
            var c CameraInfo
            if err := json.Unmarshal(msg, &c); err == nil {
                ch <- c
                fmt.Printf("recv camere info %+v\n", c)
            } else {
                fmt.Println(err)
            }
            msgIpc := MsgIPC{"new decoder", port}
            if b, err := json.Marshal(msgIpc); err == nil {
                ipc.Send(b)
            }
        }
        return i.cameraID, i.videoURL
    }
}
func main() {
    flag.Parse()
    ctx, cancel := context.WithCancel(context.Background())
    recvFromIPC(ctx, "tcp://192.168.1.156:7000")
    test()
    cancel()
    // test()
    // ctx, cancel := context.WithCancel(context.Background())
    // ch := make(chan CameraInfo)
    // go recvFromIPC(ctx, "tcp://192.168.1.124:7000", ch)
    // for {
    //     select {
    //     case <-ctx.Done():
    //         return
    //     case c := <-ch:
    //         if _, ok := mapCI[c.ID]; !ok {
    //             mapCI[c.ID] = c
    //             ipc := "tcp://192.168.1.124:" + strconv.Itoa(port)
    //             port++
    //             fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipc, c.URL, c.ID)
    //             url := strings.TrimSpace(c.URL)
    //             id := strings.TrimSpace(c.ID)
    //             i := strings.TrimSpace(ipc)
    //             go demo.SendByIPC(url, id, i)
    //         }
    //     }
    // }
    // cancel()
}
func test() {
    fmt.Println("start test")
    fmt.Println(picFolder)
    demo.SendByIPC(streamURL, "camera1", ipcURL, true)
}
decoder/valib/ipc
@@ -1 +1 @@
Subproject commit 66d8e6695ada4090bdb81279d594f323be875bc7
Subproject commit 8841f9d2154a8e0653d30fdc04bde8a772665c8c
decoder/work/service/decoder.go
@@ -55,8 +55,13 @@
}
// Run run a decoder
func (dec *Decoder) Run() {
    ipc := ipc.NewServer(dec.ctxIPC.ctx, dec.ipcURL)
func (dec *Decoder) Run(asServer bool) {
    var i *ipc.IPC
    if asServer {
        i = ipc.NewServer(dec.ctxIPC.ctx, dec.ipcURL)
    } else {
        i = ipc.NewClient(dec.ctxIPC.ctx, dec.ipcURL)
    }
    gf := goffmpeg.New()
    if gf != nil {
@@ -68,7 +73,7 @@
    go dec.serializeImageInfo(imageChan, dataChan)
    gf.ActiveDecoder(func(d *[]byte, w, h *int) {
        fmt.Println("get a pic")
        if *w > 0 && *h > 0 {
            i := ImageInfo{*d, *w, *h, dec.cameraID}
            imageChan <- i
@@ -78,7 +83,7 @@
    })
    gf.Run(dec.streamURL)
    ipc.SendFromChannel(dataChan)
    i.SendFromChannel(dataChan)
}
// Stop stop run decoder, must run in goroutine
decoder/work/service/reciever.go
@@ -54,13 +54,18 @@
}
// Run run task from ipc data
func (r *Reciever) Run() {
    ipc := ipc.NewClient(r.ctxIPC.ctx, r.ipcURL)
func (r *Reciever) Run(asServer bool) {
    var i *ipc.IPC
    if asServer {
        i = ipc.NewServer(r.ctxIPC.ctx, r.ipcURL)
    } else {
        i = ipc.NewClient(r.ctxIPC.ctx, r.ipcURL)
    }
    dataChan := make(chan []byte)
    go unserilizeImageInfo(dataChan, r.chImage)
    ipc.RecvToChannel(dataChan)
    i.RecvToChannel(dataChan)
}
// Stop stop reciever, run in goroutine