From 22125ca10867152617cc4f42f403a0f6e37648a4 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 18:08:23 +0800
Subject: [PATCH] add reqrep
---
reqrep.go.orig | 88 ++++++++++++
deliver | 2
main.go | 124 ++++-------------
pushpull.go | 90 ++++++++++++
reqrep.go | 72 ++++++++++
5 files changed, 282 insertions(+), 94 deletions(-)
diff --git a/deliver b/deliver
index 306588f..d23f54e 160000
--- a/deliver
+++ b/deliver
@@ -1 +1 @@
-Subproject commit 306588f52747268250997a9255ef19583bbd615c
+Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
diff --git a/main.go b/main.go
index 4a370b7..aed056c 100644
--- a/main.go
+++ b/main.go
@@ -4,114 +4,52 @@
"demo/deliver"
"fmt"
"os"
- "os/signal"
- "strconv"
- "time"
-
- "golang.org/x/sys/unix"
)
const dLen = 12 * 1024 * 1024
-var mode = deliver.PushPull
+func modeType(t string) deliver.Mode {
-func senderImpl(s deliver.Deliver) {
- var err error
-
- buf := make([]byte, dLen)
-
- for {
-
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
-
- fmt.Printf("send msg length %d\n", len(buf))
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-func sender(url string, args ...interface{}) {
- s := deliver.NewServer(deliver.Mode(mode), url, args...)
-
- go senderImpl(s)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- s.Close()
-}
-
-func recvImpl(url string, index int) {
- c := deliver.NewClient(deliver.Mode(mode), url)
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
- index, count, len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-}
-
-func reciever(url string, strCount string) {
- count, _ := strconv.Atoi(strCount)
-
- for i := 0; i < count; i++ {
- go recvImpl(url, i)
- }
-
- for {
- time.Sleep(2 * time.Second)
- }
-
-}
-
-func modeType(t string) {
if t == "pushpull" {
- mode = deliver.PushPull
+ return deliver.PushPull
} else if t == "pubsub" {
- mode = deliver.PubSub
+ return deliver.PubSub
} else if t == "pair" {
- mode = deliver.Pair
+ return deliver.Pair
+ } else if t == "reqrep" {
+ return deliver.ReqRep
}
+
+ return deliver.Mode(-1)
}
+
+func senderMode(ipc string, m deliver.Mode) {
+ if m == deliver.ReqRep {
+ req(ipc, m)
+ }
+ sender(ipc, m)
+}
+
+func recvMode(ipc string, m deliver.Mode, strCount string) {
+ if m == deliver.ReqRep {
+ rep(ipc, m)
+ }
+ reciever(ipc, m, strCount)
+}
+
func main() {
if len(os.Args) > 3 && os.Args[1] == "producer" {
- modeType(os.Args[2])
- sender(os.Args[3])
+ m := modeType(os.Args[2])
+ if m > deliver.ModeStart {
+ senderMode(os.Args[3], m)
+ }
os.Exit(0)
}
if len(os.Args) > 3 && os.Args[1] == "consumer" {
- modeType(os.Args[2])
-
- reciever(os.Args[3], os.Args[4])
+ m := modeType(os.Args[2])
+ if m > deliver.ModeStart {
+ recvMode(os.Args[3], m, os.Args[4])
+ }
os.Exit(0)
}
fmt.Fprintf(os.Stderr,
diff --git a/pushpull.go b/pushpull.go
new file mode 100644
index 0000000..9802c37
--- /dev/null
+++ b/pushpull.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "os/signal"
+ "strconv"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+func senderImpl(s deliver.Deliver) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+func sender(url string, m deliver.Mode, args ...interface{}) {
+ s := deliver.NewServer(m, url, args...)
+
+ go senderImpl(s)
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ s.Close()
+}
+
+func recvImpl(url string, m deliver.Mode, index int) {
+ c := deliver.NewClient(m, url)
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ index, count, len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func reciever(url string, m deliver.Mode, strCount string) {
+ count, _ := strconv.Atoi(strCount)
+
+ for i := 0; i < count; i++ {
+ go recvImpl(url, m, i)
+ }
+
+ for {
+ time.Sleep(2 * time.Second)
+ }
+
+}
diff --git a/reqrep.go b/reqrep.go
new file mode 100644
index 0000000..468ff27
--- /dev/null
+++ b/reqrep.go
@@ -0,0 +1,72 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "time"
+)
+
+func req(url string, m deliver.Mode) {
+ p := deliver.NewClient(m, url)
+ var err error
+
+ msg := `hello, give me your data`
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+
+ if err = p.Send([]byte(msg)); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(msg))
+ }
+
+ if buf, err := p.Recv(); err != nil {
+ fmt.Println("recv error: ", err)
+ } else {
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ count, len(buf), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ }
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func rep(url string, m deliver.Mode) {
+ c := deliver.NewServer(m, url)
+
+ var msg []byte
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err, " msg ", msg)
+ continue
+ }
+ fmt.Println("recv msg: ", string(msg))
+
+ c.Send(buf)
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
diff --git a/reqrep.go.orig b/reqrep.go.orig
new file mode 100644
index 0000000..4526dcd
--- /dev/null
+++ b/reqrep.go.orig
@@ -0,0 +1,88 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "time"
+
+ "nanomsg.org/go-mangos"
+ "nanomsg.org/go-mangos/protocol/rep"
+ "nanomsg.org/go-mangos/protocol/req"
+ "nanomsg.org/go-mangos/transport/ipc"
+ "nanomsg.org/go-mangos/transport/tcp"
+)
+
+func fReq(url string, m deliver.Mode) {
+ var sock mangos.Socket
+ var err error
+
+ if sock, err = req.NewSocket(); err != nil {
+ die("can't get new req socket: %s", err.Error())
+ }
+ sock.AddTransport(ipc.NewTransport())
+ sock.AddTransport(tcp.NewTransport())
+
+ sock.SetOption(mangos.OptionRaw, true)
+ if err = sock.Listen(url); err != nil {
+ die("can't dial on req socket: %s", err.Error())
+ }
+ fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
+
+ data := []byte("DATE")
+ message := mangos.NewMessage(len(data))
+ message.Body = data
+ if err = sock.SendMsg(message); err != nil {
+ die("can't send message on push socket: %s", err.Error())
+ }
+
+ var mesg *mangos.Message
+ if mesg, err = sock.RecvMsg(); err != nil {
+ die("can't receive date: %s", err.Error())
+ }
+ fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body))
+ sock.Close()
+
+}
+
+func fRep(url string, m deliver.Mode) {
+ var sock mangos.Socket
+ var err error
+ var msg *mangos.Message
+
+ if sock, err = rep.NewSocket(); err != nil {
+ die("can't get new rep socket: %s", err)
+ }
+ sock.AddTransport(ipc.NewTransport())
+ sock.AddTransport(tcp.NewTransport())
+ sock.SetOption(mangos.OptionRaw, true)
+ if err = sock.Dial(url); err != nil {
+ die("can't listen on rep socket: %s", err.Error())
+ }
+ for {
+ // Could also use sock.RecvMsg to get header
+ msg, err = sock.RecvMsg()
+ if string(msg.Body) == "DATE" { // no need to terminate
+ fmt.Println("NODE0: RECEIVED DATE REQUEST")
+ d := date()
+ fmt.Printf("NODE0: SENDING DATE %s\n", d)
+
+ data := []byte(d)
+ message := mangos.NewMessage(len(data))
+ message.Body = data
+ err = sock.SendMsg(message)
+ if err != nil {
+ die("can't send reply: %s", err.Error())
+ }
+ }
+ }
+}
+
+func die(format string, v ...interface{}) {
+ fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
+ os.Exit(1)
+}
+
+func date() string {
+ return time.Now().Format(time.ANSIC)
+}
--
Gitblit v1.8.0