From 0f27a87065cd0446e9a491357d440dc67e36ea0d Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 16:20:34 +0800
Subject: [PATCH] add shm multi send one recv

---
 shm2.go |  108 ++++++++++++++++++++++++++++++++++++
 shm.go  |   13 +++-
 main.go |    6 +
 3 files changed, 122 insertions(+), 5 deletions(-)

diff --git a/main.go b/main.go
index 5c5eb53..a9416b3 100644
--- a/main.go
+++ b/main.go
@@ -16,7 +16,8 @@
 	if m == deliver.ReqRep {
 		req(ipc, m)
 	} else if m == deliver.Shm {
-		shmSender(ipc, 2, 32*1024*1024)
+		// shmSender(ipc, 2, 32*1024*1024)
+		shmReciever2(ipc, count, 2, 32*1024*1024)
 	}
 
 	if one {
@@ -30,7 +31,8 @@
 	if m == deliver.ReqRep {
 		rep(ipc, m)
 	} else if m == deliver.Shm {
-		shmReciever(ipc, count)
+		// shmReciever(ipc, count)
+		shmSender2(ipc, count)
 	}
 
 	if n {
diff --git a/shm.go b/shm.go
index 9611975..abb0ba5 100644
--- a/shm.go
+++ b/shm.go
@@ -5,6 +5,7 @@
 	"fmt"
 	"os"
 	"os/signal"
+	"sync"
 	"time"
 
 	"golang.org/x/sys/unix"
@@ -42,14 +43,14 @@
 	go shmSenderImpl(s)
 
 	c := make(chan os.Signal, 1)
-	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
 	<-c
 
 	cancel()
 	s.Close()
 }
 
-func shmRecvImpl(c deliver.Deliver, url string, index int) {
+func shmRecvImpl(wg *sync.WaitGroup, c deliver.Deliver, url string, index int) {
 
 	var msg []byte
 	var err error
@@ -61,6 +62,7 @@
 	for {
 		select {
 		case <-ctx.Done():
+			wg.Done()
 			return
 		default:
 			msg, err = c.Recv()
@@ -85,15 +87,19 @@
 
 		// time.Sleep(10 * time.Millisecond)
 	}
+
 }
 
 func shmReciever(url string, count int) {
 
+	wg := sync.WaitGroup{}
+
 	var cs []deliver.Deliver
 	for i := 0; i < count; i++ {
+		wg.Add(1)
 		c := deliver.NewClient(deliver.Shm, url)
 		cs = append(cs, c)
-		go shmRecvImpl(c, url, i)
+		go shmRecvImpl(&wg, c, url, i)
 	}
 
 	c := make(chan os.Signal, 1)
@@ -101,6 +107,7 @@
 	<-c
 
 	cancel()
+	wg.Wait()
 	for _, v := range cs {
 		v.Close()
 	}
diff --git a/shm2.go b/shm2.go
new file mode 100644
index 0000000..791241d
--- /dev/null
+++ b/shm2.go
@@ -0,0 +1,108 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"os/signal"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+func shmSenderImpl2(s deliver.Deliver) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	copy(buf, []byte("hello, give you this"))
+	for {
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err = s.Send(buf); err != nil {
+
+				fmt.Printf("can't send message on push socket: %s\n", err.Error())
+			} else {
+
+				fmt.Printf("send msg length %d\n", len(buf))
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func shmReciever2(url string, count int, args ...interface{}) {
+	s := deliver.NewServer(deliver.Shm, url, args...)
+
+	go shmRecvImpl2(s, 0)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGINT)
+	<-c
+
+	cancel()
+	s.Close()
+}
+
+func shmRecvImpl2(c deliver.Deliver, index int) {
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg, err = c.Recv()
+			if err != nil {
+				fmt.Println("recv error : ", err)
+			}
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+					index, count, string(msg), len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func shmSender2(url string, count int) {
+
+	var cs []deliver.Deliver
+	for i := 0; i < count; i++ {
+		c := deliver.NewClient(deliver.Shm, url)
+		cs = append(cs, c)
+		go shmSenderImpl2(c)
+	}
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	for _, v := range cs {
+		v.Close()
+	}
+}

--
Gitblit v1.8.0