视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-06 788f46d4e67c08c7e4f26f62c90f8d85849bcf18
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package service
 
import (
    "bytes"
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    "encoding/gob"
    "fmt"
)
 
// Sender decoder ingo
type Sender struct {
    ctxIPC *ContextIPC
 
    cameraID  string
    streamURL string
    ipcURL    string
 
    ffmpeg *goffmpeg.GoFFMPEG
}
 
// NewSender Sender
func NewSender(cameraID, streamURL, ipcURL string) *Sender {
    ctx, cancel := context.WithCancel(context.Background())
 
    return &Sender{
        ctxIPC: &ContextIPC{ctx, cancel},
 
        cameraID:  cameraID,
        streamURL: streamURL,
        ipcURL:    ipcURL,
    }
}
 
func (s *Sender) serializeImageInfo(img <-chan ImageInfo, data chan<- []byte) {
    for {
        select {
        case <-s.ctxIPC.ctx.Done():
            fmt.Println("stop Sender")
            return
        case i := <-img:
            var buf bytes.Buffer
            enc := gob.NewEncoder(&buf)
 
            if err := enc.Encode(i); err != nil {
                fmt.Println("gob encode camera image error", err)
                continue
            }
            b := buf.Bytes()
            data <- b
        }
 
    }
}
 
func (s *Sender) run(i *ipc.IPC) {
    gf := goffmpeg.New()
    if gf != nil {
        s.ffmpeg = gf
    }
    imageChan := make(chan ImageInfo)
    dataChan := make(chan []byte)
 
    go s.serializeImageInfo(imageChan, dataChan)
 
    gf.ActiveDecoder(func(d *[]byte, w, h *int) {
        if *w > 0 && *h > 0 {
            i := ImageInfo{*d, *w, *h, s.cameraID}
            imageChan <- i
        } else {
            fmt.Println("decode jpg error")
        }
    })
    gf.Run(s.streamURL)
 
    i.SendFromChannel(dataChan)
}
 
// RunAsServer run a IPC server
func (s *Sender) RunAsServer() {
    s.run(ipc.NewServer(s.ctxIPC.ctx, s.ipcURL))
 
}
 
// RunAsClient run as a IPC client
func (s *Sender) RunAsClient() {
    s.run(ipc.NewClient(s.ctxIPC.ctx, s.ipcURL))
}
 
// Stop stop run decoder, must run in goroutine
func (s *Sender) Stop() {
    s.ffmpeg.Free()
    s.ctxIPC.cancel()
}