From 26f699593d76d6fa4bcf320c84fd723343c4b5f4 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 15:19:15 +0800
Subject: [PATCH] add one pull multi push
---
/dev/null | 88 ------------
1push-npull.go | 106 +++++++++++++++
shm.go | 4
main.go | 92 ++++++++----
npush-1pull.go | 101 ++++++++++++++
5 files changed, 270 insertions(+), 121 deletions(-)
diff --git a/1push-npull.go b/1push-npull.go
new file mode 100644
index 0000000..31ea21a
--- /dev/null
+++ b/1push-npull.go
@@ -0,0 +1,106 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "os/signal"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+func oneSenderImpl(s deliver.Deliver) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+ }
+ }
+
+}
+
+func oneSender(url string, m deliver.Mode, args ...interface{}) {
+ s := deliver.NewServer(m, url, args...)
+
+ go oneSenderImpl(s)
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ s.Close()
+}
+
+func nRecvImpl(c deliver.Deliver, index int) {
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ index, count, len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+ }
+
+ }
+}
+
+func nReciever(url string, m deliver.Mode, count int) {
+
+ var cs []deliver.Deliver
+
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(m, url)
+ cs = append(cs, c)
+
+ go nRecvImpl(c, i)
+ }
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ for _, v := range cs {
+ v.Close()
+ }
+
+}
diff --git a/main.go b/main.go
index 24b9193..5c5eb53 100644
--- a/main.go
+++ b/main.go
@@ -3,6 +3,7 @@
import (
"context"
"demo/deliver"
+ "flag"
"fmt"
"os"
)
@@ -10,6 +11,58 @@
const dLen = 12 * 1024 * 1024
var ctx, cancel = context.WithCancel(context.Background())
+
+func senderMode(ipc string, m deliver.Mode, count int, one bool) {
+ if m == deliver.ReqRep {
+ req(ipc, m)
+ } else if m == deliver.Shm {
+ shmSender(ipc, 2, 32*1024*1024)
+ }
+
+ if one {
+ oneSender(ipc, m)
+ } else {
+ nSender(ipc, m, count)
+ }
+}
+
+func recvMode(ipc string, m deliver.Mode, count int, n bool) {
+ if m == deliver.ReqRep {
+ rep(ipc, m)
+ } else if m == deliver.Shm {
+ shmReciever(ipc, count)
+ }
+
+ if n {
+ nReciever(ipc, m, count)
+ } else {
+ oneReciever(ipc, m)
+ }
+}
+
+var (
+ proc string
+ procCount int
+ mode string
+ ipc string
+ oneSendnRecv bool
+)
+
+const (
+ act = "act"
+ pass = "pass"
+)
+
+func init() {
+ flag.StringVar(&proc, "p", "act", "proc as sender")
+ flag.IntVar(&procCount, "c", 1, "proc run count")
+
+ flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.")
+
+ flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label")
+
+ flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv")
+}
func modeType(t string) deliver.Mode {
@@ -28,39 +81,18 @@
return deliver.NONE
}
-func senderMode(ipc string, m deliver.Mode) {
- if m == deliver.ReqRep {
- req(ipc, m)
- } else if m == deliver.Shm {
- shmSender(ipc, 2, 32*1024*1024)
- }
- sender(ipc, m)
-}
-
-func recvMode(ipc string, m deliver.Mode, strCount string) {
- if m == deliver.ReqRep {
- rep(ipc, m)
- } else if m == deliver.Shm {
- shmReciever(ipc, strCount)
- }
- reciever(ipc, m, strCount)
-}
-
func main() {
- if len(os.Args) > 3 && os.Args[1] == "producer" {
- m := modeType(os.Args[2])
- if m > deliver.ModeStart {
- senderMode(os.Args[3], m)
+ flag.Parse()
+
+ m := modeType(mode)
+ if m > deliver.ModeStart {
+ if proc == act {
+ senderMode(ipc, m, procCount, oneSendnRecv)
+ } else {
+ recvMode(ipc, m, procCount, oneSendnRecv)
}
- os.Exit(0)
}
- if len(os.Args) > 3 && os.Args[1] == "consumer" {
- m := modeType(os.Args[2])
- if m > deliver.ModeStart {
- recvMode(os.Args[3], m, os.Args[4])
- }
- os.Exit(0)
- }
+
fmt.Fprintf(os.Stderr,
"Usage: pushpull push|pull <URL> <ARG> ...\n")
os.Exit(1)
diff --git a/npush-1pull.go b/npush-1pull.go
new file mode 100644
index 0000000..9d54439
--- /dev/null
+++ b/npush-1pull.go
@@ -0,0 +1,101 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "os/signal"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+func nSenderImpl(s deliver.Deliver, index int) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error())
+ } else {
+
+ fmt.Printf("%d send msg length %d\n", index, len(buf))
+ }
+ }
+ }
+
+}
+
+func nSender(url string, m deliver.Mode, count int, args ...interface{}) {
+
+ var cs []deliver.Deliver
+
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(m, url, args...)
+ cs = append(cs, c)
+
+ go nSenderImpl(c, i)
+ }
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+ cancel()
+ for _, v := range cs {
+ v.Close()
+ }
+
+}
+
+func oneRecvImpl(c deliver.Deliver, index int) {
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
+ index, count, len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func oneReciever(url string, m deliver.Mode) {
+
+ s := deliver.NewServer(m, url)
+
+ go oneRecvImpl(s, 0)
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ s.Close()
+}
diff --git a/pushpull.go b/pushpull.go
deleted file mode 100644
index 9802c37..0000000
--- a/pushpull.go
+++ /dev/null
@@ -1,90 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "os/signal"
- "strconv"
- "time"
-
- "golang.org/x/sys/unix"
-)
-
-func senderImpl(s deliver.Deliver) {
- var err error
-
- buf := make([]byte, dLen)
-
- for {
-
- if err = s.Send(buf); err != nil {
-
- fmt.Printf("can't send message on push socket: %s\n", err.Error())
- } else {
-
- fmt.Printf("send msg length %d\n", len(buf))
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-
-}
-func sender(url string, m deliver.Mode, args ...interface{}) {
- s := deliver.NewServer(m, url, args...)
-
- go senderImpl(s)
-
- c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
- <-c
-
- s.Close()
-}
-
-func recvImpl(url string, m deliver.Mode, index int) {
- c := deliver.NewClient(m, url)
-
- var msg []byte
- var err error
-
- var t int64
- var elapse int64
- count := 0
-
- for {
- msg, err = c.Recv()
- if err != nil {
- fmt.Println("recv error : ", err)
- }
- if t == 0 {
- t = time.Now().UnixNano()
- }
- elapse = time.Now().UnixNano() - t
-
- count++
-
- if elapse > 1e9 {
- fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n",
- index, count, len(msg), elapse)
- elapse = 0
- count = 0
- t = 0
- }
-
- // time.Sleep(10 * time.Millisecond)
- }
-}
-
-func reciever(url string, m deliver.Mode, strCount string) {
- count, _ := strconv.Atoi(strCount)
-
- for i := 0; i < count; i++ {
- go recvImpl(url, m, i)
- }
-
- for {
- time.Sleep(2 * time.Second)
- }
-
-}
diff --git a/reqrep.go.orig b/reqrep.go.orig
deleted file mode 100644
index 4526dcd..0000000
--- a/reqrep.go.orig
+++ /dev/null
@@ -1,88 +0,0 @@
-package main
-
-import (
- "demo/deliver"
- "fmt"
- "os"
- "time"
-
- "nanomsg.org/go-mangos"
- "nanomsg.org/go-mangos/protocol/rep"
- "nanomsg.org/go-mangos/protocol/req"
- "nanomsg.org/go-mangos/transport/ipc"
- "nanomsg.org/go-mangos/transport/tcp"
-)
-
-func fReq(url string, m deliver.Mode) {
- var sock mangos.Socket
- var err error
-
- if sock, err = req.NewSocket(); err != nil {
- die("can't get new req socket: %s", err.Error())
- }
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
-
- sock.SetOption(mangos.OptionRaw, true)
- if err = sock.Listen(url); err != nil {
- die("can't dial on req socket: %s", err.Error())
- }
- fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE")
-
- data := []byte("DATE")
- message := mangos.NewMessage(len(data))
- message.Body = data
- if err = sock.SendMsg(message); err != nil {
- die("can't send message on push socket: %s", err.Error())
- }
-
- var mesg *mangos.Message
- if mesg, err = sock.RecvMsg(); err != nil {
- die("can't receive date: %s", err.Error())
- }
- fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body))
- sock.Close()
-
-}
-
-func fRep(url string, m deliver.Mode) {
- var sock mangos.Socket
- var err error
- var msg *mangos.Message
-
- if sock, err = rep.NewSocket(); err != nil {
- die("can't get new rep socket: %s", err)
- }
- sock.AddTransport(ipc.NewTransport())
- sock.AddTransport(tcp.NewTransport())
- sock.SetOption(mangos.OptionRaw, true)
- if err = sock.Dial(url); err != nil {
- die("can't listen on rep socket: %s", err.Error())
- }
- for {
- // Could also use sock.RecvMsg to get header
- msg, err = sock.RecvMsg()
- if string(msg.Body) == "DATE" { // no need to terminate
- fmt.Println("NODE0: RECEIVED DATE REQUEST")
- d := date()
- fmt.Printf("NODE0: SENDING DATE %s\n", d)
-
- data := []byte(d)
- message := mangos.NewMessage(len(data))
- message.Body = data
- err = sock.SendMsg(message)
- if err != nil {
- die("can't send reply: %s", err.Error())
- }
- }
- }
-}
-
-func die(format string, v ...interface{}) {
- fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...))
- os.Exit(1)
-}
-
-func date() string {
- return time.Now().Format(time.ANSIC)
-}
diff --git a/shm.go b/shm.go
index af18432..9611975 100644
--- a/shm.go
+++ b/shm.go
@@ -5,7 +5,6 @@
"fmt"
"os"
"os/signal"
- "strconv"
"time"
"golang.org/x/sys/unix"
@@ -88,8 +87,7 @@
}
}
-func shmReciever(url string, strCount string) {
- count, _ := strconv.Atoi(strCount)
+func shmReciever(url string, count int) {
var cs []deliver.Deliver
for i := 0; i < count; i++ {
--
Gitblit v1.8.0