From 8e1e7d00ccf6c11922447b400d36b6954f21ac1b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 五月 2019 11:43:25 +0800
Subject: [PATCH] demo for deliver

---
 .gitignore  |    3 +
 go.sum      |    4 +
 .gitmodules |    3 +
 deliver     |    2 
 go.mod      |    8 ++
 main.go     |  113 +++++++++++++++++++++++++++++++++++++
 6 files changed, 132 insertions(+), 1 deletions(-)

diff --git a/.gitignore b/.gitignore
index 8365624..66bd209 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@
 
 *.exe
 *.test
+
+.vscode
+demo
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..d06e894
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "deliver"]
+	path = deliver
+	url = ssh://zhangmeng@192.168.1.12:29418/valib/deliver.git
diff --git a/deliver b/deliver
new file mode 160000
index 0000000..f5368c3
--- /dev/null
+++ b/deliver
@@ -1 +1 @@
-Subproject commit 0000000000000000000000000000000000000000
+Subproject commit f5368c38ae7d538ae37b1fa0444b66a688e299d0
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..90c62b0
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module demo
+
+go 1.12
+
+require (
+	github.com/gorilla/websocket v1.4.0 // indirect
+	nanomsg.org/go-mangos v1.4.0
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..31ae857
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,4 @@
+github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
+github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
+nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..306c19c
--- /dev/null
+++ b/main.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"strconv"
+	"time"
+)
+
+const dLen = 12 * 1024 * 1024
+
+var mode = deliver.PushPull
+
+func sender(url string) {
+
+	var err error
+
+	s := deliver.NewProducer(deliver.Mode(mode), url)
+
+	buf := make([]byte, dLen)
+
+	for {
+
+		if err = s.Send(buf); err != nil {
+
+			fmt.Printf("can't send message on push socket: %s\n", err.Error())
+		} else {
+
+			fmt.Printf("send msg length %d\n", len(buf))
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func recvImpl(url string, index int) {
+	c := deliver.NewConsumer(deliver.Mode(mode), url)
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		msg, err = c.Recv()
+		if err != nil {
+			fmt.Println("recv error : ", err)
+		}
+		if t == 0 {
+			t = time.Now().UnixNano()
+		}
+		elapse = time.Now().UnixNano() - t
+
+		count++
+
+		if elapse > 1e9 {
+			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+				index, count, len(msg), elapse)
+			elapse = 0
+			count = 0
+			t = 0
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func reciever(url string, strCount string) {
+	count, _ := strconv.Atoi(strCount)
+
+	for i := 0; i < count; i++ {
+		go recvImpl(url, i)
+	}
+
+	for {
+		time.Sleep(2 * time.Second)
+	}
+
+}
+
+func main() {
+	if len(os.Args) > 3 && os.Args[1] == "producer" {
+		if os.Args[2] == "pushpull" {
+			mode = deliver.PushPull
+		} else if os.Args[2] == "pubsub" {
+			mode = deliver.PubSub
+		} else if os.Args[2] == "pair" {
+			mode = deliver.Pair
+		}
+		sender(os.Args[3])
+		os.Exit(0)
+	}
+	if len(os.Args) > 3 && os.Args[1] == "consumer" {
+		if os.Args[2] == "pushpull" {
+			mode = deliver.PushPull
+		} else if os.Args[2] == "pubsub" {
+			mode = deliver.PubSub
+		} else if os.Args[2] == "pair" {
+			mode = deliver.Pair
+		}
+
+		reciever(os.Args[3], os.Args[4])
+		os.Exit(0)
+	}
+	fmt.Fprintf(os.Stderr,
+		"Usage: pushpull push|pull <URL> <ARG> ...\n")
+	os.Exit(1)
+
+}

--
Gitblit v1.8.0