From 22125ca10867152617cc4f42f403a0f6e37648a4 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 17 五月 2019 18:08:23 +0800
Subject: [PATCH] add reqrep

---
 main.go |  124 ++++++++++-------------------------------
 1 files changed, 31 insertions(+), 93 deletions(-)

diff --git a/main.go b/main.go
index 4a370b7..aed056c 100644
--- a/main.go
+++ b/main.go
@@ -4,114 +4,52 @@
 	"demo/deliver"
 	"fmt"
 	"os"
-	"os/signal"
-	"strconv"
-	"time"
-
-	"golang.org/x/sys/unix"
 )
 
 const dLen = 12 * 1024 * 1024
 
-var mode = deliver.PushPull
+func modeType(t string) deliver.Mode {
 
-func senderImpl(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-
-		if err = s.Send(buf); err != nil {
-
-			fmt.Printf("can't send message on push socket: %s\n", err.Error())
-		} else {
-
-			fmt.Printf("send msg length %d\n", len(buf))
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-func sender(url string, args ...interface{}) {
-	s := deliver.NewServer(deliver.Mode(mode), url, args...)
-
-	go senderImpl(s)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	s.Close()
-}
-
-func recvImpl(url string, index int) {
-	c := deliver.NewClient(deliver.Mode(mode), url)
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		msg, err = c.Recv()
-		if err != nil {
-			fmt.Println("recv error : ", err)
-		}
-		if t == 0 {
-			t = time.Now().UnixNano()
-		}
-		elapse = time.Now().UnixNano() - t
-
-		count++
-
-		if elapse > 1e9 {
-			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-				index, count, len(msg), elapse)
-			elapse = 0
-			count = 0
-			t = 0
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-}
-
-func reciever(url string, strCount string) {
-	count, _ := strconv.Atoi(strCount)
-
-	for i := 0; i < count; i++ {
-		go recvImpl(url, i)
-	}
-
-	for {
-		time.Sleep(2 * time.Second)
-	}
-
-}
-
-func modeType(t string) {
 	if t == "pushpull" {
-		mode = deliver.PushPull
+		return deliver.PushPull
 	} else if t == "pubsub" {
-		mode = deliver.PubSub
+		return deliver.PubSub
 	} else if t == "pair" {
-		mode = deliver.Pair
+		return deliver.Pair
+	} else if t == "reqrep" {
+		return deliver.ReqRep
 	}
+
+	return deliver.Mode(-1)
 }
+
+func senderMode(ipc string, m deliver.Mode) {
+	if m == deliver.ReqRep {
+		req(ipc, m)
+	}
+	sender(ipc, m)
+}
+
+func recvMode(ipc string, m deliver.Mode, strCount string) {
+	if m == deliver.ReqRep {
+		rep(ipc, m)
+	}
+	reciever(ipc, m, strCount)
+}
+
 func main() {
 	if len(os.Args) > 3 && os.Args[1] == "producer" {
-		modeType(os.Args[2])
-		sender(os.Args[3])
+		m := modeType(os.Args[2])
+		if m > deliver.ModeStart {
+			senderMode(os.Args[3], m)
+		}
 		os.Exit(0)
 	}
 	if len(os.Args) > 3 && os.Args[1] == "consumer" {
-		modeType(os.Args[2])
-
-		reciever(os.Args[3], os.Args[4])
+		m := modeType(os.Args[2])
+		if m > deliver.ModeStart {
+			recvMode(os.Args[3], m, os.Args[4])
+		}
 		os.Exit(0)
 	}
 	fmt.Fprintf(os.Stderr,

--
Gitblit v1.8.0