From 9a89af693b9336633bcac2a652c294f782e6b3b1 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 20 五月 2019 12:54:50 +0800
Subject: [PATCH] add share memory improve effect

---
 mode.go    |   11 +++
 shm.go     |  108 ++++++++++++++++++++++++++++++++++++
 deliver.go |    4 +
 nng.go     |   20 ++----
 nngmake.go |    2 
 5 files changed, 130 insertions(+), 15 deletions(-)

diff --git a/deliver.go b/deliver.go
index 9220db3..fe01778 100644
--- a/deliver.go
+++ b/deliver.go
@@ -18,6 +18,8 @@
 
 	if m > ModeStart && m < ModeNNG {
 		return nngServer(m, url, args...)
+	} else if m == Shm {
+		return shmServer(m, url, args...)
 	}
 	return nil
 }
@@ -27,6 +29,8 @@
 
 	if m > ModeStart && m < ModeNNG {
 		return nngClient(m, url, args...)
+	} else if m == Shm {
+		return shmClient(m, url, args...)
 	}
 
 	return nil
diff --git a/mode.go b/mode.go
index f64f576..a9a1eb0 100644
--- a/mode.go
+++ b/mode.go
@@ -14,5 +14,16 @@
 	Bus
 	Pair
 	ModeNNG
+	Shm
 	ModeEnd
 )
+
+// type deliver
+type td int
+
+const (
+	// as server active
+	agent = td(iota)
+	// as client passive
+	coactee
+)
diff --git a/nng.go b/nng.go
index 1e9c1c0..751feac 100644
--- a/nng.go
+++ b/nng.go
@@ -21,20 +21,12 @@
 	"nanomsg.org/go-mangos/transport/all"
 )
 
-// type deliver
-type td int
-
-const (
-	agent = td(iota)
-	coactee
-)
-
 // NNG mangos wrap
 type NNG struct {
-	sock   mangos.Socket
-	server bool
-	mode   Mode
-	url    string
+	sock mangos.Socket
+	typ  td
+	mode Mode
+	url  string
 
 	arguments []interface{}
 }
@@ -93,7 +85,7 @@
 	rmExistedIpcName(url)
 
 	return &NNG{
-		server:    true,
+		typ:       agent,
 		mode:      m,
 		url:       url,
 		arguments: args,
@@ -103,7 +95,7 @@
 func nngClient(m Mode, url string, args ...interface{}) *NNG {
 
 	return &NNG{
-		server:    false,
+		typ:       coactee,
 		mode:      m,
 		url:       url,
 		arguments: args,
diff --git a/nngmake.go b/nngmake.go
index cdce421..0aff259 100644
--- a/nngmake.go
+++ b/nngmake.go
@@ -14,7 +14,7 @@
 		sock.Close()
 		sock = nil
 	}
-	if n.server {
+	if n.typ == agent {
 		if err = sock.Listen(n.url); err != nil {
 			sock.Close()
 			sock = nil
diff --git a/shm.go b/shm.go
new file mode 100644
index 0000000..0ea7ccc
--- /dev/null
+++ b/shm.go
@@ -0,0 +1,108 @@
+package deliver
+
+import (
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/tmthrgd/shm-go"
+)
+
+// SHM share memory
+type SHM struct {
+	rw   *shm.ReadWriteCloser
+	typ  td
+	data []byte
+}
+
+// Send impl interface Diliver
+func (s *SHM) Send(data []byte) error {
+	if s.rw == nil {
+		return errors.New("please init shm producer first")
+	}
+
+	n, err := s.rw.Write(data)
+	if n < 1 {
+		fmt.Println("recv data less than 1 length")
+	}
+
+	return err
+}
+
+// Recv impl interface Diliver
+func (s *SHM) Recv() ([]byte, error) {
+
+	if s.rw == nil {
+		return nil, errors.New("please open shm consumer first")
+	}
+
+	n, err := s.rw.Read(s.data)
+	if err == nil || err == io.EOF {
+		s.data = s.data[:n:n]
+		return s.data, nil
+	}
+
+	return nil, err
+}
+
+// Close impl interface Deliver
+func (s *SHM) Close() {
+	if s.rw != nil {
+		s.rw.Close()
+	}
+	if s.typ == agent {
+		shm.Unlink(s.rw.Name())
+	}
+}
+
+func shmServer(m Mode, url string, args ...interface{}) *SHM {
+	if m != Shm {
+		fmt.Println("this is not a shm mode: ", m)
+		return nil
+	}
+
+	var param []int
+	for _, v := range args {
+		fmt.Println(v)
+		switch v.(type) {
+		case int:
+			param = append(param, v.(int))
+		default:
+			fmt.Println("shmProducer recv error parameters")
+
+			return nil
+		}
+	}
+	if len(param) != 2 {
+		fmt.Println("shmProducer recv too much parameter: ", len(param))
+		return nil
+	}
+	if rw, err := shm.CreateSimplex(url, 0644, param[0], param[1]); err == nil {
+		return &SHM{
+			rw,
+			agent,
+			nil,
+		}
+	}
+
+	fmt.Println("create simple shm error")
+	return nil
+}
+
+func shmClient(m Mode, url string, args ...interface{}) *SHM {
+
+	if m != Shm {
+		fmt.Println("this is not a shm mode: ", m)
+		return nil
+	}
+
+	if rw, err := shm.OpenSimplex(url); err == nil {
+		return &SHM{
+			rw,
+			coactee,
+			make([]byte, maxRecvSize),
+		}
+	}
+	fmt.Println("shmConsumer open error")
+	return nil
+}

--
Gitblit v1.8.0