From 9a89af693b9336633bcac2a652c294f782e6b3b1 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 12:54:50 +0800
Subject: [PATCH] add share memory improve effect
---
mode.go | 11 +++
shm.go | 108 ++++++++++++++++++++++++++++++++++++
deliver.go | 4 +
nng.go | 20 ++----
nngmake.go | 2
5 files changed, 130 insertions(+), 15 deletions(-)
diff --git a/deliver.go b/deliver.go
index 9220db3..fe01778 100644
--- a/deliver.go
+++ b/deliver.go
@@ -18,6 +18,8 @@
if m > ModeStart && m < ModeNNG {
return nngServer(m, url, args...)
+ } else if m == Shm {
+ return shmServer(m, url, args...)
}
return nil
}
@@ -27,6 +29,8 @@
if m > ModeStart && m < ModeNNG {
return nngClient(m, url, args...)
+ } else if m == Shm {
+ return shmClient(m, url, args...)
}
return nil
diff --git a/mode.go b/mode.go
index f64f576..a9a1eb0 100644
--- a/mode.go
+++ b/mode.go
@@ -14,5 +14,16 @@
Bus
Pair
ModeNNG
+ Shm
ModeEnd
)
+
+// type deliver
+type td int
+
+const (
+ // as server active
+ agent = td(iota)
+ // as client passive
+ coactee
+)
diff --git a/nng.go b/nng.go
index 1e9c1c0..751feac 100644
--- a/nng.go
+++ b/nng.go
@@ -21,20 +21,12 @@
"nanomsg.org/go-mangos/transport/all"
)
-// type deliver
-type td int
-
-const (
- agent = td(iota)
- coactee
-)
-
// NNG mangos wrap
type NNG struct {
- sock mangos.Socket
- server bool
- mode Mode
- url string
+ sock mangos.Socket
+ typ td
+ mode Mode
+ url string
arguments []interface{}
}
@@ -93,7 +85,7 @@
rmExistedIpcName(url)
return &NNG{
- server: true,
+ typ: agent,
mode: m,
url: url,
arguments: args,
@@ -103,7 +95,7 @@
func nngClient(m Mode, url string, args ...interface{}) *NNG {
return &NNG{
- server: false,
+ typ: coactee,
mode: m,
url: url,
arguments: args,
diff --git a/nngmake.go b/nngmake.go
index cdce421..0aff259 100644
--- a/nngmake.go
+++ b/nngmake.go
@@ -14,7 +14,7 @@
sock.Close()
sock = nil
}
- if n.server {
+ if n.typ == agent {
if err = sock.Listen(n.url); err != nil {
sock.Close()
sock = nil
diff --git a/shm.go b/shm.go
new file mode 100644
index 0000000..0ea7ccc
--- /dev/null
+++ b/shm.go
@@ -0,0 +1,108 @@
+package deliver
+
+import (
+ "errors"
+ "fmt"
+ "io"
+
+ "github.com/tmthrgd/shm-go"
+)
+
+// SHM share memory
+type SHM struct {
+ rw *shm.ReadWriteCloser
+ typ td
+ data []byte
+}
+
+// Send impl interface Diliver
+func (s *SHM) Send(data []byte) error {
+ if s.rw == nil {
+ return errors.New("please init shm producer first")
+ }
+
+ n, err := s.rw.Write(data)
+ if n < 1 {
+ fmt.Println("recv data less than 1 length")
+ }
+
+ return err
+}
+
+// Recv impl interface Diliver
+func (s *SHM) Recv() ([]byte, error) {
+
+ if s.rw == nil {
+ return nil, errors.New("please open shm consumer first")
+ }
+
+ n, err := s.rw.Read(s.data)
+ if err == nil || err == io.EOF {
+ s.data = s.data[:n:n]
+ return s.data, nil
+ }
+
+ return nil, err
+}
+
+// Close impl interface Deliver
+func (s *SHM) Close() {
+ if s.rw != nil {
+ s.rw.Close()
+ }
+ if s.typ == agent {
+ shm.Unlink(s.rw.Name())
+ }
+}
+
+func shmServer(m Mode, url string, args ...interface{}) *SHM {
+ if m != Shm {
+ fmt.Println("this is not a shm mode: ", m)
+ return nil
+ }
+
+ var param []int
+ for _, v := range args {
+ fmt.Println(v)
+ switch v.(type) {
+ case int:
+ param = append(param, v.(int))
+ default:
+ fmt.Println("shmProducer recv error parameters")
+
+ return nil
+ }
+ }
+ if len(param) != 2 {
+ fmt.Println("shmProducer recv too much parameter: ", len(param))
+ return nil
+ }
+ if rw, err := shm.CreateSimplex(url, 0644, param[0], param[1]); err == nil {
+ return &SHM{
+ rw,
+ agent,
+ nil,
+ }
+ }
+
+ fmt.Println("create simple shm error")
+ return nil
+}
+
+func shmClient(m Mode, url string, args ...interface{}) *SHM {
+
+ if m != Shm {
+ fmt.Println("this is not a shm mode: ", m)
+ return nil
+ }
+
+ if rw, err := shm.OpenSimplex(url); err == nil {
+ return &SHM{
+ rw,
+ coactee,
+ make([]byte, maxRecvSize),
+ }
+ }
+ fmt.Println("shmConsumer open error")
+ return nil
+}
--
Gitblit v1.8.0