视频分析2.0 多进程拆分仓库
zhangmeng
2019-04-30 d3618c4e9ceab273a52813c1f2c462912fb81e59
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package service
 
import (
    "bytes"
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
)
 
// Decoder decoder ingo
type Decoder struct {
    ctxIPC *ContextIPC
 
    cameraID  string
    streamURL string
    ipcURL    string
 
    ffmpeg *goffmpeg.GoFFMPEG
}
 
// NewDecoder decoder
func NewDecoder(cameraID, streamURL, ipcURL string) *Decoder {
    ctx, cancel := context.WithCancel(context.Background())
 
    return &Decoder{
        ctxIPC: &ContextIPC{ctx, cancel},
 
        cameraID:  cameraID,
        streamURL: streamURL,
        ipcURL:    ipcURL,
    }
}
 
func (dec *Decoder) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-dec.ctxIPC.ctx.Done():
            fmt.Println("stop decoder")
            return
        case i := <-img:
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
 
            if err := enc.Encode(i); err != nil {
                fmt.Println("gob encode camera image error", err)
                continue
            }
            b := buf.Bytes()
            data <- b
        }
 
    }
}
 
// Run run a decoder
func (dec *Decoder) Run(asServer bool) {
    var i *ipc.IPC
    if asServer {
        i = ipc.NewServer(dec.ctxIPC.ctx, dec.ipcURL)
    } else {
        i = ipc.NewClient(dec.ctxIPC.ctx, dec.ipcURL)
    }
 
    gf := goffmpeg.New()
    if gf != nil {
        dec.ffmpeg = gf
    }
    imageChan := make(chan ImageInfo)
    dataChan := make(chan []byte)
 
    go dec.serializeImageInfo(imageChan, dataChan)
 
    gf.ActiveDecoder(func(d *[]byte, w, h *int) {
        fmt.Println("get a pic")
        if *w > 0 && *h > 0 {
            i := ImageInfo{*d, *w, *h, dec.cameraID}
            imageChan <- i
        } else {
            fmt.Println("decode jpg error")
        }
    })
    gf.Run(dec.streamURL)
 
    i.SendFromChannel(dataChan)
}
 
// Stop stop run decoder, must run in goroutine
func (dec *Decoder) Stop() {
    dec.ffmpeg.Free()
    dec.ctxIPC.cancel()
}