From 22125ca10867152617cc4f42f403a0f6e37648a4 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 17 五月 2019 18:08:23 +0800 Subject: [PATCH] add reqrep --- main.go | 124 ++++++++++------------------------------- 1 files changed, 31 insertions(+), 93 deletions(-) diff --git a/main.go b/main.go index 4a370b7..aed056c 100644 --- a/main.go +++ b/main.go @@ -4,114 +4,52 @@ "demo/deliver" "fmt" "os" - "os/signal" - "strconv" - "time" - - "golang.org/x/sys/unix" ) const dLen = 12 * 1024 * 1024 -var mode = deliver.PushPull +func modeType(t string) deliver.Mode { -func senderImpl(s deliver.Deliver) { - var err error - - buf := make([]byte, dLen) - - for { - - if err = s.Send(buf); err != nil { - - fmt.Printf("can't send message on push socket: %s\n", err.Error()) - } else { - - fmt.Printf("send msg length %d\n", len(buf)) - } - - // time.Sleep(10 * time.Millisecond) - } - -} -func sender(url string, args ...interface{}) { - s := deliver.NewServer(deliver.Mode(mode), url, args...) - - go senderImpl(s) - - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM) - <-c - - s.Close() -} - -func recvImpl(url string, index int) { - c := deliver.NewClient(deliver.Mode(mode), url) - - var msg []byte - var err error - - var t int64 - var elapse int64 - count := 0 - - for { - msg, err = c.Recv() - if err != nil { - fmt.Println("recv error : ", err) - } - if t == 0 { - t = time.Now().UnixNano() - } - elapse = time.Now().UnixNano() - t - - count++ - - if elapse > 1e9 { - fmt.Printf("NODE-%d: RECEIVED \"%d\" data len %d, use \"%d\" ns\n", - index, count, len(msg), elapse) - elapse = 0 - count = 0 - t = 0 - } - - // time.Sleep(10 * time.Millisecond) - } -} - -func reciever(url string, strCount string) { - count, _ := strconv.Atoi(strCount) - - for i := 0; i < count; i++ { - go recvImpl(url, i) - } - - for { - time.Sleep(2 * time.Second) - } - -} - -func modeType(t string) { if t == "pushpull" { - mode = deliver.PushPull + return deliver.PushPull } else if t == "pubsub" { - mode = deliver.PubSub + return deliver.PubSub } else if t == "pair" { - mode = deliver.Pair + return deliver.Pair + } else if t == "reqrep" { + return deliver.ReqRep } + + return deliver.Mode(-1) } + +func senderMode(ipc string, m deliver.Mode) { + if m == deliver.ReqRep { + req(ipc, m) + } + sender(ipc, m) +} + +func recvMode(ipc string, m deliver.Mode, strCount string) { + if m == deliver.ReqRep { + rep(ipc, m) + } + reciever(ipc, m, strCount) +} + func main() { if len(os.Args) > 3 && os.Args[1] == "producer" { - modeType(os.Args[2]) - sender(os.Args[3]) + m := modeType(os.Args[2]) + if m > deliver.ModeStart { + senderMode(os.Args[3], m) + } os.Exit(0) } if len(os.Args) > 3 && os.Args[1] == "consumer" { - modeType(os.Args[2]) - - reciever(os.Args[3], os.Args[4]) + m := modeType(os.Args[2]) + if m > deliver.ModeStart { + recvMode(os.Args[3], m, os.Args[4]) + } os.Exit(0) } fmt.Fprintf(os.Stderr, -- Gitblit v1.8.0