From 775b076861a1d2c260c32befef12770833baeffd Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 29 十月 2019 11:17:47 +0800
Subject: [PATCH] fixed
---
readwriter.go | 57 ++++++++++++++++++++++++++++++++++++---------------------
1 files changed, 36 insertions(+), 21 deletions(-)
diff --git a/readwriter.go b/readwriter.go
index 2da3119..7649741 100644
--- a/readwriter.go
+++ b/readwriter.go
@@ -6,12 +6,11 @@
package shm
import (
- "golang.org/x/sys/unix"
"io"
"sync/atomic"
"unsafe"
- "github.com/tmthrgd/go-sem"
+ "golang.org/x/sys/unix"
)
const (
@@ -57,24 +56,40 @@
return rw.name
}
-// Unlink removes the shared memory.
-//
-// It is the equivalent to calling Unlink(string) with
-// the same name as Create* or Open*.
-//
-// Taken from shm_unlink(3):
-// The operation of shm_unlink() is analogous to unlink(2): it removes a
-// shared memory object name, and, once all processes have unmapped the
-// object, de-allocates and destroys the contents of the associated memory
-// region. After a successful shm_unlink(), attempts to shm_open() an
-// object with the same name will fail (unless O_CREAT was specified, in
-// which case a new, distinct object is created).
-func (rw *ReadWriteCloser) Unlink() error {
- return Unlink(rw.name)
+// DirectRead create byte in func
+func (rw *ReadWriteCloser) DirectRead() ([]byte, error) {
+ buf, err := rw.GetReadBuffer()
+ if err != nil {
+ return nil, err
+ }
+
+ data := make([]byte, len(buf.Data))
+
+ copy(data, buf.Data)
+ isEOF := buf.Flags[eofFlagIndex]&eofFlagMask != 0
+
+ if err = rw.SendReadBuffer(buf); err != nil {
+ return nil, err
+ }
+
+ if isEOF {
+ return data, io.EOF
+ }
+
+ return data, nil
+}
+
+// Peek get length
+func (rw *ReadWriteCloser) Peek() (n int, err error) {
+ buf, err := rw.GetReadBuffer()
+ if err != nil {
+ return 0, err
+ }
+
+ return len(buf.Data), nil
}
// Read
-
func (rw *ReadWriteCloser) Read(p []byte) (n int, err error) {
buf, err := rw.GetReadBuffer()
if err != nil {
@@ -135,7 +150,7 @@
block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
if blockIndex == atomic.LoadUint32((*uint32)(&rw.readShared.WriteEnd)) {
- if err := ((*sem.Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil {
+ if err := ((*Semaphore)(&rw.readShared.SemSignal)).Wait(); err != nil {
return Buffer{}, err
}
@@ -187,7 +202,7 @@
atomic.CompareAndSwapUint32((*uint32)(&rw.readShared.ReadEnd), blockIndex, uint32(block.Next))
if uint32(block.Prev) == atomic.LoadUint32((*uint32)(&rw.readShared.WriteStart)) {
- if err := ((*sem.Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil {
+ if err := ((*Semaphore)(&rw.readShared.SemAvail)).Post(); err != nil {
return err
}
}
@@ -258,7 +273,7 @@
block = (*sharedBlock)(unsafe.Pointer(blocks + uintptr(uint64(blockIndex)*rw.fullBlockSize)))
if uint32(block.Next) == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadEnd)) {
- if err := ((*sem.Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil {
+ if err := ((*Semaphore)(&rw.writeShared.SemAvail)).Wait(); err != nil {
return Buffer{}, err
}
@@ -313,7 +328,7 @@
atomic.CompareAndSwapUint32((*uint32)(&rw.writeShared.WriteEnd), blockIndex, uint32(block.Next))
if blockIndex == atomic.LoadUint32((*uint32)(&rw.writeShared.ReadStart)) {
- if err := ((*sem.Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil {
+ if err := ((*Semaphore)(&rw.writeShared.SemSignal)).Post(); err != nil {
return len(buf.Data), err
}
}
--
Gitblit v1.8.0