视频分析2.0 多进程拆分仓库
zhangmeng
2019-05-08 19445d14e73f4bb96e218a65e126ae526f89537d
decoder/main.go
@@ -1,14 +1,15 @@
package main
import (
   "bytes"
   "context"
   "decoder/demo"
   "decoder/valib/goffmpeg"
   "decoder/valib/ipc"
   "encoding/gob"
   srv "decoder/work/service"
   "encoding/json"
   "flag"
   "fmt"
   // "videoServer/demo"
   "strconv"
   "strings"
)
var (
@@ -17,6 +18,10 @@
   ipcURL string
   proc   string
   testIt bool
   asServer bool
)
func init() {
@@ -24,48 +29,117 @@
   flag.StringVar(&picFolder, "f", ".", "test pic folder")
   flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
   flag.BoolVar(&testIt, "test", false, "use test")
   flag.BoolVar(&asServer, "server", false, "run ipc as server")
}
func test() {
   fmt.Println("start test")
   fmt.Println(picFolder)
   demo.SendByIPC(streamURL, "camera1", ipcURL)
// CameraInfo camera info
type CameraInfo struct {
   ID  string `json:"Cameraid"`
   URL string `json:"Rtsp"`
}
type cameraInfo struct {
   cameraID string
   videoURL string
// MsgIPC msg for ipc
type MsgIPC struct {
   CMD  string `json:"Command"`
   Port int    `jsong:"PortIpc"`
}
func recvFromIPC(ctx context.Context, url string) (cameraID, rtspURL string) {
var (
   mapCameraInfo = make(map[string]CameraInfo)
)
func recvCameraInfoFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
   ipc := ipc.NewClient(ctx, url)
   for {
      msg := ipc.Recv()
      if msg != nil {
      var buf bytes.Buffer
      buf.Write(msg)
         var c CameraInfo
         if err := json.Unmarshal(msg, &c); err == nil {
      dec := gob.NewDecoder(&buf)
            if _, ok := mapCameraInfo[c.ID]; ok {
               continue
            }
      var i cameraInfo
      if err := dec.Decode(&i); err != nil {
         fmt.Println("gob decode CameraImage error", err)
         continue
            ch <- c
            msgIpc := MsgIPC{"new decoder", 0}
            if b, err := json.Marshal(msgIpc); err == nil {
               ipc.Send(b)
            }
         } else {
            fmt.Println(err)
         }
      }
      return i.cameraID, i.videoURL
   }
}
func main() {
   flag.Parse()
   goffmpeg.InitFFmpeg()
   ctx, cancel := context.WithCancel(context.Background())
   recvFromIPC(ctx, "tcp://192.168.1.156:7000")
   if testIt {
      test(ctx)
   }
   ch := make(chan CameraInfo)
   tcp := ``
   port := 7001
   if strings.Index(ipcURL, "tcp://") == 0 {
      i := strings.LastIndex(ipcURL, ":")
      tcp = ipcURL[0 : i+1]
      strPort := ipcURL[i+1:]
      port, _ = strconv.Atoi(strPort)
      port++
   }
   go recvCameraInfoFromIPC(ctx, ipcURL, ch)
   for {
      select {
      case <-ctx.Done():
         return
      case c := <-ch:
         if _, ok := mapCameraInfo[c.ID]; !ok {
            mapCameraInfo[c.ID] = c
            ipcAddr := tcp + strconv.Itoa(port)
            port++
            url := strings.TrimSpace(c.URL)
            id := strings.TrimSpace(c.ID)
            addr := strings.TrimSpace(ipcAddr)
            go runSender(ctx, id, url, addr)
         }
      }
   }
   goffmpeg.FreeFFmpeg()
   cancel()
   // test()
}
func runSender(ctx context.Context, cameraID, rtspURL, ipcLabel string) {
   d := srv.NewSender(ctx, cameraID, rtspURL, ipcLabel)
   if asServer {
      d.RunAsServer()
   }
   d.RunAsClient()
}
func test(ctx context.Context) {
   fmt.Println("start test")
   fmt.Println(picFolder)
   runSender(ctx, "cameraid", streamURL, ipcURL)
}