视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-07 78a05014a9ad534c6fc2aad8a6a22be6ba5032e8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package main
 
import (
    "context"
    "decoder/valib/goffmpeg"
    "decoder/valib/ipc"
    srv "decoder/work/service"
    "encoding/json"
    "flag"
    "fmt"
    "strconv"
    "strings"
)
 
var (
    streamURL string
    picFolder string
 
    ipcURL string
    proc   string
 
    testIt bool
 
    asServer bool
)
 
func init() {
    flag.StringVar(&streamURL, "i", "rtsp://192.168.1.203:8554/16.mkv", "input url")
    flag.StringVar(&picFolder, "f", ".", "test pic folder")
 
    flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
 
    flag.BoolVar(&testIt, "test", false, "use test")
 
    flag.BoolVar(&asServer, "server", false, "run ipc as server")
}
 
// CameraInfo camera info
type CameraInfo struct {
    ID  string `json:"Cameraid"`
    URL string `json:"Rtsp"`
}
 
// MsgIPC msg for ipc
type MsgIPC struct {
    CMD  string `json:"Command"`
    Port int    `jsong:"PortIpc"`
}
 
var (
    mapCameraInfo = make(map[string]CameraInfo)
)
 
func recvCameraInfoFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
    ipc := ipc.NewClient(ctx, url)
 
    for {
        msg := ipc.Recv()
        if msg != nil {
 
            var c CameraInfo
            if err := json.Unmarshal(msg, &c); err == nil {
 
                if _, ok := mapCameraInfo[c.ID]; ok {
                    continue
                }
 
                ch <- c
 
                msgIpc := MsgIPC{"new decoder", 0}
                if b, err := json.Marshal(msgIpc); err == nil {
                    ipc.Send(b)
                }
            } else {
                fmt.Println(err)
            }
 
        }
    }
}
 
func main() {
    flag.Parse()
 
    goffmpeg.InitFFmpeg()
 
    if testIt {
        test()
    }
 
    ctx, cancel := context.WithCancel(context.Background())
 
    ch := make(chan CameraInfo)
 
    tcp := ``
    port := 7001
 
    if strings.Index(ipcURL, "tcp://") == 0 {
        i := strings.LastIndex(ipcURL, ":")
        tcp = ipcURL[0 : i+1]
 
        strPort := ipcURL[i+1:]
        port, _ = strconv.Atoi(strPort)
        port++
    }
 
    go recvCameraInfoFromIPC(ctx, ipcURL, ch)
 
    for {
        select {
        case <-ctx.Done():
            return
        case c := <-ch:
            if _, ok := mapCameraInfo[c.ID]; !ok {
                mapCameraInfo[c.ID] = c
                ipcAddr := tcp + strconv.Itoa(port)
                port++
 
                url := strings.TrimSpace(c.URL)
                id := strings.TrimSpace(c.ID)
                addr := strings.TrimSpace(ipcAddr)
                go runSender(id, url, addr)
            }
        }
    }
 
    goffmpeg.FreeFFmpeg()
    cancel()
}
 
func runSender(cameraID, rtspURL, ipcLabel string) {
    d := srv.NewSender(cameraID, rtspURL, ipcLabel)
    if asServer {
        d.RunAsServer()
    }
    d.RunAsClient()
}
 
func test() {
    fmt.Println("start test")
 
    fmt.Println(picFolder)
 
    runSender("cameraid", streamURL, ipcURL)
}