From 622b701e27351e28a6c3df579d4423120afe79fc Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 12:55:54 +0800
Subject: [PATCH] update deliver

---
 go.sum    |    6 ++
 shm.go    |  109 ++++++++++++++++++++++++++++++++++++
 deliver   |    2 
 go.mod    |    3 +
 main.go   |   11 +++
 reqrep.go |    3 -
 6 files changed, 129 insertions(+), 5 deletions(-)

diff --git a/deliver b/deliver
index d23f54e..9a89af6 160000
--- a/deliver
+++ b/deliver
@@ -1 +1 @@
-Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
+Subproject commit 9a89af693b9336633bcac2a652c294f782e6b3b1
diff --git a/go.mod b/go.mod
index 517e535..f9b1263 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,9 @@
 
 require (
 	github.com/gorilla/websocket v1.4.0 // indirect
+	github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
+	github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
+	github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
 	golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
 	nanomsg.org/go-mangos v1.4.0
 )
diff --git a/go.sum b/go.sum
index b32866f..6fb9f50 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,11 @@
 github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
 github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
+github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
+github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
+github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
+github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
+github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
 golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
 golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
diff --git a/main.go b/main.go
index aed056c..24b9193 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,15 @@
 package main
 
 import (
+	"context"
 	"demo/deliver"
 	"fmt"
 	"os"
 )
 
 const dLen = 12 * 1024 * 1024
+
+var ctx, cancel = context.WithCancel(context.Background())
 
 func modeType(t string) deliver.Mode {
 
@@ -18,14 +21,18 @@
 		return deliver.Pair
 	} else if t == "reqrep" {
 		return deliver.ReqRep
+	} else if t == "shm" {
+		return deliver.Shm
 	}
 
-	return deliver.Mode(-1)
+	return deliver.NONE
 }
 
 func senderMode(ipc string, m deliver.Mode) {
 	if m == deliver.ReqRep {
 		req(ipc, m)
+	} else if m == deliver.Shm {
+		shmSender(ipc, 2, 32*1024*1024)
 	}
 	sender(ipc, m)
 }
@@ -33,6 +40,8 @@
 func recvMode(ipc string, m deliver.Mode, strCount string) {
 	if m == deliver.ReqRep {
 		rep(ipc, m)
+	} else if m == deliver.Shm {
+		shmReciever(ipc, strCount)
 	}
 	reciever(ipc, m, strCount)
 }
diff --git a/reqrep.go b/reqrep.go
index 468ff27..b23fdf4 100644
--- a/reqrep.go
+++ b/reqrep.go
@@ -22,8 +22,6 @@
 
 			fmt.Printf("can't send message on push socket: %s\n", err.Error())
 		} else {
-
-			fmt.Printf("send msg length %d\n", len(msg))
 		}
 
 		if buf, err := p.Recv(); err != nil {
@@ -64,7 +62,6 @@
 			fmt.Println("recv error : ", err, " msg ", msg)
 			continue
 		}
-		fmt.Println("recv msg: ", string(msg))
 
 		c.Send(buf)
 		// time.Sleep(10 * time.Millisecond)
diff --git a/shm.go b/shm.go
new file mode 100644
index 0000000..af18432
--- /dev/null
+++ b/shm.go
@@ -0,0 +1,109 @@
+package main
+
+import (
+	"demo/deliver"
+	"fmt"
+	"os"
+	"os/signal"
+	"strconv"
+	"time"
+
+	"golang.org/x/sys/unix"
+)
+
+func shmSenderImpl(s deliver.Deliver) {
+	var err error
+
+	buf := make([]byte, dLen)
+
+	copy(buf, []byte("hello, give you this"))
+	for {
+
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			if err = s.Send(buf); err != nil {
+
+				fmt.Printf("can't send message on push socket: %s\n", err.Error())
+			} else {
+
+				fmt.Printf("send msg length %d\n", len(buf))
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+
+}
+
+func shmSender(url string, args ...interface{}) {
+	s := deliver.NewServer(deliver.Shm, url, args...)
+
+	go shmSenderImpl(s)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	s.Close()
+}
+
+func shmRecvImpl(c deliver.Deliver, url string, index int) {
+
+	var msg []byte
+	var err error
+
+	var t int64
+	var elapse int64
+	count := 0
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			msg, err = c.Recv()
+			if err != nil {
+				fmt.Println("recv error : ", err)
+			}
+			if t == 0 {
+				t = time.Now().UnixNano()
+			}
+			elapse = time.Now().UnixNano() - t
+
+			count++
+
+			if elapse > 1e9 {
+				fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+					index, count, string(msg), len(msg), elapse)
+				elapse = 0
+				count = 0
+				t = 0
+			}
+		}
+
+		// time.Sleep(10 * time.Millisecond)
+	}
+}
+
+func shmReciever(url string, strCount string) {
+	count, _ := strconv.Atoi(strCount)
+
+	var cs []deliver.Deliver
+	for i := 0; i < count; i++ {
+		c := deliver.NewClient(deliver.Shm, url)
+		cs = append(cs, c)
+		go shmRecvImpl(c, url, i)
+	}
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+	<-c
+
+	cancel()
+	for _, v := range cs {
+		v.Close()
+	}
+}

--
Gitblit v1.8.0