From f963cd025c1aa88ac8b211e24f46ceb0eb64c418 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 07 五月 2019 14:19:29 +0800
Subject: [PATCH] 多个解码单元同时运行,注意tcp地址

---
 decoder/main.go |  143 ++++++++++++++++++++++++++++++++++-------------
 1 files changed, 102 insertions(+), 41 deletions(-)

diff --git a/decoder/main.go b/decoder/main.go
index 0c7f65b..4684779 100644
--- a/decoder/main.go
+++ b/decoder/main.go
@@ -1,14 +1,14 @@
 package main
 
 import (
-	"bytes"
 	"context"
-	"decoder/demo"
 	"decoder/valib/ipc"
-	"encoding/gob"
+	srv "decoder/work/service"
+	"encoding/json"
 	"flag"
 	"fmt"
-	// "videoServer/demo"
+	"strconv"
+	"strings"
 )
 
 var (
@@ -17,6 +17,10 @@
 
 	ipcURL string
 	proc   string
+
+	testIt bool
+
+	asServer bool
 )
 
 func init() {
@@ -24,6 +28,99 @@
 	flag.StringVar(&picFolder, "f", ".", "test pic folder")
 
 	flag.StringVar(&ipcURL, "ipc", "ipc:///tmp/pic.ipc", "ipc label")
+
+	flag.BoolVar(&testIt, "test", false, "use test")
+
+	flag.BoolVar(&asServer, "server", false, "run ipc as server")
+}
+
+// CameraInfo camera info
+type CameraInfo struct {
+	ID  string `json:"Cameraid"`
+	URL string `json:"Rtsp"`
+}
+
+// MsgIPC msg for ipc
+type MsgIPC struct {
+	CMD  string `json:"Command"`
+	Port int    `jsong:"PortIpc"`
+}
+
+var (
+	mapCameraInfo = make(map[string]CameraInfo)
+
+	tcp  = `tcp://192.168.1.124:`
+	port = 7001
+)
+
+func recvCameraInfoFromIPC(ctx context.Context, url string, ch chan<- CameraInfo) {
+	ipc := ipc.NewClient(ctx, url)
+
+	for {
+		msg := ipc.Recv()
+		if msg != nil {
+
+			var c CameraInfo
+			if err := json.Unmarshal(msg, &c); err == nil {
+
+				if _, ok := mapCameraInfo[c.ID]; ok {
+					continue
+				}
+
+				ch <- c
+
+				msgIpc := MsgIPC{"new decoder", port}
+				if b, err := json.Marshal(msgIpc); err == nil {
+					ipc.Send(b)
+				}
+			} else {
+				fmt.Println(err)
+			}
+
+		}
+	}
+}
+
+func main() {
+	flag.Parse()
+
+	if testIt {
+		test()
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+
+	ch := make(chan CameraInfo)
+	// tcpURL := tcp + strconv.Itoa(port)
+	// port++
+	go recvCameraInfoFromIPC(ctx, ipcURL, ch)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case c := <-ch:
+			if _, ok := mapCameraInfo[c.ID]; !ok {
+				mapCameraInfo[c.ID] = c
+				ipcAddr := tcp + strconv.Itoa(port)
+				port++
+
+				url := strings.TrimSpace(c.URL)
+				id := strings.TrimSpace(c.ID)
+				addr := strings.TrimSpace(ipcAddr)
+				go runSender(id, url, addr)
+			}
+		}
+	}
+	cancel()
+}
+
+func runSender(cameraID, rtspURL, ipcLabel string) {
+	d := srv.NewSender(cameraID, rtspURL, ipcLabel)
+	if asServer {
+		d.RunAsServer()
+	}
+	d.RunAsClient()
 }
 
 func test() {
@@ -31,41 +128,5 @@
 
 	fmt.Println(picFolder)
 
-	demo.SendByIPC(streamURL, "camera1", ipcURL)
-}
-
-type cameraInfo struct {
-	cameraID string
-	videoURL string
-}
-
-func recvFromIPC(ctx context.Context, url string) (cameraID, rtspURL string) {
-	ipc := ipc.NewClient(ctx, url)
-
-	for {
-		msg := ipc.Recv()
-
-		var buf bytes.Buffer
-		buf.Write(msg)
-
-		dec := gob.NewDecoder(&buf)
-
-		var i cameraInfo
-		if err := dec.Decode(&i); err != nil {
-			fmt.Println("gob decode CameraImage error", err)
-			continue
-		}
-		return i.cameraID, i.videoURL
-	}
-}
-
-func main() {
-	flag.Parse()
-
-	ctx, cancel := context.WithCancel(context.Background())
-	recvFromIPC(ctx, "tcp://192.168.1.156:7000")
-
-	cancel()
-	// test()
-
+	runSender("cameraid", streamURL, ipcURL)
 }

--
Gitblit v1.8.0