zhangmeng
2019-05-20 9a89af693b9336633bcac2a652c294f782e6b3b1
add share memory improve effect
1个文件已添加
4个文件已修改
139 ■■■■■ 已修改文件
deliver.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
mode.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nng.go 14 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
nngmake.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
shm.go 108 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
deliver.go
@@ -18,6 +18,8 @@
    if m > ModeStart && m < ModeNNG {
        return nngServer(m, url, args...)
    } else if m == Shm {
        return shmServer(m, url, args...)
    }
    return nil
}
@@ -27,6 +29,8 @@
    if m > ModeStart && m < ModeNNG {
        return nngClient(m, url, args...)
    } else if m == Shm {
        return shmClient(m, url, args...)
    }
    return nil
mode.go
@@ -14,5 +14,16 @@
    Bus
    Pair
    ModeNNG
    Shm
    ModeEnd
)
// type deliver
type td int
const (
    // as server active
    agent = td(iota)
    // as client passive
    coactee
)
nng.go
@@ -21,18 +21,10 @@
    "nanomsg.org/go-mangos/transport/all"
)
// type deliver
type td int
const (
    agent = td(iota)
    coactee
)
// NNG mangos wrap
type NNG struct {
    sock   mangos.Socket
    server bool
    typ  td
    mode   Mode
    url    string
@@ -93,7 +85,7 @@
    rmExistedIpcName(url)
    return &NNG{
        server:    true,
        typ:       agent,
        mode:      m,
        url:       url,
        arguments: args,
@@ -103,7 +95,7 @@
func nngClient(m Mode, url string, args ...interface{}) *NNG {
    return &NNG{
        server:    false,
        typ:       coactee,
        mode:      m,
        url:       url,
        arguments: args,
nngmake.go
@@ -14,7 +14,7 @@
        sock.Close()
        sock = nil
    }
    if n.server {
    if n.typ == agent {
        if err = sock.Listen(n.url); err != nil {
            sock.Close()
            sock = nil
shm.go
New file
@@ -0,0 +1,108 @@
package deliver
import (
    "errors"
    "fmt"
    "io"
    "github.com/tmthrgd/shm-go"
)
// SHM share memory
type SHM struct {
    rw   *shm.ReadWriteCloser
    typ  td
    data []byte
}
// Send impl interface Diliver
func (s *SHM) Send(data []byte) error {
    if s.rw == nil {
        return errors.New("please init shm producer first")
    }
    n, err := s.rw.Write(data)
    if n < 1 {
        fmt.Println("recv data less than 1 length")
    }
    return err
}
// Recv impl interface Diliver
func (s *SHM) Recv() ([]byte, error) {
    if s.rw == nil {
        return nil, errors.New("please open shm consumer first")
    }
    n, err := s.rw.Read(s.data)
    if err == nil || err == io.EOF {
        s.data = s.data[:n:n]
        return s.data, nil
    }
    return nil, err
}
// Close impl interface Deliver
func (s *SHM) Close() {
    if s.rw != nil {
        s.rw.Close()
    }
    if s.typ == agent {
        shm.Unlink(s.rw.Name())
    }
}
func shmServer(m Mode, url string, args ...interface{}) *SHM {
    if m != Shm {
        fmt.Println("this is not a shm mode: ", m)
        return nil
    }
    var param []int
    for _, v := range args {
        fmt.Println(v)
        switch v.(type) {
        case int:
            param = append(param, v.(int))
        default:
            fmt.Println("shmProducer recv error parameters")
            return nil
        }
    }
    if len(param) != 2 {
        fmt.Println("shmProducer recv too much parameter: ", len(param))
        return nil
    }
    if rw, err := shm.CreateSimplex(url, 0644, param[0], param[1]); err == nil {
        return &SHM{
            rw,
            agent,
            nil,
        }
    }
    fmt.Println("create simple shm error")
    return nil
}
func shmClient(m Mode, url string, args ...interface{}) *SHM {
    if m != Shm {
        fmt.Println("this is not a shm mode: ", m)
        return nil
    }
    if rw, err := shm.OpenSimplex(url); err == nil {
        return &SHM{
            rw,
            coactee,
            make([]byte, maxRecvSize),
        }
    }
    fmt.Println("shmConsumer open error")
    return nil
}