From 8e1e7d00ccf6c11922447b400d36b6954f21ac1b Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期三, 15 五月 2019 11:43:25 +0800
Subject: [PATCH] demo for deliver
---
.gitignore | 3 +
go.sum | 4 +
.gitmodules | 3 +
deliver | 2
go.mod | 8 ++
main.go | 113 +++++++++++++++++++++++++++++++++++++
6 files changed, 132 insertions(+), 1 deletions(-)
diff --git a/.gitignore b/.gitignore
index 8365624..66bd209 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,3 +21,6 @@
*.exe
*.test
+
+.vscode
+demo
diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 0000000..d06e894
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "deliver"]
+ path = deliver
+ url = ssh://zhangmeng@192.168.1.12:29418/valib/deliver.git
diff --git a/deliver b/deliver
new file mode 160000
index 0000000..f5368c3
--- /dev/null
+++ b/deliver
@@ -1 +1 @@
-Subproject commit 0000000000000000000000000000000000000000
+Subproject commit f5368c38ae7d538ae37b1fa0444b66a688e299d0
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..90c62b0
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module demo
+
+go 1.12
+
+require (
+ github.com/gorilla/websocket v1.4.0 // indirect
+ nanomsg.org/go-mangos v1.4.0
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..31ae857
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,4 @@
+github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
+github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
+nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ=
diff --git a/main.go b/main.go
new file mode 100644
index 0000000..306c19c
--- /dev/null
+++ b/main.go
@@ -0,0 +1,113 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "strconv"
+ "time"
+)
+
+const dLen = 12 * 1024 * 1024
+
+var mode = deliver.PushPull
+
+func sender(url string) {
+
+ var err error
+
+ s := deliver.NewProducer(deliver.Mode(mode), url)
+
+ buf := make([]byte, dLen)
+
+ for {
+
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func recvImpl(url string, index int) {
+ c := deliver.NewConsumer(deliver.Mode(mode), url)
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ index, count, len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func reciever(url string, strCount string) {
+ count, _ := strconv.Atoi(strCount)
+
+ for i := 0; i < count; i++ {
+ go recvImpl(url, i)
+ }
+
+ for {
+ time.Sleep(2 * time.Second)
+ }
+
+}
+
+func main() {
+ if len(os.Args) > 3 && os.Args[1] == "producer" {
+ if os.Args[2] == "pushpull" {
+ mode = deliver.PushPull
+ } else if os.Args[2] == "pubsub" {
+ mode = deliver.PubSub
+ } else if os.Args[2] == "pair" {
+ mode = deliver.Pair
+ }
+ sender(os.Args[3])
+ os.Exit(0)
+ }
+ if len(os.Args) > 3 && os.Args[1] == "consumer" {
+ if os.Args[2] == "pushpull" {
+ mode = deliver.PushPull
+ } else if os.Args[2] == "pubsub" {
+ mode = deliver.PubSub
+ } else if os.Args[2] == "pair" {
+ mode = deliver.Pair
+ }
+
+ reciever(os.Args[3], os.Args[4])
+ os.Exit(0)
+ }
+ fmt.Fprintf(os.Stderr,
+ "Usage: pushpull push|pull <URL> <ARG> ...\n")
+ os.Exit(1)
+
+}
--
Gitblit v1.8.0