From a0123f163eddcea3e6b9f9d36f1f3fb3aa2c835a Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期一, 26 八月 2019 13:46:34 +0800 Subject: [PATCH] update --- /dev/null | 21 ---- create.go | 16 +- sem.go | 114 ++++++++++++++++++++++ shm.go | 83 ++++++++++++++++ open.go | 7 readwriter.go | 27 +---- 6 files changed, 212 insertions(+), 56 deletions(-) diff --git a/create.go b/create.go index 8fa3080..640429b 100644 --- a/create.go +++ b/create.go @@ -6,13 +6,11 @@ package shm import ( - "golang.org/x/sys/unix" "os" "sync/atomic" "unsafe" - "github.com/tmthrgd/go-sem" - "github.com/tmthrgd/go-shm" + "golang.org/x/sys/unix" ) func CreateSimplex(name string, perm os.FileMode, blockCount, blockSize int) (*ReadWriteCloser, error) { @@ -20,7 +18,7 @@ return nil, ErrNotMultipleOf64 } - file, err := shm.Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm) + file, err := Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm) if err != nil { return nil, err } @@ -50,11 +48,11 @@ */ *(*uint32)(&shared.BlockCount), *(*uint64)(&shared.BlockSize) = uint32(blockCount), uint64(blockSize) - if err = ((*sem.Semaphore)(&shared.SemSignal)).Init(0); err != nil { + if err = ((*Semaphore)(&shared.SemSignal)).Init(0); err != nil { return nil, err } - if err = ((*sem.Semaphore)(&shared.SemAvail)).Init(0); err != nil { + if err = ((*Semaphore)(&shared.SemAvail)).Init(0); err != nil { return nil, err } @@ -91,7 +89,7 @@ return nil, ErrNotMultipleOf64 } - file, err := shm.Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm) + file, err := Open(name, unix.O_CREAT|unix.O_EXCL|unix.O_TRUNC|unix.O_RDWR, perm) if err != nil { return nil, err } @@ -123,11 +121,11 @@ */ *(*uint32)(&shared.BlockCount), *(*uint64)(&shared.BlockSize) = uint32(blockCount), uint64(blockSize) - if err = ((*sem.Semaphore)(&shared.SemSignal)).Init(0); err != nil { + if err = ((*Semaphore)(&shared.SemSignal)).Init(0); err != nil { return nil, err } - if err = ((*sem.Semaphore)(&shared.SemAvail)).Init(0); err != nil { + if err = ((*Semaphore)(&shared.SemAvail)).Init(0); err != nil { return nil, err } diff --git a/net/addr.go b/net/addr.go deleted file mode 100644 index 4924376..0000000 --- a/net/addr.go +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright 2016 Tom Thorogood. All rights reserved. -// Use of this source code is governed by a -// Modified BSD License license that can be found in -// the LICENSE file. - -package net - -type addr string - -func (addr) Network() string { - return "shm" -} - -func (a addr) String() string { - return string(a) -} diff --git a/net/conn.go b/net/conn.go deleted file mode 100644 index 36a5e23..0000000 --- a/net/conn.go +++ /dev/null @@ -1,46 +0,0 @@ -// Copyright 2016 Tom Thorogood. All rights reserved. -// Use of this source code is governed by a -// Modified BSD License license that can be found in -// the LICENSE file. - -package net - -import ( - "net" - "sync" - "time" - - "github.com/tmthrgd/shm-go" -) - -type Conn struct { - *shm.ReadWriteCloser - name string - - mut *sync.Mutex -} - -func (c *Conn) Close() error { - c.mut.Unlock() - return nil -} - -func (c *Conn) LocalAddr() net.Addr { - return addr(c.name) -} - -func (c *Conn) RemoteAddr() net.Addr { - return addr(c.name) -} - -func (c *Conn) SetDeadline(t time.Time) error { - return nil -} - -func (c *Conn) SetReadDeadline(t time.Time) error { - return nil -} - -func (c *Conn) SetWriteDeadline(t time.Time) error { - return nil -} diff --git a/net/dialer.go b/net/dialer.go deleted file mode 100644 index 1d3cefb..0000000 --- a/net/dialer.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2016 Tom Thorogood. All rights reserved. -// Use of this source code is governed by a -// Modified BSD License license that can be found in -// the LICENSE file. - -package net - -import ( - "errors" - "net" - "sync" - - "github.com/tmthrgd/shm-go" -) - -type Dialer struct { - rw *shm.ReadWriteCloser - name string - - mut sync.Mutex -} - -func Dial(name string) (net.Conn, error) { - rw, err := shm.OpenDuplex(name) - if err != nil { - return nil, err - } - - return (&Dialer{ - rw: rw, - name: name, - }).Dial("shm", name) -} - -func NewDialer(rw *shm.ReadWriteCloser, name string) *Dialer { - return &Dialer{ - rw: rw, - name: name, - } -} - -func (d *Dialer) Dial(network, address string) (net.Conn, error) { - if network != "shm" { - return nil, errors.New("unrecognised network") - } - - if address != d.name { - return nil, errors.New("invalid address") - } - - d.mut.Lock() - return &Conn{d.rw, d.name, &d.mut}, nil -} diff --git a/net/listener.go b/net/listener.go deleted file mode 100644 index 4039490..0000000 --- a/net/listener.go +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2016 Tom Thorogood. All rights reserved. -// Use of this source code is governed by a -// Modified BSD License license that can be found in -// the LICENSE file. - -package net - -import ( - "net" - "os" - "sync" - - "github.com/tmthrgd/shm-go" -) - -type Listener struct { - rw *shm.ReadWriteCloser - name string - - mut sync.Mutex -} - -func Listen(name string, perm os.FileMode, blockCount, blockSize int) (*Listener, error) { - rw, err := shm.CreateDuplex(name, perm, blockCount, blockSize) - if err != nil { - return nil, err - } - - return &Listener{ - rw: rw, - name: name, - }, nil -} - -func NewListener(rw *shm.ReadWriteCloser, name string) *Listener { - return &Listener{ - rw: rw, - name: name, - } -} - -func (l *Listener) Accept() (net.Conn, error) { - l.mut.Lock() - return &Conn{l.rw, l.name, &l.mut}, nil -} - -func (l *Listener) Close() error { - return l.rw.Close() -} - -func (l *Listener) Addr() net.Addr { - return addr(l.name) -} diff --git a/open.go b/open.go index 861bb76..a63b4b5 100644 --- a/open.go +++ b/open.go @@ -6,15 +6,14 @@ package shm import ( - "golang.org/x/sys/unix" "sync/atomic" "unsafe" - "github.com/tmthrgd/go-shm" + "golang.org/x/sys/unix" ) func OpenSimplex(name string) (*ReadWriteCloser, error) { - file, err := shm.Open(name, unix.O_RDWR, 0) + file, err := Open(name, unix.O_RDWR, 0) if err != nil { return nil, err } @@ -60,7 +59,7 @@ } func OpenDuplex(name string) (*ReadWriteCloser, error) { - file, err := shm.Open(name, unix.O_RDWR, 0) + file, err := Open(name, unix.O_RDWR, 0) if err != nil { return nil, err } diff --git a/readwriter.go b/readwriter.go index 2da3119..523a605 100644 --- a/readwriter.go +++ b/readwriter.go @@ -6,12 +6,11 @@ package shm import ( - "golang.org/x/sys/unix" "io" "sync/atomic" "unsafe" - "github.com/tmthrgd/go-sem" + "golang.org/x/sys/unix" ) const ( @@ -55,22 +54,6 @@ // Name returns the name of the shared memory. func (rw *ReadWriteCloser) Name() string { return rw.name -} - -// Unlink removes the shared memory. -// -// It is the equivalent to calling Unlink(string) with -// the same name as Create* or Open*. -// -// Taken from shm_unlink(3): -// The operation of shm_unlink() is analogous to unlink(2): it removes a -// shared memory object name, and, once all processes have unmapped the -// object, de-allocates and destroys the contents of the associated memory -// region. After a successful shm_unlink(), attempts to shm_open() an -// object with the same name will fail (unless O_CREAT was specified, in -// which case a new, distinct object is created). -func (rw *ReadWriteCloser) Unlink() error { - return Unlink(rw.name) } // Read @@ -135,7 +118,7 @@ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize))) if blockIndex == atomic.LoadUint32((*uint32)(&rw.readShared.WriteEnd)) { - if err := ((*sem.Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil { + if err := ((*Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil { return Buffer{}, err } @@ -187,7 +170,7 @@ atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadEnd), blockIndex, uint32(block.Next)) if uint32(block.Prev) == atomic.LoadUint32((*uint32)(&rw.readShared.WriteStart)) { - if err := ((*sem.Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil { + if err := ((*Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil { return err } } @@ -258,7 +241,7 @@ block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize))) if uint32(block.Next) == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadEnd)) { - if err := ((*sem.Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil { + if err := ((*Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil { return Buffer{}, err } @@ -313,7 +296,7 @@ atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteEnd), blockIndex, uint32(block.Next)) if blockIndex == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadStart)) { - if err := ((*sem.Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil { + if err := ((*Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil { return len(buf.Data), err } } diff --git a/sem.go b/sem.go new file mode 100644 index 0000000..ecc24a3 --- /dev/null +++ b/sem.go @@ -0,0 +1,114 @@ +// Created by cgo -godefs - DO NOT EDIT +// cgo -godefs sem_linux.go + +package shm + +import ( + "sync/atomic" + "syscall" + "unsafe" + + "golang.org/x/sys/unix" +) + +type Semaphore [32]byte + +type newSem struct { + Value uint32 + Private int32 + NWaiters uint64 +} + +func New(value uint) (*Semaphore, error) { + sem := new(Semaphore) + + if err := sem.Init(value); err != nil { + return nil, err + } + + return sem, nil +} + +func atomicDecrementIfPositive(mem *uint32) uint32 { + for { + if old := atomic.LoadUint32(mem); old == 0 || atomic.CompareAndSwapUint32(mem, old, old-1) { + return old + } + } +} + +func (sem *Semaphore) Wait() error { + isem := (*newSem)(unsafe.Pointer(sem)) + + if atomicDecrementIfPositive(&isem.Value) > 0 { + return nil + } + + atomic.AddUint64(&isem.NWaiters, 1) + + for { + t := syscall.Timeval{3 /*sec*/, 0 /*usec*/} + if _, _, err := unix.Syscall6(unix.SYS_FUTEX, uintptr(unsafe.Pointer(&isem.Value)), uintptr(0x0), 0, uintptr(unsafe.Pointer(&t)), 0, 0); err != 0 && err != unix.EWOULDBLOCK { + atomic.AddUint64(&isem.NWaiters, ^uint64(0)) + return err + } + + if atomicDecrementIfPositive(&isem.Value) > 0 { + atomic.AddUint64(&isem.NWaiters, ^uint64(0)) + return nil + } + } +} + +func (sem *Semaphore) TryWait() error { + isem := (*newSem)(unsafe.Pointer(sem)) + + if atomicDecrementIfPositive(&isem.Value) > 0 { + return nil + } + + return unix.EAGAIN +} + +func (sem *Semaphore) Post() error { + isem := (*newSem)(unsafe.Pointer(sem)) + + for { + cur := atomic.LoadUint32(&isem.Value) + + if cur == 0x7fffffff { + return unix.EOVERFLOW + } + + if atomic.CompareAndSwapUint32(&isem.Value, cur, cur+1) { + break + } + } + + if atomic.LoadUint64(&isem.NWaiters) <= 0 { + return nil + } + + if _, _, err := unix.Syscall6(unix.SYS_FUTEX, uintptr(unsafe.Pointer(&isem.Value)), uintptr(0x1), 1, 0, 0, 0); err != 0 { + return err + } + + return nil +} + +func (sem *Semaphore) Init(value uint) error { + if value > 0x7fffffff { + return unix.EINVAL + } + + isem := (*newSem)(unsafe.Pointer(sem)) + isem.Value = uint32(value) + isem.Private = 0 + isem.NWaiters = 0 + + return nil +} + +func (sem *Semaphore) Destroy() error { + return nil +} diff --git a/shm.go b/shm.go new file mode 100644 index 0000000..c65cea3 --- /dev/null +++ b/shm.go @@ -0,0 +1,83 @@ +// Copyright 2016 Tom Thorogood. All rights reserved. +// Use of this source code is governed by a +// Modified BSD License license that can be found in +// the LICENSE file. + +// Package shm provides functions to open and unlink shared memory. +package shm + +import ( + "os" + + "golang.org/x/sys/unix" +) + +const devShm = "/dev/shm/" + +// Taken from shm_open(3): +// shm_open() creates and opens a new, or opens an existing, POSIX shared +// memory object. A POSIX shared memory object is in effect a handle which +// can be used by unrelated processes to mmap(2) the same region of shared +// memory. The shm_unlink() function performs the converse operation, +// removing an object previously created by shm_open(). +// +// The operation of shm_open() is analogous to that of open(2). name +// specifies the shared memory object to be created or opened. For +// portable use, a shared memory object should be identified by a name of +// the form /somename; that is, a null-terminated string of up to NAME_MAX +// (i.e., 255) characters consisting of an initial slash, followed by one +// or more characters, none of which are slashes. +func Open(name string, flag int, perm os.FileMode) (*os.File, error) { + fileName := name + + for len(name) != 0 && name[0] == '/' { + name = name[1:] + } + + if len(name) == 0 { + return nil, &os.PathError{Op: "open", Path: fileName, Err: unix.EINVAL} + } + + o := uint32(perm.Perm()) + if perm&os.ModeSetuid != 0 { + o |= unix.S_ISUID + } + if perm&os.ModeSetgid != 0 { + o |= unix.S_ISGID + } + if perm&os.ModeSticky != 0 { + o |= unix.S_ISVTX + } + + fd, err := unix.Open(devShm+name, flag|unix.O_CLOEXEC, o) + if err != nil { + return nil, &os.PathError{Op: "open", Path: fileName, Err: err} + } + + return os.NewFile(uintptr(fd), fileName), nil +} + +// Taken from shm_unlink(3): +// The operation of shm_unlink() is analogous to unlink(2): it removes a +// shared memory object name, and, once all processes have unmapped the +// object, de-allocates and destroys the contents of the associated memory +// region. After a successful shm_unlink(), attempts to shm_open() an +// object with the same name will fail (unless O_CREAT was specified, in +// which case a new, distinct object is created). +func Unlink(name string) error { + fileName := name + + for len(name) != 0 && name[0] == '/' { + name = name[1:] + } + + if len(name) == 0 { + return &os.PathError{Op: "unlink", Path: fileName, Err: unix.EINVAL} + } + + if err := unix.Unlink(devShm + name); err != nil { + return &os.PathError{Op: "unlink", Path: fileName, Err: err} + } + + return nil +} diff --git a/unlink.go b/unlink.go deleted file mode 100644 index dbf252e..0000000 --- a/unlink.go +++ /dev/null @@ -1,21 +0,0 @@ -// Copyright 2016 Tom Thorogood. All rights reserved. -// Use of this source code is governed by a -// Modified BSD License license that can be found in -// the LICENSE file. - -package shm - -import "github.com/tmthrgd/go-shm" - -// Unlink removes the previously created blocker. -// -// Taken from shm_unlink(3): -// The operation of shm_unlink() is analogous to unlink(2): it removes a -// shared memory object name, and, once all processes have unmapped the -// object, de-allocates and destroys the contents of the associated memory -// region. After a successful shm_unlink(), attempts to shm_open() an -// object with the same name will fail (unless O_CREAT was specified, in -// which case a new, distinct object is created). -func Unlink(name string) error { - return shm.Unlink(name) -} -- Gitblit v1.8.0