视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-08 ce15b43db3e60acc65ddd25de253b8577c2693aa
context使用
1个文件已删除
4个文件已修改
229 ■■■■■ 已修改文件
analysis/demo/winYolo.go 46 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/main.go 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/proc.go 36 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/work/service/common.go 12 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/work/service/ipcreciever.go 27 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
analysis/demo/winYolo.go
@@ -3,8 +3,10 @@
import (
    "analysis/valib/gosdk"
    srv "analysis/work/service"
    "fmt"
    "image"
    "image/color"
    "math/rand"
    "strconv"
    "sync"
@@ -53,12 +55,25 @@
    }
}
var yoloDetInfo []gosdk.CObjInfo
var startYolo bool
var m sync.Mutex
type Yolo struct {
    yoloDetInfo []gosdk.CObjInfo
    startYolo   bool
    m           sync.Mutex
}
func NewYolo() *Yolo {
    return &Yolo{
        startYolo: false,
    }
}
// ShowYolo show yolo result
func ShowYolo(ch <-chan srv.ImageInfo) {
func (y *Yolo) ShowYolo(ch <-chan srv.ImageInfo) {
    y.ShowYoloWithName(ch, "Yolo")
}
// ShowYoloWithName show yolo result
func (y *Yolo) ShowYoloWithName(ch <-chan srv.ImageInfo, winName string) {
    cfg := "./data/yolo/cfg/yolov3.cfg"
    weights := "./data/yolo/yolov3.weights"
@@ -66,7 +81,10 @@
    yolo := gosdk.InitYolo(cfg, weights, name, 0)
    window := gocv.NewWindow("Yolo")
    winName = winName + strconv.Itoa(rand.Intn(10))
    window := gocv.NewWindow(winName)
    fmt.Println("create window : ", winName)
    pic := gocv.NewMat()
@@ -74,25 +92,25 @@
        i := <-ch
        if !startYolo {
        if !y.startYolo {
            go func(yolo *gosdk.YoloHandle, i srv.ImageInfo) {
                startYolo = true
                y.startYolo = true
                img := gosdk.SDKImage{Data: i.Data, Width: i.Width, Height: i.Height}
                obj := gosdk.YoloDetect(yolo, img, 0.4, 0)
                m.Lock()
                yoloDetInfo = obj
                m.Unlock()
                startYolo = false
                y.m.Lock()
                y.yoloDetInfo = obj
                y.m.Unlock()
                y.startYolo = false
            }(yolo, i)
        }
        img, _ := gocv.NewMatFromBytes(i.Height, i.Width, gocv.MatTypeCV8UC3, i.Data)
        var obj []gosdk.CObjInfo
        m.Lock()
        obj = yoloDetInfo
        m.Unlock()
        y.m.Lock()
        obj = y.yoloDetInfo
        y.m.Unlock()
        cvDrawObject(&img, obj)
        // show obj
        gocv.Resize(img, &pic, image.Pt(0, 0), 0.5, 0.5, gocv.InterpolationDefault)
analysis/main.go
@@ -2,26 +2,39 @@
import (
    "analysis/demo"
    "analysis/valib/ipc"
    srv "analysis/work/service"
    "bytes"
    "context"
    "encoding/gob"
    "encoding/json"
    "flag"
    "fmt"
    "strconv"
    "time"
)
var (
    streamURL string
    picFolder string
    asServer bool
    ipcURL    string
    streamURL string
    ipcServer bool
    procType  string
    procCount int
)
func init() {
    flag.StringVar(&streamURL, "i", "rtsp://192.168.1.203:8554/16.mkv", "input url")
    flag.StringVar(&picFolder, "f", ".", "test pic folder")
    flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
    flag.StringVar(&proc, "proc", "", "proc name")
    flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/piipc", "ipc label")
    flag.BoolVar(&asServer, "server", false, "run ipc as server")
    flag.BoolVar(&ipcServer, "server", false, "run ipc as server")
    flag.IntVar(&procCount, "c", 1, "proc run count")
}
const (
@@ -29,21 +42,96 @@
    reciever = "rcv"
)
//   每一个任务标签
type TaskLabel struct {
    Taskid string
    Sdkids []string
    Index  int
}
//每一条加工后的数据流
type SdkMessage struct {
    Cid     string
    Tasklab TaskLabel
    Data    []byte
}
type ChSdkMsg struct {
    sdkMsg SdkMessage
    ch     chan<- srv.ImageInfo
}
var mapSdkMsg = make(map[string]ChSdkMsg)
func recvSdkMsgInfoFromIPC(ctx context.Context, url string) {
    ipc := ipc.NewClient(ctx, url)
    fmt.Println("ipc address: ", url)
    for {
        msg := ipc.Recv()
        if msg != nil {
            var m SdkMessage
            if err := json.Unmarshal(msg, &m); err == nil {
                if v, ok := mapSdkMsg[m.Cid]; !ok {
                    imageChan := make(chan srv.ImageInfo)
                    mapSdkMsg[m.Cid] = ChSdkMsg{m, imageChan}
                    y := demo.NewYolo()
                    go y.ShowYoloWithName(imageChan, m.Cid)
                } else {
                    fmt.Println("recv ipc : ", url, " cameraid : ", v.sdkMsg.Cid)
                    var buf bytes.Buffer
                    buf.Write(v.sdkMsg.Data)
                    dec := gob.NewDecoder(&buf)
                    var i srv.ImageInfo
                    if err := dec.Decode(&i); err != nil {
                        fmt.Println("gob decode CameraImage error", err)
                        continue
                    }
                    v.ch <- i
                }
            } else {
                fmt.Println(err)
            }
        }
    }
}
func main() {
    flag.Parse()
    fmt.Println("start test, pic folder: ", picFolder)
    imageChan := make(chan srv.ImageInfo)
    d := srv.NewReciever(ipcURL, imageChan)
    tcp := "tcp://192.168.1.124:"
    port := 9000
    ctx, cancel := context.WithCancel(context.Background())
    for i := 0; i < procCount; i++ {
        go recvSdkMsgInfoFromIPC(ctx, tcp+strconv.Itoa(port))
        port++
    }
    for {
        time.Sleep(time.Duration(2) * time.Second)
    }
    cancel()
}
    if asServer {
func oneTest(ctx context.Context) {
    imageChan := make(chan srv.ImageInfo)
    d := srv.NewReciever(ctx, ipcURL, imageChan)
    if ipcServer {
        go d.RunAsServer()
    } else {
        go d.RunAsClient()
    }
    demo.ShowYolo(imageChan)
    y := demo.NewYolo()
    y.ShowYolo(imageChan)
    fakeStartProc()
}
analysis/proc.go
File was deleted
analysis/work/service/common.go
@@ -1,20 +1,10 @@
package service
import (
    "context"
)
// ImageInfo deocded image data
type ImageInfo struct {
    Data   []byte
    Width  int
    Height int
    CameraID string
}
// ContextIPC server context
type ContextIPC struct {
    ctx    context.Context
    cancel context.CancelFunc
    // CameraID string
}
analysis/work/service/ipcreciever.go
@@ -10,7 +10,7 @@
// Reciever recv from ipc
type Reciever struct {
    ctxIPC  *ContextIPC
    ctx     context.Context
    ipcURL  string
    chImage chan<- ImageInfo
}
@@ -34,20 +34,10 @@
}
// NewReciever new recv
func NewReciever(url string, ch chan<- ImageInfo) *Reciever {
    ctx, cancel := context.WithCancel(context.Background())
func NewReciever(ctx context.Context, url string, ch chan<- ImageInfo) *Reciever {
    return &Reciever{
        ctxIPC:  &ContextIPC{ctx, cancel},
        ipcURL:  url,
        chImage: ch,
    }
}
// NewRecieverWithContext new recver with context
func NewRecieverWithContext(ctx context.Context, url string, ch chan<- ImageInfo) *Reciever {
    return &Reciever{
        ctxIPC:  &ContextIPC{ctx, nil},
        ctx:     ctx,
        ipcURL:  url,
        chImage: ch,
    }
@@ -62,18 +52,11 @@
// RunAsServer run a IPC server
func (r *Reciever) RunAsServer() {
    r.run(ipc.NewServer(r.ctxIPC.ctx, r.ipcURL))
    r.run(ipc.NewServer(r.ctx, r.ipcURL))
}
// RunAsClient run as a IPC client
func (r *Reciever) RunAsClient() {
    r.run(ipc.NewClient(r.ctxIPC.ctx, r.ipcURL))
}
// Stop stop reciever, run in goroutine
func (r *Reciever) Stop() {
    if r.ctxIPC.cancel != nil {
        r.ctxIPC.cancel()
    }
    r.run(ipc.NewClient(r.ctx, r.ipcURL))
}