From 9a89af693b9336633bcac2a652c294f782e6b3b1 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 20 五月 2019 12:54:50 +0800 Subject: [PATCH] add share memory improve effect --- mode.go | 11 +++ shm.go | 108 ++++++++++++++++++++++++++++++++++++ deliver.go | 4 + nng.go | 20 ++---- nngmake.go | 2 5 files changed, 130 insertions(+), 15 deletions(-) diff --git a/deliver.go b/deliver.go index 9220db3..fe01778 100644 --- a/deliver.go +++ b/deliver.go @@ -18,6 +18,8 @@ if m > ModeStart && m < ModeNNG { return nngServer(m, url, args...) + } else if m == Shm { + return shmServer(m, url, args...) } return nil } @@ -27,6 +29,8 @@ if m > ModeStart && m < ModeNNG { return nngClient(m, url, args...) + } else if m == Shm { + return shmClient(m, url, args...) } return nil diff --git a/mode.go b/mode.go index f64f576..a9a1eb0 100644 --- a/mode.go +++ b/mode.go @@ -14,5 +14,16 @@ Bus Pair ModeNNG + Shm ModeEnd ) + +// type deliver +type td int + +const ( + // as server active + agent = td(iota) + // as client passive + coactee +) diff --git a/nng.go b/nng.go index 1e9c1c0..751feac 100644 --- a/nng.go +++ b/nng.go @@ -21,20 +21,12 @@ "nanomsg.org/go-mangos/transport/all" ) -// type deliver -type td int - -const ( - agent = td(iota) - coactee -) - // NNG mangos wrap type NNG struct { - sock mangos.Socket - server bool - mode Mode - url string + sock mangos.Socket + typ td + mode Mode + url string arguments []interface{} } @@ -93,7 +85,7 @@ rmExistedIpcName(url) return &NNG{ - server: true, + typ: agent, mode: m, url: url, arguments: args, @@ -103,7 +95,7 @@ func nngClient(m Mode, url string, args ...interface{}) *NNG { return &NNG{ - server: false, + typ: coactee, mode: m, url: url, arguments: args, diff --git a/nngmake.go b/nngmake.go index cdce421..0aff259 100644 --- a/nngmake.go +++ b/nngmake.go @@ -14,7 +14,7 @@ sock.Close() sock = nil } - if n.server { + if n.typ == agent { if err = sock.Listen(n.url); err != nil { sock.Close() sock = nil diff --git a/shm.go b/shm.go new file mode 100644 index 0000000..0ea7ccc --- /dev/null +++ b/shm.go @@ -0,0 +1,108 @@ +package deliver + +import ( + "errors" + "fmt" + "io" + + "github.com/tmthrgd/shm-go" +) + +// SHM share memory +type SHM struct { + rw *shm.ReadWriteCloser + typ td + data []byte +} + +// Send impl interface Diliver +func (s *SHM) Send(data []byte) error { + if s.rw == nil { + return errors.New("please init shm producer first") + } + + n, err := s.rw.Write(data) + if n < 1 { + fmt.Println("recv data less than 1 length") + } + + return err +} + +// Recv impl interface Diliver +func (s *SHM) Recv() ([]byte, error) { + + if s.rw == nil { + return nil, errors.New("please open shm consumer first") + } + + n, err := s.rw.Read(s.data) + if err == nil || err == io.EOF { + s.data = s.data[:n:n] + return s.data, nil + } + + return nil, err +} + +// Close impl interface Deliver +func (s *SHM) Close() { + if s.rw != nil { + s.rw.Close() + } + if s.typ == agent { + shm.Unlink(s.rw.Name()) + } +} + +func shmServer(m Mode, url string, args ...interface{}) *SHM { + if m != Shm { + fmt.Println("this is not a shm mode: ", m) + return nil + } + + var param []int + for _, v := range args { + fmt.Println(v) + switch v.(type) { + case int: + param = append(param, v.(int)) + default: + fmt.Println("shmProducer recv error parameters") + + return nil + } + } + if len(param) != 2 { + fmt.Println("shmProducer recv too much parameter: ", len(param)) + return nil + } + if rw, err := shm.CreateSimplex(url, 0644, param[0], param[1]); err == nil { + return &SHM{ + rw, + agent, + nil, + } + } + + fmt.Println("create simple shm error") + return nil +} + +func shmClient(m Mode, url string, args ...interface{}) *SHM { + + if m != Shm { + fmt.Println("this is not a shm mode: ", m) + return nil + } + + if rw, err := shm.OpenSimplex(url); err == nil { + return &SHM{ + rw, + coactee, + make([]byte, maxRecvSize), + } + } + fmt.Println("shmConsumer open error") + return nil +} -- Gitblit v1.8.0