From 26f699593d76d6fa4bcf320c84fd723343c4b5f4 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 15:19:15 +0800
Subject: [PATCH] add one pull multi push

---
 /dev/null      |   88 ------------
 1push-npull.go |  106 +++++++++++++++
 shm.go         |    4 
 main.go        |   92 ++++++++----
 npush-1pull.go |  101 ++++++++++++++
 5 files changed, 270 insertions(+), 121 deletions(-)

diff --git a/1push-npull.go b/1push-npull.go
new file mode 100644
index 0000000..31ea21a
--- /dev/null
+++ b/1push-npull.go
@@ -0,0 +1,106 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"os/signal"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+func oneSenderImpl(s deliver.Deliver) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+
+			if err = s.Send(buf); err != nil {
+
+				fmt.Printf("can't send message on push socket: %s\n", err.Error())
+			} else {
+
+				fmt.Printf("send msg length %d\n", len(buf))
+			}
+		}
+	}
+
+}
+
+func oneSender(url string, m deliver.Mode, args ...interface{}) {
+	s := deliver.NewServer(m, url, args...)
+
+	go oneSenderImpl(s)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	s.Close()
+}
+
+func nRecvImpl(c deliver.Deliver, index int) {
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg, err = c.Recv()
+			if err != nil {
+				fmt.Println("recv error : ", err)
+			}
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+					index, count, len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+		}
+
+	}
+}
+
+func nReciever(url string, m deliver.Mode, count int) {
+
+	var cs []deliver.Deliver
+
+	for i := 0; i < count; i++ {
+		c := deliver.NewClient(m, url)
+		cs = append(cs, c)
+
+		go nRecvImpl(c, i)
+	}
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	for _, v := range cs {
+		v.Close()
+	}
+
+}
diff --git a/main.go b/main.go
index 24b9193..5c5eb53 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@
 import (
 	"context"
 	"demo/deliver"
+	"flag"
 	"fmt"
 	"os"
 )
@@ -10,6 +11,58 @@
 const dLen = 12 * 1024 * 1024
 
 var ctx, cancel = context.WithCancel(context.Background())
+
+func senderMode(ipc string, m deliver.Mode, count int, one bool) {
+	if m == deliver.ReqRep {
+		req(ipc, m)
+	} else if m == deliver.Shm {
+		shmSender(ipc, 2, 32*1024*1024)
+	}
+
+	if one {
+		oneSender(ipc, m)
+	} else {
+		nSender(ipc, m, count)
+	}
+}
+
+func recvMode(ipc string, m deliver.Mode, count int, n bool) {
+	if m == deliver.ReqRep {
+		rep(ipc, m)
+	} else if m == deliver.Shm {
+		shmReciever(ipc, count)
+	}
+
+	if n {
+		nReciever(ipc, m, count)
+	} else {
+		oneReciever(ipc, m)
+	}
+}
+
+var (
+	proc         string
+	procCount    int
+	mode         string
+	ipc          string
+	oneSendnRecv bool
+)
+
+const (
+	act  = "act"
+	pass = "pass"
+)
+
+func init() {
+	flag.StringVar(&proc, "p", "act", "proc as sender")
+	flag.IntVar(&procCount, "c", 1, "proc run count")
+
+	flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
+
+	flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
+
+	flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
+}
 
 func modeType(t string) deliver.Mode {
 
@@ -28,39 +81,18 @@
 	return deliver.NONE
 }
 
-func senderMode(ipc string, m deliver.Mode) {
-	if m == deliver.ReqRep {
-		req(ipc, m)
-	} else if m == deliver.Shm {
-		shmSender(ipc, 2, 32*1024*1024)
-	}
-	sender(ipc, m)
-}
-
-func recvMode(ipc string, m deliver.Mode, strCount string) {
-	if m == deliver.ReqRep {
-		rep(ipc, m)
-	} else if m == deliver.Shm {
-		shmReciever(ipc, strCount)
-	}
-	reciever(ipc, m, strCount)
-}
-
 func main() {
-	if len(os.Args) > 3 && os.Args[1] == "producer" {
-		m := modeType(os.Args[2])
-		if m > deliver.ModeStart {
-			senderMode(os.Args[3], m)
+	flag.Parse()
+
+	m := modeType(mode)
+	if m > deliver.ModeStart {
+		if proc == act {
+			senderMode(ipc, m, procCount, oneSendnRecv)
+		} else {
+			recvMode(ipc, m, procCount, oneSendnRecv)
 		}
-		os.Exit(0)
 	}
-	if len(os.Args) > 3 && os.Args[1] == "consumer" {
-		m := modeType(os.Args[2])
-		if m > deliver.ModeStart {
-			recvMode(os.Args[3], m, os.Args[4])
-		}
-		os.Exit(0)
-	}
+
 	fmt.Fprintf(os.Stderr,
 		"Usage: pushpull push|pull <URL> <ARG> ...\n")
 	os.Exit(1)
diff --git a/npush-1pull.go b/npush-1pull.go
new file mode 100644
index 0000000..9d54439
--- /dev/null
+++ b/npush-1pull.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"os/signal"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+func nSenderImpl(s deliver.Deliver, index int) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err = s.Send(buf); err != nil {
+
+				fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
+			} else {
+
+				fmt.Printf("%d send msg length %d\n", index, len(buf))
+			}
+		}
+	}
+
+}
+
+func nSender(url string, m deliver.Mode, count int, args ...interface{}) {
+
+	var cs []deliver.Deliver
+
+	for i := 0; i < count; i++ {
+		c := deliver.NewClient(m, url, args...)
+		cs = append(cs, c)
+
+		go nSenderImpl(c, i)
+	}
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+	cancel()
+	for _, v := range cs {
+		v.Close()
+	}
+
+}
+
+func oneRecvImpl(c deliver.Deliver, index int) {
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		msg, err = c.Recv()
+		if err != nil {
+			fmt.Println("recv error : ", err)
+		}
+		if t == 0 {
+			t = time.Now().UnixNano()
+		}
+		elapse = time.Now().UnixNano() - t
+
+		count++
+
+		if elapse > 1e9 {
+			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+				index, count, len(msg), elapse)
+			elapse = 0
+			count = 0
+			t = 0
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func oneReciever(url string, m deliver.Mode) {
+
+	s := deliver.NewServer(m, url)
+
+	go oneRecvImpl(s, 0)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	s.Close()
+}
diff --git a/pushpull.go b/pushpull.go
deleted file mode 100644
index 9802c37..0000000
--- a/pushpull.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"os/signal"
-	"strconv"
-	"time"
-
-	"golang.org/x/sys/unix"
-)
-
-func senderImpl(s deliver.Deliver) {
-	var err error
-
-	buf := make([]byte, dLen)
-
-	for {
-
-		if err = s.Send(buf); err != nil {
-
-			fmt.Printf("can't send message on push socket: %s\n", err.Error())
-		} else {
-
-			fmt.Printf("send msg length %d\n", len(buf))
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-
-}
-func sender(url string, m deliver.Mode, args ...interface{}) {
-	s := deliver.NewServer(m, url, args...)
-
-	go senderImpl(s)
-
-	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
-	<-c
-
-	s.Close()
-}
-
-func recvImpl(url string, m deliver.Mode, index int) {
-	c := deliver.NewClient(m, url)
-
-	var msg []byte
-	var err error
-
-	var t int64
-	var elapse int64
-	count := 0
-
-	for {
-		msg, err = c.Recv()
-		if err != nil {
-			fmt.Println("recv error : ", err)
-		}
-		if t == 0 {
-			t = time.Now().UnixNano()
-		}
-		elapse = time.Now().UnixNano() - t
-
-		count++
-
-		if elapse > 1e9 {
-			fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
-				index, count, len(msg), elapse)
-			elapse = 0
-			count = 0
-			t = 0
-		}
-
-		// time.Sleep(10 * time.Millisecond)
-	}
-}
-
-func reciever(url string, m deliver.Mode, strCount string) {
-	count, _ := strconv.Atoi(strCount)
-
-	for i := 0; i < count; i++ {
-		go recvImpl(url, m, i)
-	}
-
-	for {
-		time.Sleep(2 * time.Second)
-	}
-
-}
diff --git a/reqrep.go.orig b/reqrep.go.orig
deleted file mode 100644
index 4526dcd..0000000
--- a/reqrep.go.orig
+++ /dev/null
@@ -1,88 +0,0 @@
-package main
-
-import (
-	"demo/deliver"
-	"fmt"
-	"os"
-	"time"
-
-	"nanomsg.org/go-mangos"
-	"nanomsg.org/go-mangos/protocol/rep"
-	"nanomsg.org/go-mangos/protocol/req"
-	"nanomsg.org/go-mangos/transport/ipc"
-	"nanomsg.org/go-mangos/transport/tcp"
-)
-
-func fReq(url string, m deliver.Mode) {
-	var sock mangos.Socket
-	var err error
-
-	if sock, err = req.NewSocket(); err != nil {
-		die("can't get new req socket: %s", err.Error())
-	}
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
-
-	sock.SetOption(mangos.OptionRaw, true)
-	if err = sock.Listen(url); err != nil {
-		die("can't dial on req socket: %s", err.Error())
-	}
-	fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
-
-	data := []byte("DATE")
-	message := mangos.NewMessage(len(data))
-	message.Body = data
-	if err = sock.SendMsg(message); err != nil {
-		die("can't send message on push socket: %s", err.Error())
-	}
-
-	var mesg *mangos.Message
-	if mesg, err = sock.RecvMsg(); err != nil {
-		die("can't receive date: %s", err.Error())
-	}
-	fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body))
-	sock.Close()
-
-}
-
-func fRep(url string, m deliver.Mode) {
-	var sock mangos.Socket
-	var err error
-	var msg *mangos.Message
-
-	if sock, err = rep.NewSocket(); err != nil {
-		die("can't get new rep socket: %s", err)
-	}
-	sock.AddTransport(ipc.NewTransport())
-	sock.AddTransport(tcp.NewTransport())
-	sock.SetOption(mangos.OptionRaw, true)
-	if err = sock.Dial(url); err != nil {
-		die("can't listen on rep socket: %s", err.Error())
-	}
-	for {
-		// Could also use sock.RecvMsg to get header
-		msg, err = sock.RecvMsg()
-		if string(msg.Body) == "DATE" { // no need to terminate
-			fmt.Println("NODE0: RECEIVED DATE REQUEST")
-			d := date()
-			fmt.Printf("NODE0: SENDING DATE %s\n", d)
-
-			data := []byte(d)
-			message := mangos.NewMessage(len(data))
-			message.Body = data
-			err = sock.SendMsg(message)
-			if err != nil {
-				die("can't send reply: %s", err.Error())
-			}
-		}
-	}
-}
-
-func die(format string, v ...interface{}) {
-	fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
-	os.Exit(1)
-}
-
-func date() string {
-	return time.Now().Format(time.ANSIC)
-}
diff --git a/shm.go b/shm.go
index af18432..9611975 100644
--- a/shm.go
+++ b/shm.go
@@ -5,7 +5,6 @@
 	"fmt"
 	"os"
 	"os/signal"
-	"strconv"
 	"time"
 
 	"golang.org/x/sys/unix"
@@ -88,8 +87,7 @@
 	}
 }
 
-func shmReciever(url string, strCount string) {
-	count, _ := strconv.Atoi(strCount)
+func shmReciever(url string, count int) {
 
 	var cs []deliver.Deliver
 	for i := 0; i < count; i++ {

--
Gitblit v1.8.0