From 8e1e7d00ccf6c11922447b400d36b6954f21ac1b Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期三, 15 五月 2019 11:43:25 +0800 Subject: [PATCH] demo for deliver --- .gitignore | 3 + go.sum | 4 + .gitmodules | 3 + deliver | 2 go.mod | 8 ++ main.go | 113 +++++++++++++++++++++++++++++++++++++ 6 files changed, 132 insertions(+), 1 deletions(-) diff --git a/.gitignore b/.gitignore index 8365624..66bd209 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ *.exe *.test + +.vscode +demo diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..d06e894 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "deliver"] + path = deliver + url = ssh://zhangmeng@192.168.1.12:29418/valib/deliver.git diff --git a/deliver b/deliver new file mode 160000 index 0000000..f5368c3 --- /dev/null +++ b/deliver @@ -1 +1 @@ -Subproject commit 0000000000000000000000000000000000000000 +Subproject commit f5368c38ae7d538ae37b1fa0444b66a688e299d0 diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..90c62b0 --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module demo + +go 1.12 + +require ( + github.com/gorilla/websocket v1.4.0 // indirect + nanomsg.org/go-mangos v1.4.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..31ae857 --- /dev/null +++ b/go.sum @@ -0,0 +1,4 @@ +github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q= +github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= +nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM= +nanomsg.org/go-mangos v1.4.0/go.mod h1:MOor8xUIgwsRMPpLr9xQxe7bT7rciibScOqVyztNxHQ= diff --git a/main.go b/main.go new file mode 100644 index 0000000..306c19c --- /dev/null +++ b/main.go @@ -0,0 +1,113 @@ +package main + +import ( + "demo/deliver" + "fmt" + "os" + "strconv" + "time" +) + +const dLen = 12 * 1024 * 1024 + +var mode = deliver.PushPull + +func sender(url string) { + + var err error + + s := deliver.NewProducer(deliver.Mode(mode), url) + + buf := make([]byte, dLen) + + for { + + if err = s.Send(buf); err != nil { + + fmt.Printf("can't send message on push socket: %s\n", err.Error()) + } else { + + fmt.Printf("send msg length %d\n", len(buf)) + } + + // time.Sleep(10 * time.Millisecond) + } + +} + +func recvImpl(url string, index int) { + c := deliver.NewConsumer(deliver.Mode(mode), url) + + var msg []byte + var err error + + var t int64 + var elapse int64 + count := 0 + + for { + msg, err = c.Recv() + if err != nil { + fmt.Println("recv error : ", err) + } + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + + // time.Sleep(10 * time.Millisecond) + } +} + +func reciever(url string, strCount string) { + count, _ := strconv.Atoi(strCount) + + for i := 0; i < count; i++ { + go recvImpl(url, i) + } + + for { + time.Sleep(2 * time.Second) + } + +} + +func main() { + if len(os.Args) > 3 && os.Args[1] == "producer" { + if os.Args[2] == "pushpull" { + mode = deliver.PushPull + } else if os.Args[2] == "pubsub" { + mode = deliver.PubSub + } else if os.Args[2] == "pair" { + mode = deliver.Pair + } + sender(os.Args[3]) + os.Exit(0) + } + if len(os.Args) > 3 && os.Args[1] == "consumer" { + if os.Args[2] == "pushpull" { + mode = deliver.PushPull + } else if os.Args[2] == "pubsub" { + mode = deliver.PubSub + } else if os.Args[2] == "pair" { + mode = deliver.Pair + } + + reciever(os.Args[3], os.Args[4]) + os.Exit(0) + } + fmt.Fprintf(os.Stderr, + "Usage: pushpull push|pull <URL> <ARG> ...\n") + os.Exit(1) + +} -- Gitblit v1.8.0