From 22125ca10867152617cc4f42f403a0f6e37648a4 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 18:08:23 +0800
Subject: [PATCH] add reqrep

---
 reqrep.go.orig |   88 ++++++++++++
 deliver        |    2 
 main.go        |  124 ++++-------------
 pushpull.go    |   90 ++++++++++++
 reqrep.go      |   72 ++++++++++
 5 files changed, 282 insertions(+), 94 deletions(-)

diff --git a/deliver b/deliver
index 306588f..d23f54e 160000
--- a/deliver
+++ b/deliver
@@ -1 +1 @@
-Subproject commit 306588f52747268250997a9255ef19583bbd615c
+Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
diff --git a/main.go b/main.go
index 4a370b7..aed056c 100644
--- a/main.go
+++ b/main.go
@@ -4,114 +4,52 @@
 	"demo/deliver"
 	"fmt"
 	"os"
-	"os/signal"
-	"strconv"
-	"time"
-
-	"golang.org/x/sys/unix"
 )
 
 const dLen = 12 * 1024 * 1024
 
-var mode = deliver.PushPull
+func modeType(t string) deliver.Mode {
 
-func senderImpl(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-
-		if err = s.Send(buf); err != nil {
-
-			fmt.Printf("can't send message on push socket: %s\n", err.Error())
-		} else {
-
-			fmt.Printf("send msg length %d\n", len(buf))
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-func sender(url string, args ...interface{}) {
-	s := deliver.NewServer(deliver.Mode(mode), url, args...)
-
-	go senderImpl(s)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	s.Close()
-}
-
-func recvImpl(url string, index int) {
-	c := deliver.NewClient(deliver.Mode(mode), url)
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		msg, err = c.Recv()
-		if err != nil {
-			fmt.Println("recv error : ", err)
-		}
-		if t == 0 {
-			t = time.Now().UnixNano()
-		}
-		elapse = time.Now().UnixNano() - t
-
-		count++
-
-		if elapse > 1e9 {
-			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-				index, count, len(msg), elapse)
-			elapse = 0
-			count = 0
-			t = 0
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-}
-
-func reciever(url string, strCount string) {
-	count, _ := strconv.Atoi(strCount)
-
-	for i := 0; i < count; i++ {
-		go recvImpl(url, i)
-	}
-
-	for {
-		time.Sleep(2 * time.Second)
-	}
-
-}
-
-func modeType(t string) {
 	if t == "pushpull" {
-		mode = deliver.PushPull
+		return deliver.PushPull
 	} else if t == "pubsub" {
-		mode = deliver.PubSub
+		return deliver.PubSub
 	} else if t == "pair" {
-		mode = deliver.Pair
+		return deliver.Pair
+	} else if t == "reqrep" {
+		return deliver.ReqRep
 	}
+
+	return deliver.Mode(-1)
 }
+
+func senderMode(ipc string, m deliver.Mode) {
+	if m == deliver.ReqRep {
+		req(ipc, m)
+	}
+	sender(ipc, m)
+}
+
+func recvMode(ipc string, m deliver.Mode, strCount string) {
+	if m == deliver.ReqRep {
+		rep(ipc, m)
+	}
+	reciever(ipc, m, strCount)
+}
+
 func main() {
 	if len(os.Args) > 3 && os.Args[1] == "producer" {
-		modeType(os.Args[2])
-		sender(os.Args[3])
+		m := modeType(os.Args[2])
+		if m > deliver.ModeStart {
+			senderMode(os.Args[3], m)
+		}
 		os.Exit(0)
 	}
 	if len(os.Args) > 3 && os.Args[1] == "consumer" {
-		modeType(os.Args[2])
-
-		reciever(os.Args[3], os.Args[4])
+		m := modeType(os.Args[2])
+		if m > deliver.ModeStart {
+			recvMode(os.Args[3], m, os.Args[4])
+		}
 		os.Exit(0)
 	}
 	fmt.Fprintf(os.Stderr,
diff --git a/pushpull.go b/pushpull.go
new file mode 100644
index 0000000..9802c37
--- /dev/null
+++ b/pushpull.go
@@ -0,0 +1,90 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"os/signal"
+	"strconv"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+func senderImpl(s deliver.Deliver) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+
+		if err = s.Send(buf); err != nil {
+
+			fmt.Printf("can't send message on push socket: %s\n", err.Error())
+		} else {
+
+			fmt.Printf("send msg length %d\n", len(buf))
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+func sender(url string, m deliver.Mode, args ...interface{}) {
+	s := deliver.NewServer(m, url, args...)
+
+	go senderImpl(s)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	s.Close()
+}
+
+func recvImpl(url string, m deliver.Mode, index int) {
+	c := deliver.NewClient(m, url)
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		msg, err = c.Recv()
+		if err != nil {
+			fmt.Println("recv error : ", err)
+		}
+		if t == 0 {
+			t = time.Now().UnixNano()
+		}
+		elapse = time.Now().UnixNano() - t
+
+		count++
+
+		if elapse > 1e9 {
+			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+				index, count, len(msg), elapse)
+			elapse = 0
+			count = 0
+			t = 0
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func reciever(url string, m deliver.Mode, strCount string) {
+	count, _ := strconv.Atoi(strCount)
+
+	for i := 0; i < count; i++ {
+		go recvImpl(url, m, i)
+	}
+
+	for {
+		time.Sleep(2 * time.Second)
+	}
+
+}
diff --git a/reqrep.go b/reqrep.go
new file mode 100644
index 0000000..468ff27
--- /dev/null
+++ b/reqrep.go
@@ -0,0 +1,72 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"time"
+)
+
+func req(url string, m deliver.Mode) {
+	p := deliver.NewClient(m, url)
+	var err error
+
+	msg := `hello, give me your data`
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+
+		if err = p.Send([]byte(msg)); err != nil {
+
+			fmt.Printf("can't send message on push socket: %s\n", err.Error())
+		} else {
+
+			fmt.Printf("send msg length %d\n", len(msg))
+		}
+
+		if buf, err := p.Recv(); err != nil {
+			fmt.Println("recv error: ", err)
+		} else {
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					count, len(buf), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+
+		}
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func rep(url string, m deliver.Mode) {
+	c := deliver.NewServer(m, url)
+
+	var msg []byte
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+		msg, err = c.Recv()
+		if err != nil {
+			fmt.Println("recv error : ", err, " msg ", msg)
+			continue
+		}
+		fmt.Println("recv msg: ", string(msg))
+
+		c.Send(buf)
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
diff --git a/reqrep.go.orig b/reqrep.go.orig
new file mode 100644
index 0000000..4526dcd
--- /dev/null
+++ b/reqrep.go.orig
@@ -0,0 +1,88 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"time"
+
+	"nanomsg.org/go-mangos"
+	"nanomsg.org/go-mangos/protocol/rep"
+	"nanomsg.org/go-mangos/protocol/req"
+	"nanomsg.org/go-mangos/transport/ipc"
+	"nanomsg.org/go-mangos/transport/tcp"
+)
+
+func fReq(url string, m deliver.Mode) {
+	var sock mangos.Socket
+	var err error
+
+	if sock, err = req.NewSocket(); err != nil {
+		die("can't get new req socket: %s", err.Error())
+	}
+	sock.AddTransport(ipc.NewTransport())
+	sock.AddTransport(tcp.NewTransport())
+
+	sock.SetOption(mangos.OptionRaw, true)
+	if err = sock.Listen(url); err != nil {
+		die("can't dial on req socket: %s", err.Error())
+	}
+	fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
+
+	data := []byte("DATE")
+	message := mangos.NewMessage(len(data))
+	message.Body = data
+	if err = sock.SendMsg(message); err != nil {
+		die("can't send message on push socket: %s", err.Error())
+	}
+
+	var mesg *mangos.Message
+	if mesg, err = sock.RecvMsg(); err != nil {
+		die("can't receive date: %s", err.Error())
+	}
+	fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body))
+	sock.Close()
+
+}
+
+func fRep(url string, m deliver.Mode) {
+	var sock mangos.Socket
+	var err error
+	var msg *mangos.Message
+
+	if sock, err = rep.NewSocket(); err != nil {
+		die("can't get new rep socket: %s", err)
+	}
+	sock.AddTransport(ipc.NewTransport())
+	sock.AddTransport(tcp.NewTransport())
+	sock.SetOption(mangos.OptionRaw, true)
+	if err = sock.Dial(url); err != nil {
+		die("can't listen on rep socket: %s", err.Error())
+	}
+	for {
+		// Could also use sock.RecvMsg to get header
+		msg, err = sock.RecvMsg()
+		if string(msg.Body) == "DATE" { // no need to terminate
+			fmt.Println("NODE0: RECEIVED DATE REQUEST")
+			d := date()
+			fmt.Printf("NODE0: SENDING DATE %s\n", d)
+
+			data := []byte(d)
+			message := mangos.NewMessage(len(data))
+			message.Body = data
+			err = sock.SendMsg(message)
+			if err != nil {
+				die("can't send reply: %s", err.Error())
+			}
+		}
+	}
+}
+
+func die(format string, v ...interface{}) {
+	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
+	os.Exit(1)
+}
+
+func date() string {
+	return time.Now().Format(time.ANSIC)
+}

--
Gitblit v1.8.0