From 0f27a87065cd0446e9a491357d440dc67e36ea0d Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 16:20:34 +0800
Subject: [PATCH] add shm multi send one recv
---
shm2.go | 108 ++++++++++++++++++++++++++++++++++++
shm.go | 13 +++-
main.go | 6 +
3 files changed, 122 insertions(+), 5 deletions(-)
diff --git a/main.go b/main.go
index 5c5eb53..a9416b3 100644
--- a/main.go
+++ b/main.go
@@ -16,7 +16,8 @@
if m == deliver.ReqRep {
req(ipc, m)
} else if m == deliver.Shm {
- shmSender(ipc, 2, 32*1024*1024)
+ // shmSender(ipc, 2, 32*1024*1024)
+ shmReciever2(ipc, count, 2, 32*1024*1024)
}
if one {
@@ -30,7 +31,8 @@
if m == deliver.ReqRep {
rep(ipc, m)
} else if m == deliver.Shm {
- shmReciever(ipc, count)
+ // shmReciever(ipc, count)
+ shmSender2(ipc, count)
}
if n {
diff --git a/shm.go b/shm.go
index 9611975..abb0ba5 100644
--- a/shm.go
+++ b/shm.go
@@ -5,6 +5,7 @@
"fmt"
"os"
"os/signal"
+ "sync"
"time"
"golang.org/x/sys/unix"
@@ -42,14 +43,14 @@
go shmSenderImpl(s)
c := make(chan os.Signal, 1)
- signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
<-c
cancel()
s.Close()
}
-func shmRecvImpl(c deliver.Deliver, url string, index int) {
+func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
var msg []byte
var err error
@@ -61,6 +62,7 @@
for {
select {
case <-ctx.Done():
+ wg.Done()
return
default:
msg, err = c.Recv()
@@ -85,15 +87,19 @@
// time.Sleep(10 * time.Millisecond)
}
+
}
func shmReciever(url string, count int) {
+ wg := sync.WaitGroup{}
+
var cs []deliver.Deliver
for i := 0; i < count; i++ {
+ wg.Add(1)
c := deliver.NewClient(deliver.Shm, url)
cs = append(cs, c)
- go shmRecvImpl(c, url, i)
+ go shmRecvImpl(&wg, c, url, i)
}
c := make(chan os.Signal, 1)
@@ -101,6 +107,7 @@
<-c
cancel()
+ wg.Wait()
for _, v := range cs {
v.Close()
}
diff --git a/shm2.go b/shm2.go
new file mode 100644
index 0000000..791241d
--- /dev/null
+++ b/shm2.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "os/signal"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+func shmSenderImpl2(s deliver.Deliver) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ copy(buf, []byte("hello, give you this"))
+ for {
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func shmReciever2(url string, count int, args ...interface{}) {
+ s := deliver.NewServer(deliver.Shm, url, args...)
+
+ go shmRecvImpl2(s, 0)
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
+ <-c
+
+ cancel()
+ s.Close()
+}
+
+func shmRecvImpl2(c deliver.Deliver, index int) {
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+ index, count, string(msg), len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func shmSender2(url string, count int) {
+
+ var cs []deliver.Deliver
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(deliver.Shm, url)
+ cs = append(cs, c)
+ go shmSenderImpl2(c)
+ }
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ for _, v := range cs {
+ v.Close()
+ }
+}
--
Gitblit v1.8.0