From 622b701e27351e28a6c3df579d4423120afe79fc Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 12:55:54 +0800
Subject: [PATCH] update deliver
---
go.sum | 6 ++
shm.go | 109 ++++++++++++++++++++++++++++++++++++
deliver | 2
go.mod | 3 +
main.go | 11 +++
reqrep.go | 3 -
6 files changed, 129 insertions(+), 5 deletions(-)
diff --git a/deliver b/deliver
index d23f54e..9a89af6 160000
--- a/deliver
+++ b/deliver
@@ -1 +1 @@
-Subproject commit d23f54e337d12fb4e6d5a0a5e1f041a51005e10c
+Subproject commit 9a89af693b9336633bcac2a652c294f782e6b3b1
diff --git a/go.mod b/go.mod
index 517e535..f9b1263 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,9 @@
require (
github.com/gorilla/websocket v1.4.0 // indirect
+ github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 // indirect
+ github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 // indirect
+ github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872
nanomsg.org/go-mangos v1.4.0
)
diff --git a/go.sum b/go.sum
index b32866f..6fb9f50 100644
--- a/go.sum
+++ b/go.sum
@@ -1,5 +1,11 @@
github.com/gorilla/websocket v1.4.0 h1:WDFjx/TMzVgy9VdMMQi2K2Emtwi2QcUQsztZ/zLaH/Q=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
+github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877 h1:n65+IT/xy5+trHm3Zpg9+j7IO4n8pBcPzvaKbMolW8U=
+github.com/tmthrgd/go-sem v0.0.0-20160607101025-0214dbf53877/go.mod h1:sgTk9wg3WurMlziuB3hcfgHYTz3pEkjQpSCTT8V2pW8=
+github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9 h1:uVRQSWD6TOlWlLJ7IYYmbjRr0Xg35ADFN89HGQLPFGI=
+github.com/tmthrgd/go-shm v0.0.0-20170117044846-90afcfcd5ee9/go.mod h1:vy1jksyhzuQOMkHXMEi+X2bZ47ZeCn3QTnYdFBesABs=
+github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290 h1:5zW+TRr0WH4uN72/E/XYwb1PcaYN5BIB/FUbcQ0nHr0=
+github.com/tmthrgd/shm-go v0.0.0-20170130075737-7207ca97b290/go.mod h1:e9PZQr6zVezMTwj1v0j1YhGCNdS2zTCjXU9q9K+HHGk=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872 h1:cGjJzUd8RgBw428LXP65YXni0aiGNA4Bl+ls8SmLOm8=
golang.org/x/sys v0.0.0-20190429190828-d89cdac9e872/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
nanomsg.org/go-mangos v1.4.0 h1:pVRLnzXePdSbhWlWdSncYszTagERhMG5zK/vXYmbEdM=
diff --git a/main.go b/main.go
index aed056c..24b9193 100644
--- a/main.go
+++ b/main.go
@@ -1,12 +1,15 @@
package main
import (
+ "context"
"demo/deliver"
"fmt"
"os"
)
const dLen = 12 * 1024 * 1024
+
+var ctx, cancel = context.WithCancel(context.Background())
func modeType(t string) deliver.Mode {
@@ -18,14 +21,18 @@
return deliver.Pair
} else if t == "reqrep" {
return deliver.ReqRep
+ } else if t == "shm" {
+ return deliver.Shm
}
- return deliver.Mode(-1)
+ return deliver.NONE
}
func senderMode(ipc string, m deliver.Mode) {
if m == deliver.ReqRep {
req(ipc, m)
+ } else if m == deliver.Shm {
+ shmSender(ipc, 2, 32*1024*1024)
}
sender(ipc, m)
}
@@ -33,6 +40,8 @@
func recvMode(ipc string, m deliver.Mode, strCount string) {
if m == deliver.ReqRep {
rep(ipc, m)
+ } else if m == deliver.Shm {
+ shmReciever(ipc, strCount)
}
reciever(ipc, m, strCount)
}
diff --git a/reqrep.go b/reqrep.go
index 468ff27..b23fdf4 100644
--- a/reqrep.go
+++ b/reqrep.go
@@ -22,8 +22,6 @@
fmt.Printf("can't send message on push socket: %s\n", err.Error())
} else {
-
- fmt.Printf("send msg length %d\n", len(msg))
}
if buf, err := p.Recv(); err != nil {
@@ -64,7 +62,6 @@
fmt.Println("recv error : ", err, " msg ", msg)
continue
}
- fmt.Println("recv msg: ", string(msg))
c.Send(buf)
// time.Sleep(10 * time.Millisecond)
diff --git a/shm.go b/shm.go
new file mode 100644
index 0000000..af18432
--- /dev/null
+++ b/shm.go
@@ -0,0 +1,109 @@
+package main
+
+import (
+ "demo/deliver"
+ "fmt"
+ "os"
+ "os/signal"
+ "strconv"
+ "time"
+
+ "golang.org/x/sys/unix"
+)
+
+func shmSenderImpl(s deliver.Deliver) {
+ var err error
+
+ buf := make([]byte, dLen)
+
+ copy(buf, []byte("hello, give you this"))
+ for {
+
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ if err = s.Send(buf); err != nil {
+
+ fmt.Printf("can't send message on push socket: %s\n", err.Error())
+ } else {
+
+ fmt.Printf("send msg length %d\n", len(buf))
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+
+}
+
+func shmSender(url string, args ...interface{}) {
+ s := deliver.NewServer(deliver.Shm, url, args...)
+
+ go shmSenderImpl(s)
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ s.Close()
+}
+
+func shmRecvImpl(c deliver.Deliver, url string, index int) {
+
+ var msg []byte
+ var err error
+
+ var t int64
+ var elapse int64
+ count := 0
+
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ default:
+ msg, err = c.Recv()
+ if err != nil {
+ fmt.Println("recv error : ", err)
+ }
+ if t == 0 {
+ t = time.Now().UnixNano()
+ }
+ elapse = time.Now().UnixNano() - t
+
+ count++
+
+ if elapse > 1e9 {
+ fmt.Printf("NODE-%d: RECEIVED \"%d\" data %s len %d, use \"%d\" ns\n",
+ index, count, string(msg), len(msg), elapse)
+ elapse = 0
+ count = 0
+ t = 0
+ }
+ }
+
+ // time.Sleep(10 * time.Millisecond)
+ }
+}
+
+func shmReciever(url string, strCount string) {
+ count, _ := strconv.Atoi(strCount)
+
+ var cs []deliver.Deliver
+ for i := 0; i < count; i++ {
+ c := deliver.NewClient(deliver.Shm, url)
+ cs = append(cs, c)
+ go shmRecvImpl(c, url, i)
+ }
+
+ c := make(chan os.Signal, 1)
+ signal.Notify(c, os.Interrupt, os.Kill, unix.SIGTERM)
+ <-c
+
+ cancel()
+ for _, v := range cs {
+ v.Close()
+ }
+}
--
Gitblit v1.8.0