视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-07 f963cd025c1aa88ac8b211e24f46ceb0eb64c418
decoder/main.go
@@ -19,6 +19,8 @@
   proc   string
   testIt bool
   asServer bool
)
func init() {
@@ -28,6 +30,8 @@
   flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
   flag.BoolVar(&testIt, "test", false, "use test")
   flag.BoolVar(&asServer, "server", false, "run ipc as server")
}
// CameraInfo camera info
@@ -43,28 +47,36 @@
}
var (
   mapCI = make(map[string]CameraInfo)
   port  = 7001
   mapCameraInfo = make(map[string]CameraInfo)
   tcp  = `tcp://192.168.1.124:`
   port = 7001
)
func recvFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
func recvCameraInfoFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
   ipc := ipc.NewClient(ctx, url)
   for {
      msg := ipc.Recv()
      if msg != nil {
         fmt.Println(string(msg))
         var c CameraInfo
         if err := json.Unmarshal(msg, &c); err == nil {
            if _, ok := mapCameraInfo[c.ID]; ok {
               continue
            }
            ch <- c
            fmt.Printf("recv camere info %+v\n", c)
            msgIpc := MsgIPC{"new decoder", port}
            if b, err := json.Marshal(msgIpc); err == nil {
               ipc.Send(b)
            }
         } else {
            fmt.Println(err)
         }
         msgIpc := MsgIPC{"new decoder", port}
         if b, err := json.Marshal(msgIpc); err == nil {
            ipc.Send(b)
         }
      }
   }
}
@@ -79,25 +91,24 @@
   ctx, cancel := context.WithCancel(context.Background())
   ch := make(chan CameraInfo)
   go recvFromIPC(ctx, "tcp://192.168.1.124:7000", ch)
   // tcpURL := tcp + strconv.Itoa(port)
   // port++
   go recvCameraInfoFromIPC(ctx, ipcURL, ch)
   // demo.SendByIPC("rtsp://admin:a1234567@192.168.1.188:554/h264/ch1/main/av_stream", "cid0",
   //    "tcp://192.168.1.140:7000", false)
   for {
      select {
      case <-ctx.Done():
         return
      case c := <-ch:
         if _, ok := mapCI[c.ID]; !ok {
            mapCI[c.ID] = c
            ipc := "tcp://192.168.1.124:" + strconv.Itoa(port)
         if _, ok := mapCameraInfo[c.ID]; !ok {
            mapCameraInfo[c.ID] = c
            ipcAddr := tcp + strconv.Itoa(port)
            port++
            fmt.Printf("create ipc %s for decode : %s, on camera id %s\n", ipc, c.URL, c.ID)
            url := strings.TrimSpace(c.URL)
            id := strings.TrimSpace(c.ID)
            i := strings.TrimSpace(ipc)
            go runSender(id, url, i)
            addr := strings.TrimSpace(ipcAddr)
            go runSender(id, url, addr)
         }
      }
   }
@@ -106,8 +117,12 @@
func runSender(cameraID, rtspURL, ipcLabel string) {
   d := srv.NewSender(cameraID, rtspURL, ipcLabel)
   if asServer {
      d.RunAsServer()
   }
   d.RunAsClient()
}
func test() {
   fmt.Println("start test")