zhangmeng
2019-10-29 156e610f9bca0581b45c710c1af8cec35db14cfb
readwriter.go
@@ -6,12 +6,11 @@
package shm
import (
   "golang.org/x/sys/unix"
   "io"
   "sync/atomic"
   "unsafe"
   "github.com/tmthrgd/go-sem"
   "golang.org/x/sys/unix"
)
const (
@@ -57,24 +56,40 @@
   return rw.name
}
// Unlink removes the shared memory.
//
// It is the equivalent to calling Unlink(string) with
// the same name as Create* or Open*.
//
// Taken from shm_unlink(3):
//    The  operation  of shm_unlink() is analogous to unlink(2): it removes a
//    shared memory object name, and, once all processes  have  unmapped  the
//    object, de-allocates and destroys the contents of the associated memory
//    region.  After a successful shm_unlink(),  attempts  to  shm_open()  an
//    object  with  the same name will fail (unless O_CREAT was specified, in
//    which case a new, distinct object is created).
func (rw *ReadWriteCloser) Unlink() error {
   return Unlink(rw.name)
// DirectRead create byte in func
func (rw *ReadWriteCloser) DirectRead() ([]byte, error) {
   buf, err := rw.GetReadBuffer()
   if err != nil {
      return nil, err
   }
   data := make([]byte, len(buf.Data))
   copy(data, buf.Data)
   isEOF := buf.Flags[eofFlagIndex]&eofFlagMask != 0
   if err = rw.SendReadBuffer(buf); err != nil {
      return nil, err
   }
   if isEOF {
      return data, io.EOF
   }
   return data, nil
}
// Peek get length
func (rw *ReadWriteCloser) Peek() (n int, err error) {
   buf, err := rw.GetReadBuffer()
   if err != nil {
      return 0, err
   }
   return len(buf.Data), nil
}
// Read
func (rw *ReadWriteCloser) Read(p []byte) (n int, err error) {
   buf, err := rw.GetReadBuffer()
   if err != nil {
@@ -135,7 +150,7 @@
      block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
      if blockIndex == atomic.LoadUint32((*uint32)(&rw.readShared.WriteEnd)) {
         if err := ((*sem.Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil {
         if err := ((*Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil {
            return Buffer{}, err
         }
@@ -187,7 +202,7 @@
      atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadEnd), blockIndex, uint32(block.Next))
      if uint32(block.Prev) == atomic.LoadUint32((*uint32)(&rw.readShared.WriteStart)) {
         if err := ((*sem.Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil {
         if err := ((*Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil {
            return err
         }
      }
@@ -258,7 +273,7 @@
      block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
      if uint32(block.Next) == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadEnd)) {
         if err := ((*sem.Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil {
         if err := ((*Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil {
            return Buffer{}, err
         }
@@ -313,7 +328,7 @@
      atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteEnd), blockIndex, uint32(block.Next))
      if blockIndex == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadStart)) {
         if err := ((*sem.Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil {
         if err := ((*Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil {
            return len(buf.Data), err
         }
      }