视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-07 f963cd025c1aa88ac8b211e24f46ceb0eb64c418
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package service
 
import (
    "bytes"
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
    "time"
)
 
// Sender decoder ingo
type Sender struct {
    ctxIPC *ContextIPC
 
    cameraID  string
    streamURL string
    ipcURL    string
 
    ffmpeg *goffmpeg.GoFFMPEG
}
 
// NewSender Sender
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
    ctx, cancel := context.WithCancel(context.Background())
 
    fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipcURL, streamURL, cameraID)
 
    return &Sender{
        ctxIPC: &ContextIPC{ctx, cancel},
 
        cameraID:  cameraID,
        streamURL: streamURL,
        ipcURL:    ipcURL,
    }
}
 
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-s.ctxIPC.ctx.Done():
            fmt.Println("stop Sender")
            return
        case i := <-img:
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
 
            if err := enc.Encode(i); err != nil {
                fmt.Println("gob encode camera image error", err)
                continue
            }
            b := buf.Bytes()
            data <- b
            fmt.Printf("send %d length data ipc address %s\n", len(b), s.ipcURL)
        }
 
    }
}
 
func (s *Sender) run(i *ipc.IPC) {
    gf := goffmpeg.New()
    if gf != nil {
        s.ffmpeg = gf
    } else {
        fmt.Println("create decoder ffmpeg error")
        return
    }
    imageChan := make(chan ImageInfo)
    dataChan := make(chan []byte)
 
    go s.serializeImageInfo(imageChan, dataChan)
 
    gf.BuildDecoder()
 
    go func(f *goffmpeg.GoFFMPEG, ch chan<- ImageInfo) {
        for {
            data, wid, hei := f.GetPicDecoder()
            if wid > 0 && hei > 0 {
                img := ImageInfo{Data: data, Width: wid, Height: hei}
                ch <- img
            } else {
                time.Sleep(time.Millisecond * time.Duration(10))
            }
        }
 
    }(gf, imageChan)
 
    gf.Run(s.streamURL)
 
    i.SendFromChannel(dataChan)
}
 
// RunAsServer run a IPC server
func (s *Sender) RunAsServer() {
    i := ipc.NewServer(s.ctxIPC.ctx, s.ipcURL)
    fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
    s.run(i)
 
}
 
// RunAsClient run as a IPC client
func (s *Sender) RunAsClient() {
    i := ipc.NewClient(s.ctxIPC.ctx, s.ipcURL)
    fmt.Println("ipc :", s.ipcURL, " cameraid:", s.cameraID)
    s.run(i)
}
 
// Stop stop run decoder, must run in goroutine
func (s *Sender) Stop() {
    s.ffmpeg.Free()
    s.ctxIPC.cancel()
}