| | |
| | | package shm |
| | | |
| | | import ( |
| | | "golang.org/x/sys/unix" |
| | | "io" |
| | | "sync/atomic" |
| | | "unsafe" |
| | | |
| | | "github.com/tmthrgd/go-sem" |
| | | "golang.org/x/sys/unix" |
| | | ) |
| | | |
| | | const ( |
| | |
| | | return rw.name |
| | | } |
| | | |
| | | // Unlink removes the shared memory. |
| | | // |
| | | // It is the equivalent to calling Unlink(string) with |
| | | // the same name as Create* or Open*. |
| | | // |
| | | // Taken from shm_unlink(3): |
| | | // The operation of shm_unlink() is analogous to unlink(2): it removes a |
| | | // shared memory object name, and, once all processes have unmapped the |
| | | // object, de-allocates and destroys the contents of the associated memory |
| | | // region. After a successful shm_unlink(), attempts to shm_open() an |
| | | // object with the same name will fail (unless O_CREAT was specified, in |
| | | // which case a new, distinct object is created). |
| | | func (rw *ReadWriteCloser) Unlink() error { |
| | | return Unlink(rw.name) |
| | | // DirectRead create byte in func |
| | | func (rw *ReadWriteCloser) DirectRead() ([]byte, error) { |
| | | buf, err := rw.GetReadBuffer() |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | data := make([]byte, len(buf.Data)) |
| | | |
| | | copy(data, buf.Data) |
| | | isEOF := buf.Flags[eofFlagIndex]&eofFlagMask != 0 |
| | | |
| | | if err = rw.SendReadBuffer(buf); err != nil { |
| | | return nil, err |
| | | } |
| | | |
| | | if isEOF { |
| | | return data, io.EOF |
| | | } |
| | | |
| | | return data, nil |
| | | } |
| | | |
| | | // Peek get length |
| | | func (rw *ReadWriteCloser) Peek() (n int, err error) { |
| | | buf, err := rw.GetReadBuffer() |
| | | if err != nil { |
| | | return 0, err |
| | | } |
| | | |
| | | return len(buf.Data), nil |
| | | } |
| | | |
| | | // Read |
| | | |
| | | func (rw *ReadWriteCloser) Read(p []byte) (n int, err error) { |
| | | buf, err := rw.GetReadBuffer() |
| | | if err != nil { |
| | |
| | | block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize))) |
| | | |
| | | if blockIndex == atomic.LoadUint32((*uint32)(&rw.readShared.WriteEnd)) { |
| | | if err := ((*sem.Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil { |
| | | if err := ((*Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil { |
| | | return Buffer{}, err |
| | | } |
| | | |
| | |
| | | atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadEnd), blockIndex, uint32(block.Next)) |
| | | |
| | | if uint32(block.Prev) == atomic.LoadUint32((*uint32)(&rw.readShared.WriteStart)) { |
| | | if err := ((*sem.Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil { |
| | | if err := ((*Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil { |
| | | return err |
| | | } |
| | | } |
| | |
| | | block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize))) |
| | | |
| | | if uint32(block.Next) == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadEnd)) { |
| | | if err := ((*sem.Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil { |
| | | if err := ((*Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil { |
| | | return Buffer{}, err |
| | | } |
| | | |
| | |
| | | atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteEnd), blockIndex, uint32(block.Next)) |
| | | |
| | | if blockIndex == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadStart)) { |
| | | if err := ((*sem.Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil { |
| | | if err := ((*Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil { |
| | | return len(buf.Data), err |
| | | } |
| | | } |