From 26f699593d76d6fa4bcf320c84fd723343c4b5f4 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 20 五月 2019 15:19:15 +0800 Subject: [PATCH] add one pull multi push --- /dev/null | 88 ------------ 1push-npull.go | 106 +++++++++++++++ shm.go | 4 main.go | 92 ++++++++---- npush-1pull.go | 101 ++++++++++++++ 5 files changed, 270 insertions(+), 121 deletions(-) diff --git a/1push-npull.go b/1push-npull.go new file mode 100644 index 0000000..31ea21a --- /dev/null +++ b/1push-npull.go @@ -0,0 +1,106 @@ +package main + +import ( + "demo/deliver" + "fmt" + "os" + "os/signal" + "time" + + "golang.org/x/sys/unix" +) + +func oneSenderImpl(s deliver.Deliver) { + var err error + + buf := make([]byte, dLen) + + for { + select { + case <-ctx.Done(): + return + default: + + if err = s.Send(buf); err != nil { + + fmt.Printf("can't send message on push socket: %s\n", err.Error()) + } else { + + fmt.Printf("send msg length %d\n", len(buf)) + } + } + } + +} + +func oneSender(url string, m deliver.Mode, args ...interface{}) { + s := deliver.NewServer(m, url, args...) + + go oneSenderImpl(s) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + <-c + + cancel() + s.Close() +} + +func nRecvImpl(c deliver.Deliver, index int) { + + var msg []byte + var err error + + var t int64 + var elapse int64 + count := 0 + + for { + select { + case <-ctx.Done(): + return + default: + msg, err = c.Recv() + if err != nil { + fmt.Println("recv error : ", err) + } + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + } + + } +} + +func nReciever(url string, m deliver.Mode, count int) { + + var cs []deliver.Deliver + + for i := 0; i < count; i++ { + c := deliver.NewClient(m, url) + cs = append(cs, c) + + go nRecvImpl(c, i) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + <-c + + cancel() + for _, v := range cs { + v.Close() + } + +} diff --git a/main.go b/main.go index 24b9193..5c5eb53 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,7 @@ import ( "context" "demo/deliver" + "flag" "fmt" "os" ) @@ -10,6 +11,58 @@ const dLen = 12 * 1024 * 1024 var ctx, cancel = context.WithCancel(context.Background()) + +func senderMode(ipc string, m deliver.Mode, count int, one bool) { + if m == deliver.ReqRep { + req(ipc, m) + } else if m == deliver.Shm { + shmSender(ipc, 2, 32*1024*1024) + } + + if one { + oneSender(ipc, m) + } else { + nSender(ipc, m, count) + } +} + +func recvMode(ipc string, m deliver.Mode, count int, n bool) { + if m == deliver.ReqRep { + rep(ipc, m) + } else if m == deliver.Shm { + shmReciever(ipc, count) + } + + if n { + nReciever(ipc, m, count) + } else { + oneReciever(ipc, m) + } +} + +var ( + proc string + procCount int + mode string + ipc string + oneSendnRecv bool +) + +const ( + act = "act" + pass = "pass" +) + +func init() { + flag.StringVar(&proc, "p", "act", "proc as sender") + flag.IntVar(&procCount, "c", 1, "proc run count") + + flag.StringVar(&mode, "m", "pushpull", "proc run mode pushpull or pubsub etc.") + + flag.StringVar(&ipc, "i", "ipc:///tmp/pic.ipc", "ipc label") + + flag.BoolVar(&oneSendnRecv, "n", true, "one send n recv") +} func modeType(t string) deliver.Mode { @@ -28,39 +81,18 @@ return deliver.NONE } -func senderMode(ipc string, m deliver.Mode) { - if m == deliver.ReqRep { - req(ipc, m) - } else if m == deliver.Shm { - shmSender(ipc, 2, 32*1024*1024) - } - sender(ipc, m) -} - -func recvMode(ipc string, m deliver.Mode, strCount string) { - if m == deliver.ReqRep { - rep(ipc, m) - } else if m == deliver.Shm { - shmReciever(ipc, strCount) - } - reciever(ipc, m, strCount) -} - func main() { - if len(os.Args) > 3 && os.Args[1] == "producer" { - m := modeType(os.Args[2]) - if m > deliver.ModeStart { - senderMode(os.Args[3], m) + flag.Parse() + + m := modeType(mode) + if m > deliver.ModeStart { + if proc == act { + senderMode(ipc, m, procCount, oneSendnRecv) + } else { + recvMode(ipc, m, procCount, oneSendnRecv) } - os.Exit(0) } - if len(os.Args) > 3 && os.Args[1] == "consumer" { - m := modeType(os.Args[2]) - if m > deliver.ModeStart { - recvMode(os.Args[3], m, os.Args[4]) - } - os.Exit(0) - } + fmt.Fprintf(os.Stderr, "Usage: pushpull push|pull <URL> <ARG> ...\n") os.Exit(1) diff --git a/npush-1pull.go b/npush-1pull.go new file mode 100644 index 0000000..9d54439 --- /dev/null +++ b/npush-1pull.go @@ -0,0 +1,101 @@ +package main + +import ( + "demo/deliver" + "fmt" + "os" + "os/signal" + "time" + + "golang.org/x/sys/unix" +) + +func nSenderImpl(s deliver.Deliver, index int) { + var err error + + buf := make([]byte, dLen) + + for { + select { + case <-ctx.Done(): + return + default: + if err = s.Send(buf); err != nil { + + fmt.Printf("%d can't send message on push socket: %s\n", index, err.Error()) + } else { + + fmt.Printf("%d send msg length %d\n", index, len(buf)) + } + } + } + +} + +func nSender(url string, m deliver.Mode, count int, args ...interface{}) { + + var cs []deliver.Deliver + + for i := 0; i < count; i++ { + c := deliver.NewClient(m, url, args...) + cs = append(cs, c) + + go nSenderImpl(c, i) + } + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + <-c + cancel() + for _, v := range cs { + v.Close() + } + +} + +func oneRecvImpl(c deliver.Deliver, index int) { + + var msg []byte + var err error + + var t int64 + var elapse int64 + count := 0 + + for { + msg, err = c.Recv() + if err != nil { + fmt.Println("recv error : ", err) + } + if t == 0 { + t = time.Now().UnixNano() + } + elapse = time.Now().UnixNano() - t + + count++ + + if elapse > 1e9 { + fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", + index, count, len(msg), elapse) + elapse = 0 + count = 0 + t = 0 + } + + // time.Sleep(10 * time.Millisecond) + } +} + +func oneReciever(url string, m deliver.Mode) { + + s := deliver.NewServer(m, url) + + go oneRecvImpl(s, 0) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) + <-c + + cancel() + s.Close() +} diff --git a/pushpull.go b/pushpull.go deleted file mode 100644 index 9802c37..0000000 --- a/pushpull.go +++ /dev/null @@ -1,90 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "os/signal" - "strconv" - "time" - - "golang.org/x/sys/unix" -) - -func senderImpl(s deliver.Deliver) { - var err error - - buf := make([]byte, dLen) - - for { - - if err = s.Send(buf); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - - fmt.Printf("send msg length %d\n", len(buf)) - } - - // time.Sleep(10 * time.Millisecond) - } - -} -func sender(url string, m deliver.Mode, args ...interface{}) { - s := deliver.NewServer(m, url, args...) - - go senderImpl(s) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - s.Close() -} - -func recvImpl(url string, m deliver.Mode, index int) { - c := deliver.NewClient(m, url) - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", - index, count, len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - - // time.Sleep(10 * time.Millisecond) - } -} - -func reciever(url string, m deliver.Mode, strCount string) { - count, _ := strconv.Atoi(strCount) - - for i := 0; i < count; i++ { - go recvImpl(url, m, i) - } - - for { - time.Sleep(2 * time.Second) - } - -} diff --git a/reqrep.go.orig b/reqrep.go.orig deleted file mode 100644 index 4526dcd..0000000 --- a/reqrep.go.orig +++ /dev/null @@ -1,88 +0,0 @@ -package main - -import ( - "demo/deliver" - "fmt" - "os" - "time" - - "nanomsg.org/go-mangos" - "nanomsg.org/go-mangos/protocol/rep" - "nanomsg.org/go-mangos/protocol/req" - "nanomsg.org/go-mangos/transport/ipc" - "nanomsg.org/go-mangos/transport/tcp" -) - -func fReq(url string, m deliver.Mode) { - var sock mangos.Socket - var err error - - if sock, err = req.NewSocket(); err != nil { - die("can't get new req socket: %s", err.Error()) - } - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) - - sock.SetOption(mangos.OptionRaw, true) - if err = sock.Listen(url); err != nil { - die("can't dial on req socket: %s", err.Error()) - } - fmt.Printf("NODE1: SENDING DATE REQUEST %s\n", "DATE") - - data := []byte("DATE") - message := mangos.NewMessage(len(data)) - message.Body = data - if err = sock.SendMsg(message); err != nil { - die("can't send message on push socket: %s", err.Error()) - } - - var mesg *mangos.Message - if mesg, err = sock.RecvMsg(); err != nil { - die("can't receive date: %s", err.Error()) - } - fmt.Printf("NODE1: RECEIVED DATE %s\n", string(mesg.Body)) - sock.Close() - -} - -func fRep(url string, m deliver.Mode) { - var sock mangos.Socket - var err error - var msg *mangos.Message - - if sock, err = rep.NewSocket(); err != nil { - die("can't get new rep socket: %s", err) - } - sock.AddTransport(ipc.NewTransport()) - sock.AddTransport(tcp.NewTransport()) - sock.SetOption(mangos.OptionRaw, true) - if err = sock.Dial(url); err != nil { - die("can't listen on rep socket: %s", err.Error()) - } - for { - // Could also use sock.RecvMsg to get header - msg, err = sock.RecvMsg() - if string(msg.Body) == "DATE" { // no need to terminate - fmt.Println("NODE0: RECEIVED DATE REQUEST") - d := date() - fmt.Printf("NODE0: SENDING DATE %s\n", d) - - data := []byte(d) - message := mangos.NewMessage(len(data)) - message.Body = data - err = sock.SendMsg(message) - if err != nil { - die("can't send reply: %s", err.Error()) - } - } - } -} - -func die(format string, v ...interface{}) { - fmt.Fprintln(os.Stderr, fmt.Sprintf(format, v...)) - os.Exit(1) -} - -func date() string { - return time.Now().Format(time.ANSIC) -} diff --git a/shm.go b/shm.go index af18432..9611975 100644 --- a/shm.go +++ b/shm.go @@ -5,7 +5,6 @@ "fmt" "os" "os/signal" - "strconv" "time" "golang.org/x/sys/unix" @@ -88,8 +87,7 @@ } } -func shmReciever(url string, strCount string) { - count, _ := strconv.Atoi(strCount) +func shmReciever(url string, count int) { var cs []deliver.Deliver for i := 0; i < count; i++ { -- Gitblit v1.8.0