From 3bd1f29975c0eaa6af8c99776b099faafbbfc250 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期一, 26 八月 2019 10:36:48 +0800
Subject: [PATCH] init copy from github
---
net/listener.go | 53 +++
net/dialer.go | 53 +++
shared.go | 64 ++++
readwriter.go | 321 ++++++++++++++++++++
create.go | 162 ++++++++++
net/conn.go | 46 ++
shared_linux_386.go | 37 ++
shared_linux_amd64.go | 37 ++
unlink.go | 21 +
open.go | 107 ++++++
shared_linux.go | 11
errors.go | 14
net/addr.go | 16 +
13 files changed, 942 insertions(+), 0 deletions(-)
diff --git a/create.go b/create.go
new file mode 100644
index 0000000..8fa3080
--- /dev/null
+++ b/create.go
@@ -0,0 +1,162 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package shm
+
+import (
+ "golang.org/x/sys/unix"
+ "os"
+ "sync/atomic"
+ "unsafe"
+
+ "github.com/tmthrgd/go-sem"
+ "github.com/tmthrgd/go-shm"
+)
+
+func CreateSimplex(name string, perm os.FileMode, blockCount, blockSize int) (*ReadWriteCloser, error) {
+ if blockSize&0x3f != 0 {
+ return nil, ErrNotMultipleOf64
+ }
+
+ file, err := shm.Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm)
+ if err != nil {
+ return nil, err
+ }
+
+ defer file.Close()
+
+ fullBlockSize := blockHeaderSize + uint64(blockSize)
+ size := sharedHeaderSize + fullBlockSize*uint64(blockCount)
+
+ if err = file.Truncate(int64(size)); err != nil {
+ return nil, err
+ }
+
+ data, err := unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ shared := (*sharedMem)(unsafe.Pointer(&data[0]))
+
+ /*
+ * memset already set:
+ * shared.ReadStart, shared.ReadEnd = 0, 0
+ * shared.WriteStart, shared.WriteEnd = 0, 0
+ * shared.block[i].Size = 0
+ * shared.block[i].DoneRead, shared.block[i].DoneWrite = 0, 0
+ */
+ *(*uint32)(&shared.BlockCount), *(*uint64)(&shared.BlockSize) = uint32(blockCount), uint64(blockSize)
+
+ if err = ((*sem.Semaphore)(&shared.SemSignal)).Init(0); err != nil {
+ return nil, err
+ }
+
+ if err = ((*sem.Semaphore)(&shared.SemAvail)).Init(0); err != nil {
+ return nil, err
+ }
+
+ for i := uint32(0); i < uint32(blockCount); i++ {
+ block := (*sharedBlock)(unsafe.Pointer(&data[sharedHeaderSize+uint64(i)*fullBlockSize]))
+
+ switch i {
+ case 0:
+ block.Next, *(*uint32)(&block.Prev) = 1, uint32(blockCount-1)
+ case uint32(blockCount - 1):
+ block.Next, *(*uint32)(&block.Prev) = 0, uint32(blockCount-2)
+ default:
+ *(*uint32)(&block.Next), *(*uint32)(&block.Prev) = i+1, i-1
+ }
+ }
+
+ atomic.StoreUint32((*uint32)(&shared.Version), version)
+
+ return &ReadWriteCloser{
+ name: name,
+
+ data: data,
+ readShared: shared,
+ writeShared: shared,
+ size: size,
+ fullBlockSize: fullBlockSize,
+
+ Flags: (*[len(shared.Flags)]uint32)(unsafe.Pointer(&shared.Flags[0])),
+ }, nil
+}
+
+func CreateDuplex(name string, perm os.FileMode, blockCount, blockSize int) (*ReadWriteCloser, error) {
+ if blockSize&0x3f != 0 {
+ return nil, ErrNotMultipleOf64
+ }
+
+ file, err := shm.Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm)
+ if err != nil {
+ return nil, err
+ }
+
+ defer file.Close()
+
+ fullBlockSize := blockHeaderSize + uint64(blockSize)
+ sharedSize := sharedHeaderSize + fullBlockSize*uint64(blockCount)
+ size := 2 * sharedSize
+
+ if err = file.Truncate(int64(size)); err != nil {
+ return nil, err
+ }
+
+ data, err := unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ for i := uint64(0); i < 2; i++ {
+ shared := (*sharedMem)(unsafe.Pointer(&data[i*sharedSize]))
+
+ /*
+ * memset already set:
+ * shared.ReadStart, shared.ReadEnd = 0, 0
+ * shared.WriteStart, shared.WriteEnd = 0, 0
+ * shared.Blocks[i].Size = 0
+ * shared.Blocks[i].DoneRead, shared.Blocks[i].DoneWrite = 0, 0
+ */
+ *(*uint32)(&shared.BlockCount), *(*uint64)(&shared.BlockSize) = uint32(blockCount), uint64(blockSize)
+
+ if err = ((*sem.Semaphore)(&shared.SemSignal)).Init(0); err != nil {
+ return nil, err
+ }
+
+ if err = ((*sem.Semaphore)(&shared.SemAvail)).Init(0); err != nil {
+ return nil, err
+ }
+
+ for j := uint32(0); j < uint32(blockCount); j++ {
+ block := (*sharedBlock)(unsafe.Pointer(&data[i*sharedSize+sharedHeaderSize+uint64(j)*fullBlockSize]))
+
+ switch j {
+ case 0:
+ block.Next, *(*uint32)(&block.Prev) = 1, uint32(blockCount-1)
+ case uint32(blockCount - 1):
+ block.Next, *(*uint32)(&block.Prev) = 0, uint32(blockCount-2)
+ default:
+ *(*uint32)(&block.Next), *(*uint32)(&block.Prev) = j+1, j-1
+ }
+ }
+ }
+
+ readShared := (*sharedMem)(unsafe.Pointer(&data[0]))
+ atomic.StoreUint32((*uint32)(&readShared.Version), version)
+
+ return &ReadWriteCloser{
+ name: name,
+
+ data: data,
+ readShared: readShared,
+ writeShared: (*sharedMem)(unsafe.Pointer(&data[sharedSize])),
+ size: size,
+ fullBlockSize: fullBlockSize,
+
+ Flags: (*[len(readShared.Flags)]uint32)(unsafe.Pointer(&readShared.Flags[0])),
+ }, nil
+}
diff --git a/errors.go b/errors.go
new file mode 100644
index 0000000..9f2baf0
--- /dev/null
+++ b/errors.go
@@ -0,0 +1,14 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package shm
+
+import "errors"
+
+var (
+ ErrInvalidSharedMemory = errors.New("invalid shared memory")
+ ErrNotMultipleOf64 = errors.New("blockSize is not a multiple of 64")
+ ErrInvalidBuffer = errors.New("invalid buffer")
+)
diff --git a/net/addr.go b/net/addr.go
new file mode 100644
index 0000000..4924376
--- /dev/null
+++ b/net/addr.go
@@ -0,0 +1,16 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package net
+
+type addr string
+
+func (addr) Network() string {
+ return "shm"
+}
+
+func (a addr) String() string {
+ return string(a)
+}
diff --git a/net/conn.go b/net/conn.go
new file mode 100644
index 0000000..36a5e23
--- /dev/null
+++ b/net/conn.go
@@ -0,0 +1,46 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package net
+
+import (
+ "net"
+ "sync"
+ "time"
+
+ "github.com/tmthrgd/shm-go"
+)
+
+type Conn struct {
+ *shm.ReadWriteCloser
+ name string
+
+ mut *sync.Mutex
+}
+
+func (c *Conn) Close() error {
+ c.mut.Unlock()
+ return nil
+}
+
+func (c *Conn) LocalAddr() net.Addr {
+ return addr(c.name)
+}
+
+func (c *Conn) RemoteAddr() net.Addr {
+ return addr(c.name)
+}
+
+func (c *Conn) SetDeadline(t time.Time) error {
+ return nil
+}
+
+func (c *Conn) SetReadDeadline(t time.Time) error {
+ return nil
+}
+
+func (c *Conn) SetWriteDeadline(t time.Time) error {
+ return nil
+}
diff --git a/net/dialer.go b/net/dialer.go
new file mode 100644
index 0000000..1d3cefb
--- /dev/null
+++ b/net/dialer.go
@@ -0,0 +1,53 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package net
+
+import (
+ "errors"
+ "net"
+ "sync"
+
+ "github.com/tmthrgd/shm-go"
+)
+
+type Dialer struct {
+ rw *shm.ReadWriteCloser
+ name string
+
+ mut sync.Mutex
+}
+
+func Dial(name string) (net.Conn, error) {
+ rw, err := shm.OpenDuplex(name)
+ if err != nil {
+ return nil, err
+ }
+
+ return (&Dialer{
+ rw: rw,
+ name: name,
+ }).Dial("shm", name)
+}
+
+func NewDialer(rw *shm.ReadWriteCloser, name string) *Dialer {
+ return &Dialer{
+ rw: rw,
+ name: name,
+ }
+}
+
+func (d *Dialer) Dial(network, address string) (net.Conn, error) {
+ if network != "shm" {
+ return nil, errors.New("unrecognised network")
+ }
+
+ if address != d.name {
+ return nil, errors.New("invalid address")
+ }
+
+ d.mut.Lock()
+ return &Conn{d.rw, d.name, &d.mut}, nil
+}
diff --git a/net/listener.go b/net/listener.go
new file mode 100644
index 0000000..4039490
--- /dev/null
+++ b/net/listener.go
@@ -0,0 +1,53 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package net
+
+import (
+ "net"
+ "os"
+ "sync"
+
+ "github.com/tmthrgd/shm-go"
+)
+
+type Listener struct {
+ rw *shm.ReadWriteCloser
+ name string
+
+ mut sync.Mutex
+}
+
+func Listen(name string, perm os.FileMode, blockCount, blockSize int) (*Listener, error) {
+ rw, err := shm.CreateDuplex(name, perm, blockCount, blockSize)
+ if err != nil {
+ return nil, err
+ }
+
+ return &Listener{
+ rw: rw,
+ name: name,
+ }, nil
+}
+
+func NewListener(rw *shm.ReadWriteCloser, name string) *Listener {
+ return &Listener{
+ rw: rw,
+ name: name,
+ }
+}
+
+func (l *Listener) Accept() (net.Conn, error) {
+ l.mut.Lock()
+ return &Conn{l.rw, l.name, &l.mut}, nil
+}
+
+func (l *Listener) Close() error {
+ return l.rw.Close()
+}
+
+func (l *Listener) Addr() net.Addr {
+ return addr(l.name)
+}
diff --git a/open.go b/open.go
new file mode 100644
index 0000000..861bb76
--- /dev/null
+++ b/open.go
@@ -0,0 +1,107 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package shm
+
+import (
+ "golang.org/x/sys/unix"
+ "sync/atomic"
+ "unsafe"
+
+ "github.com/tmthrgd/go-shm"
+)
+
+func OpenSimplex(name string) (*ReadWriteCloser, error) {
+ file, err := shm.Open(name, unix.O_RDWR, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ defer file.Close()
+
+ data, err := unix.Mmap(int(file.Fd()), 0, sharedHeaderSize, unix.PROT_READ, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ shared := (*sharedMem)(unsafe.Pointer(&data[0]))
+
+ if atomic.LoadUint32((*uint32)(&shared.Version)) != version {
+ return nil, ErrInvalidSharedMemory
+ }
+
+ blockCount, blockSize := uint64(shared.BlockCount), uint64(shared.BlockSize)
+
+ if err = unix.Munmap(data); err != nil {
+ return nil, err
+ }
+
+ size := sharedHeaderSize + (blockHeaderSize+blockSize)*blockCount
+
+ data, err = unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ shared = (*sharedMem)(unsafe.Pointer(&data[0]))
+ return &ReadWriteCloser{
+ name: name,
+
+ data: data,
+ readShared: shared,
+ writeShared: shared,
+ size: size,
+ fullBlockSize: blockHeaderSize + blockSize,
+
+ Flags: (*[len(shared.Flags)]uint32)(unsafe.Pointer(&shared.Flags[0])),
+ }, nil
+}
+
+func OpenDuplex(name string) (*ReadWriteCloser, error) {
+ file, err := shm.Open(name, unix.O_RDWR, 0)
+ if err != nil {
+ return nil, err
+ }
+
+ defer file.Close()
+
+ data, err := unix.Mmap(int(file.Fd()), 0, sharedHeaderSize, unix.PROT_READ, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ shared := (*sharedMem)(unsafe.Pointer(&data[0]))
+
+ if atomic.LoadUint32((*uint32)(&shared.Version)) != version {
+ return nil, ErrInvalidSharedMemory
+ }
+
+ blockCount, blockSize := uint64(shared.BlockCount), uint64(shared.BlockSize)
+
+ if err = unix.Munmap(data); err != nil {
+ return nil, err
+ }
+
+ sharedSize := sharedHeaderSize + (blockHeaderSize+blockSize)*blockCount
+ size := 2 * sharedSize
+
+ data, err = unix.Mmap(int(file.Fd()), 0, int(size), unix.PROT_READ|unix.PROT_WRITE, unix.MAP_SHARED)
+ if err != nil {
+ return nil, err
+ }
+
+ writeShared := (*sharedMem)(unsafe.Pointer(&data[0]))
+ return &ReadWriteCloser{
+ name: name,
+
+ data: data,
+ readShared: (*sharedMem)(unsafe.Pointer(&data[sharedSize])),
+ writeShared: writeShared,
+ size: size,
+ fullBlockSize: blockHeaderSize + blockSize,
+
+ Flags: (*[len(writeShared.Flags)]uint32)(unsafe.Pointer(&writeShared.Flags[0])),
+ }, nil
+}
diff --git a/readwriter.go b/readwriter.go
new file mode 100644
index 0000000..2da3119
--- /dev/null
+++ b/readwriter.go
@@ -0,0 +1,321 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package shm
+
+import (
+ "golang.org/x/sys/unix"
+ "io"
+ "sync/atomic"
+ "unsafe"
+
+ "github.com/tmthrgd/go-sem"
+)
+
+const (
+ eofFlagIndex = 0
+ eofFlagMask = 0x01
+)
+
+type Buffer struct {
+ block *sharedBlock
+ write bool
+
+ Data []byte
+ Flags *[blockFlagsSize]byte
+}
+
+type ReadWriteCloser struct {
+ name string
+
+ data []byte
+ readShared *sharedMem
+ writeShared *sharedMem
+ size uint64
+ fullBlockSize uint64
+
+ // Must be accessed using atomic operations
+ Flags *[sharedFlagsSize]uint32
+
+ closed uint32
+}
+
+func (rw *ReadWriteCloser) Close() error {
+ if !atomic.CompareAndSwapUint32(&rw.closed, 0, 1) {
+ return nil
+ }
+
+ // finish all sends before close!
+
+ return unix.Munmap(rw.data)
+}
+
+// Name returns the name of the shared memory.
+func (rw *ReadWriteCloser) Name() string {
+ return rw.name
+}
+
+// Unlink removes the shared memory.
+//
+// It is the equivalent to calling Unlink(string) with
+// the same name as Create* or Open*.
+//
+// Taken from shm_unlink(3):
+// The operation of shm_unlink() is analogous to unlink(2): it removes a
+// shared memory object name, and, once all processes have unmapped the
+// object, de-allocates and destroys the contents of the associated memory
+// region. After a successful shm_unlink(), attempts to shm_open() an
+// object with the same name will fail (unless O_CREAT was specified, in
+// which case a new, distinct object is created).
+func (rw *ReadWriteCloser) Unlink() error {
+ return Unlink(rw.name)
+}
+
+// Read
+
+func (rw *ReadWriteCloser) Read(p []byte) (n int, err error) {
+ buf, err := rw.GetReadBuffer()
+ if err != nil {
+ return 0, err
+ }
+
+ n = copy(p, buf.Data)
+ isEOF := buf.Flags[eofFlagIndex]&eofFlagMask != 0
+
+ if err = rw.SendReadBuffer(buf); err != nil {
+ return n, err
+ }
+
+ if isEOF {
+ return n, io.EOF
+ }
+
+ return n, nil
+}
+
+func (rw *ReadWriteCloser) WriteTo(w io.Writer) (n int64, err error) {
+ for {
+ buf, err := rw.GetReadBuffer()
+ if err != nil {
+ return n, err
+ }
+
+ nn, err := w.Write(buf.Data)
+ n += int64(nn)
+
+ isEOF := buf.Flags[eofFlagIndex]&eofFlagMask != 0
+
+ if putErr := rw.SendReadBuffer(buf); putErr != nil {
+ return n, putErr
+ }
+
+ if err != nil || isEOF {
+ return n, err
+ }
+ }
+}
+
+func (rw *ReadWriteCloser) GetReadBuffer() (Buffer, error) {
+ if atomic.LoadUint32(&rw.closed) != 0 {
+ return Buffer{}, io.ErrClosedPipe
+ }
+
+ var block *sharedBlock
+
+ blocks := uintptr(unsafe.Pointer(rw.readShared)) + sharedHeaderSize
+
+ for {
+ blockIndex := atomic.LoadUint32((*uint32)(&rw.readShared.ReadStart))
+ if blockIndex > uint32(rw.readShared.BlockCount) {
+ return Buffer{}, ErrInvalidSharedMemory
+ }
+
+ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
+
+ if blockIndex == atomic.LoadUint32((*uint32)(&rw.readShared.WriteEnd)) {
+ if err := ((*sem.Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil {
+ return Buffer{}, err
+ }
+
+ continue
+ }
+
+ if atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadStart), blockIndex, uint32(block.Next)) {
+ break
+ }
+ }
+
+ data := (*[1 << 30]byte)(unsafe.Pointer(uintptr(unsafe.Pointer(block)) + blockHeaderSize))
+ flags := (*[len(block.Flags)]byte)(unsafe.Pointer(&block.Flags[0]))
+ return Buffer{
+ block: block,
+
+ Data: data[:block.Size:rw.readShared.BlockSize],
+ Flags: flags,
+ }, nil
+}
+
+func (rw *ReadWriteCloser) SendReadBuffer(buf Buffer) error {
+ if atomic.LoadUint32(&rw.closed) != 0 {
+ return io.ErrClosedPipe
+ }
+
+ if buf.write {
+ return ErrInvalidBuffer
+ }
+
+ block := buf.block
+
+ atomic.StoreUint32((*uint32)(&block.DoneRead), 1)
+
+ blocks := uintptr(unsafe.Pointer(rw.readShared)) + sharedHeaderSize
+
+ for {
+ blockIndex := atomic.LoadUint32((*uint32)(&rw.readShared.ReadEnd))
+ if blockIndex > uint32(rw.readShared.BlockCount) {
+ return ErrInvalidSharedMemory
+ }
+
+ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
+
+ if !atomic.CompareAndSwapUint32((*uint32)(&block.DoneRead), 1, 0) {
+ return nil
+ }
+
+ atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadEnd), blockIndex, uint32(block.Next))
+
+ if uint32(block.Prev) == atomic.LoadUint32((*uint32)(&rw.readShared.WriteStart)) {
+ if err := ((*sem.Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil {
+ return err
+ }
+ }
+ }
+}
+
+// Write
+
+func (rw *ReadWriteCloser) Write(p []byte) (n int, err error) {
+ buf, err := rw.GetWriteBuffer()
+ if err != nil {
+ return 0, err
+ }
+
+ n = copy(buf.Data[:cap(buf.Data)], p)
+ buf.Data = buf.Data[:n]
+
+ buf.Flags[eofFlagIndex] |= eofFlagMask
+
+ _, err = rw.SendWriteBuffer(buf)
+ return n, err
+}
+
+func (rw *ReadWriteCloser) ReadFrom(r io.Reader) (n int64, err error) {
+ for {
+ buf, err := rw.GetWriteBuffer()
+ if err != nil {
+ return n, err
+ }
+
+ nn, err := r.Read(buf.Data[:cap(buf.Data)])
+ buf.Data = buf.Data[:nn]
+ n += int64(nn)
+
+ if err == io.EOF {
+ buf.Flags[eofFlagIndex] |= eofFlagMask
+ } else {
+ buf.Flags[eofFlagIndex] &^= eofFlagMask
+ }
+
+ if _, putErr := rw.SendWriteBuffer(buf); putErr != nil {
+ return n, err
+ }
+
+ if err == io.EOF {
+ return n, nil
+ } else if err != nil {
+ return n, err
+ }
+ }
+}
+
+func (rw *ReadWriteCloser) GetWriteBuffer() (Buffer, error) {
+ if atomic.LoadUint32(&rw.closed) != 0 {
+ return Buffer{}, io.ErrClosedPipe
+ }
+
+ var block *sharedBlock
+
+ blocks := uintptr(unsafe.Pointer(rw.writeShared)) + sharedHeaderSize
+
+ for {
+ blockIndex := atomic.LoadUint32((*uint32)(&rw.writeShared.WriteStart))
+ if blockIndex > uint32(rw.writeShared.BlockCount) {
+ return Buffer{}, ErrInvalidSharedMemory
+ }
+
+ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
+
+ if uint32(block.Next) == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadEnd)) {
+ if err := ((*sem.Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil {
+ return Buffer{}, err
+ }
+
+ continue
+ }
+
+ if atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteStart), blockIndex, uint32(block.Next)) {
+ break
+ }
+ }
+
+ data := (*[1 << 30]byte)(unsafe.Pointer(uintptr(unsafe.Pointer(block)) + blockHeaderSize))
+ flags := (*[len(block.Flags)]byte)(unsafe.Pointer(&block.Flags[0]))
+ return Buffer{
+ block: block,
+ write: true,
+
+ Data: data[:0:rw.writeShared.BlockSize],
+ Flags: flags,
+ }, nil
+}
+
+func (rw *ReadWriteCloser) SendWriteBuffer(buf Buffer) (n int, err error) {
+ if atomic.LoadUint32(&rw.closed) != 0 {
+ return 0, io.ErrClosedPipe
+ }
+
+ if !buf.write {
+ return 0, ErrInvalidBuffer
+ }
+
+ block := buf.block
+
+ *(*uint64)(&block.Size) = uint64(len(buf.Data))
+
+ atomic.StoreUint32((*uint32)(&block.DoneWrite), 1)
+
+ blocks := uintptr(unsafe.Pointer(rw.writeShared)) + sharedHeaderSize
+
+ for {
+ blockIndex := atomic.LoadUint32((*uint32)(&rw.writeShared.WriteEnd))
+ if blockIndex > uint32(rw.writeShared.BlockCount) {
+ return len(buf.Data), ErrInvalidSharedMemory
+ }
+
+ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
+
+ if !atomic.CompareAndSwapUint32((*uint32)(&block.DoneWrite), 1, 0) {
+ return len(buf.Data), nil
+ }
+
+ atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteEnd), blockIndex, uint32(block.Next))
+
+ if blockIndex == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadStart)) {
+ if err := ((*sem.Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil {
+ return len(buf.Data), err
+ }
+ }
+ }
+}
diff --git a/shared.go b/shared.go
new file mode 100644
index 0000000..ad968ff
--- /dev/null
+++ b/shared.go
@@ -0,0 +1,64 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+// +build !linux !386,!amd64
+
+package shm
+
+/*
+#include <stdint.h> // For (u)int*_t
+#include <semaphore.h> // For sem_*
+
+typedef struct {
+ uint32_t Next;
+ uint32_t Prev;
+
+ uint32_t DoneRead;
+ uint32_t DoneWrite;
+
+ uint64_t Size;
+
+ uint8_t Flags[(0x40-(2*2*sizeof(uint32_t)+sizeof(uint64_t))&0x3f)&0x3f];
+
+ uint8_t Data[];
+} shared_block_t;
+
+typedef struct {
+ uint32_t Version;
+ uint32_t __padding0;
+
+ uint32_t BlockCount;
+ uint32_t __padding1;
+
+ uint64_t BlockSize;
+
+ uint32_t ReadStart;
+ uint32_t ReadEnd;
+
+ uint32_t WriteStart;
+ uint32_t WriteEnd;
+
+ sem_t SemSignal;
+ sem_t SemAvail;
+
+ uint32_t Flags[((0x40-(4*2*sizeof(uint32_t)+sizeof(uint64_t)+2*sizeof(sem_t))&0x3f)&0x3f)/4];
+
+ shared_block_t Blocks[];
+} shared_mem_t;
+*/
+import "C"
+
+type sharedBlock C.shared_block_t
+
+type sharedMem C.shared_mem_t
+
+const (
+ sharedHeaderSize = C.sizeof_shared_mem_t
+ sharedFlagsSize = len(sharedMem{}.Flags)
+ blockHeaderSize = C.sizeof_shared_block_t
+ blockFlagsSize = len(sharedBlock{}.Flags)
+
+ version = uint32((^uint(0)>>32)&0x80000000) | 0x00000001
+)
diff --git a/shared_linux.go b/shared_linux.go
new file mode 100644
index 0000000..85c356b
--- /dev/null
+++ b/shared_linux.go
@@ -0,0 +1,11 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+// +build linux
+
+//go:generate sh -c "GOARCH=386 go tool cgo -godefs shared.go | gofmt > shared_linux_386.go"
+//go:generate sh -c "GOARCH=amd64 go tool cgo -godefs shared.go | gofmt > shared_linux_amd64.go"
+
+package shm
diff --git a/shared_linux_386.go b/shared_linux_386.go
new file mode 100644
index 0000000..93ca12b
--- /dev/null
+++ b/shared_linux_386.go
@@ -0,0 +1,37 @@
+// Created by cgo -godefs - DO NOT EDIT
+// cgo -godefs shared.go
+
+package shm
+
+type sharedBlock struct {
+ Next uint32
+ Prev uint32
+ DoneRead uint32
+ DoneWrite uint32
+ Size uint64
+ Flags [40]uint8
+}
+
+type sharedMem struct {
+ Version uint32
+ X__padding0 uint32
+ BlockCount uint32
+ X__padding1 uint32
+ BlockSize uint64
+ ReadStart uint32
+ ReadEnd uint32
+ WriteStart uint32
+ WriteEnd uint32
+ SemSignal [16]byte
+ SemAvail [16]byte
+ Flags [14]uint32
+}
+
+const (
+ sharedHeaderSize = 0x80
+ sharedFlagsSize = len(sharedMem{}.Flags)
+ blockHeaderSize = 0x40
+ blockFlagsSize = len(sharedBlock{}.Flags)
+
+ version = uint32((^uint(0)>>32)&0x80000000) | 0x00000001
+)
diff --git a/shared_linux_amd64.go b/shared_linux_amd64.go
new file mode 100644
index 0000000..3774171
--- /dev/null
+++ b/shared_linux_amd64.go
@@ -0,0 +1,37 @@
+// Created by cgo -godefs - DO NOT EDIT
+// cgo -godefs shared.go
+
+package shm
+
+type sharedBlock struct {
+ Next uint32
+ Prev uint32
+ DoneRead uint32
+ DoneWrite uint32
+ Size uint64
+ Flags [40]uint8
+}
+
+type sharedMem struct {
+ Version uint32
+ X__padding0 uint32
+ BlockCount uint32
+ X__padding1 uint32
+ BlockSize uint64
+ ReadStart uint32
+ ReadEnd uint32
+ WriteStart uint32
+ WriteEnd uint32
+ SemSignal [32]byte
+ SemAvail [32]byte
+ Flags [6]uint32
+}
+
+const (
+ sharedHeaderSize = 0x80
+ sharedFlagsSize = len(sharedMem{}.Flags)
+ blockHeaderSize = 0x40
+ blockFlagsSize = len(sharedBlock{}.Flags)
+
+ version = uint32((^uint(0)>>32)&0x80000000) | 0x00000001
+)
diff --git a/unlink.go b/unlink.go
new file mode 100644
index 0000000..dbf252e
--- /dev/null
+++ b/unlink.go
@@ -0,0 +1,21 @@
+// Copyright 2016 Tom Thorogood. All rights reserved.
+// Use of this source code is governed by a
+// Modified BSD License license that can be found in
+// the LICENSE file.
+
+package shm
+
+import "github.com/tmthrgd/go-shm"
+
+// Unlink removes the previously created blocker.
+//
+// Taken from shm_unlink(3):
+// The operation of shm_unlink() is analogous to unlink(2): it removes a
+// shared memory object name, and, once all processes have unmapped the
+// object, de-allocates and destroys the contents of the associated memory
+// region. After a successful shm_unlink(), attempts to shm_open() an
+// object with the same name will fail (unless O_CREAT was specified, in
+// which case a new, distinct object is created).
+func Unlink(name string) error {
+ return shm.Unlink(name)
+}
--
Gitblit v1.8.0