From ba4809966f2dd740219eca6c4d9208bd95d21d89 Mon Sep 17 00:00:00 2001
From: chenshijun <csj_sky@126.com>
Date: 星期四, 12 三月 2020 17:17:17 +0800
Subject: [PATCH] shmqueue lib 多读多写,多协程,多shmkey均测试通过

---
 shmqueue.go |  368 ++++++++++++++++++++++++++++++++++++++++
 shmwrap.go  |  141 +++++++++++++++
 README.md   |    1 
 3 files changed, 510 insertions(+), 0 deletions(-)

diff --git a/README.md b/README.md
index 9310489..a04ce6b 100644
--- a/README.md
+++ b/README.md
@@ -2,3 +2,4 @@
 
 system v shm lock-free queue
 
+澶氳澶氬啓锛屽鍗忕▼锛屽shmkey鍧囨祴璇曢�氳繃
\ No newline at end of file
diff --git a/shmqueue.go b/shmqueue.go
new file mode 100644
index 0000000..c819298
--- /dev/null
+++ b/shmqueue.go
@@ -0,0 +1,368 @@
+package shmqueue
+
+import (
+	"context"
+	"fmt"
+	"github.com/gen2brain/shm"
+	"reflect"
+	"runtime"
+	"sync/atomic"
+	"time"
+	"unsafe"
+)
+
+const (
+	TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
+)
+
+//Element info
+type ElemInfo struct {
+	ShmId int
+}
+
+//queue info
+type esCache struct {
+	putNo uint32
+	getNo uint32
+	value ElemInfo
+}
+
+// lock free queue struct
+type esQueue struct {
+	capacity uint32
+	capMod   uint32
+	putPos   uint32
+	getPos   uint32
+	cache    []esCache
+}
+
+type EsQueueInfo struct {
+	EsCaches []esCache
+	ShmKey int
+	ShmID int
+	ShmData []byte
+	Queue *esQueue
+}
+
+// ptr2esCache convert unsafe.Pointer to []esCache
+func ptr2esCache(s unsafe.Pointer, size int) []esCache {
+	var x reflect.SliceHeader
+	x.Len = size
+	x.Cap = size
+	x.Data = uintptr(s)
+	return *(*[]esCache)(unsafe.Pointer(&x))
+}
+
+// bytes2shmEsQueue convert []byte to *esQueue
+func bytes2shmEsQueue(b []byte) *esQueue {
+	return (*esQueue)(unsafe.Pointer(
+		(*reflect.SliceHeader)(unsafe.Pointer(&b)).Data,
+	))
+}
+
+// NewQueue new and attach a shm queue if no exist, otherwise return exist
+func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo {
+	var shmLen uint32
+	var datainfo esCache
+	var shmstruct esQueue
+	var eqi EsQueueInfo
+
+	shmblocks := minQuantity(uint32(cap))
+	shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct))
+
+	data, shmid := CreateRawShm(ctx, int(shmLen), key)
+	fmt.Println("shmid:", shmid)
+	fmt.Printf("data:%p\n", data)
+	q := bytes2shmEsQueue(data)
+	fmt.Printf("q:%p\n", q)
+	fmt.Println("esqueue:", q)
+	fmt.Printf("q.capacity:%p\n", &q.capacity)
+	fmt.Printf("q.capMod:%p\n", &q.capMod)
+	fmt.Printf("q.putPos:%p\n", &q.putPos)
+	fmt.Printf("q.getPos:%p\n", &q.getPos)
+	fmt.Printf("q.cache:%p\n", &q.cache)
+	//init parameters
+	q.capacity = shmblocks
+	q.capMod = q.capacity - 1
+	q.putPos = 0
+	q.getPos = 0
+
+	fmt.Println("int(q.capacity)",int(q.capacity))
+	eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity))
+	fmt.Printf("EsCaches:%p\n", &eqi.EsCaches)
+	fmt.Printf("EsCaches[0]:%p\n", &(eqi.EsCaches[0]))
+	fmt.Printf("EsCaches[1]:%p\n", &(eqi.EsCaches[1]))
+
+	fmt.Println("int(q.capacity)",int(q.capacity))
+	for i := 0; i < int(q.capacity); i++ {
+		fmt.Println("i", i)
+		fmt.Printf("EsCaches[%d]:%p\n", i,  &(eqi.EsCaches[i]))
+		fmt.Printf("EsCaches[%d].getNo:%p\n", i,  &(eqi.EsCaches[i].getNo))
+		fmt.Printf("EsCaches[%d].putNo:%p\n", i,  &(eqi.EsCaches[i].putNo))
+
+		eqi.EsCaches[i].getNo = uint32(i)
+		eqi.EsCaches[i].putNo = uint32(i)
+		fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
+		fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
+		fmt.Println("cache:", eqi.EsCaches[i])
+	}
+	cache := &eqi.EsCaches[0]
+	cache.getNo = q.capacity
+	cache.putNo = q.capacity
+	fmt.Println("cache:", cache)
+
+	cache0 := eqi.EsCaches[0]
+	fmt.Printf("cache0:%p\n",   &(cache0))
+	fmt.Printf("cache0.getNo:%p\n", &(cache0.getNo))
+	fmt.Printf("cache0.putNo:%p\n", &(cache0.putNo))
+	fmt.Println("cache0:", cache0)
+
+	cache1 := eqi.EsCaches[1]
+	fmt.Printf("cache1:%p\n",   &(cache1))
+	fmt.Printf("cache1.getNo:%p\n", &(cache1.getNo))
+	fmt.Printf("cache1.putNo:%p\n", &(cache1.putNo))
+	fmt.Println("cache1:", cache1)
+
+	fmt.Println("EsCaches:", eqi.EsCaches)
+
+	eqi.ShmData = data
+	eqi.ShmID = shmid
+	eqi.ShmKey = key
+	eqi.Queue = q
+	time.Sleep(10*time.Second)
+
+	return &eqi
+}
+
+// AttachQueue attach an exist shm queue
+func AttachQueue(ctx context.Context, key int) *EsQueueInfo {
+
+	var eqi EsQueueInfo
+	data, shmid := AttachRawShm(ctx, key)
+	fmt.Println("shmid:", shmid)
+	shmdata := bytes2shmEsQueue(data)
+	fmt.Println("shmdata:", shmdata)
+	eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
+	//fmt.Println("EsCaches:", eqi.EsCaches)
+
+	eqi.Queue = shmdata
+	eqi.ShmKey = key
+	eqi.ShmID = shmid
+	eqi.ShmData = data
+
+	return &eqi
+}
+
+// ReleaseQueue detach an exist shm queue
+func (eqi *EsQueueInfo) ReleaseQueue() {
+	fmt.Println("ReleaseQueue: key", eqi.ShmKey)
+	DestroyShm(eqi.ShmData)
+}
+
+// RemoveShmId remove an exist shm queue (ipcrm -m shmid)
+func (eqi *EsQueueInfo) RemoveShmId() error {
+	return shm.Rm(eqi.ShmID)
+}
+
+// Status status of shm queue
+func (eqi *EsQueueInfo) Status() string {
+	q := eqi.Queue
+	getPos := atomic.LoadUint32(&q.getPos)
+	putPos := atomic.LoadUint32(&q.putPos)
+	return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}",
+		eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches)
+}
+
+// Capacity queue 瀹归噺锛屾�诲ぇ灏�
+func (eqi *EsQueueInfo) QueueCapacity() int {
+	return int(eqi.Queue.capMod)
+}
+
+// Size queue 瀹為檯澶у皬
+func (eqi *EsQueueInfo) QueueSize() int {
+	var putPos, getPos uint32
+	var quantity uint32
+	getPos = atomic.LoadUint32(&eqi.Queue.getPos)
+	putPos = atomic.LoadUint32(&eqi.Queue.putPos)
+
+	if putPos >= getPos {
+		quantity = putPos - getPos
+	} else {
+		//quantity = q.capMod + (putPos - getPos)
+		quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
+	}
+
+	return int(quantity)
+}
+
+// Put 鍗曟鍐欏叆锛屽彲鑳藉け璐�
+func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
+	var putPos, putPosNew, getPos, posCnt uint32
+	var cache *esCache
+	capMod := eqi.Queue.capMod
+
+	getPos = atomic.LoadUint32(&eqi.Queue.getPos)
+	putPos = atomic.LoadUint32(&eqi.Queue.putPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		//posCnt = capMod + (putPos - getPos)
+		posCnt = (capMod + (putPos - getPos)) % capMod
+	}
+
+	//todo
+	//if posCnt >= capMod-1 {
+	if posCnt >= capMod {
+		runtime.Gosched()
+		return false, int(posCnt)
+	}
+
+	putPosNew = putPos + 1
+	if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) {
+		fmt.Println("CompareAndSwapUint32 error")
+		runtime.Gosched()
+		return false, int(posCnt)
+	}
+
+	cache = &(eqi.EsCaches[putPosNew&capMod])
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if putPosNew == putNo && getNo == putNo {
+			cache.value = val
+			atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
+			return true, int(posCnt + 1)
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+//PutForce 寮哄埗鍐欏叆锛屽け璐ュ垯缁х画閲嶅啓锛岀洿鍒版垚鍔熶负姝�
+func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
+	ok, qua := eqi.Put(val)
+	for !ok {
+		time.Sleep(TimePeriodPutOrGet)
+		ok, qua = eqi.Put(val)
+	}
+
+	return qua
+}
+
+//PutForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒跺啓鍏ワ紝澶辫触鍒欑户缁噸鍐欙紝鐩村埌鎴愬姛涓烘鎴栬�呰秴鏃堕��鍑�
+func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
+	ok, qua := eqi.Put(val)
+	if ok {
+		return ok, qua
+	}
+
+	to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
+	defer to.Stop()
+
+	for {
+		select {
+		case <-to.C:
+			return false, 0
+		default:
+			ok, qua = eqi.Put(val)
+			if ok {
+				return ok, qua
+			}
+			time.Sleep(TimePeriodPutOrGet)
+		}
+	}
+	return ok, qua
+}
+
+// Get 鍗曟鑾峰彇锛屽彲鑳藉け璐�
+func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
+	var putPos, getPos, getPosNew, posCnt uint32
+	var cache *esCache
+	capMod := eqi.Queue.capMod
+
+	putPos = atomic.LoadUint32(&eqi.Queue.putPos)
+	getPos = atomic.LoadUint32(&eqi.Queue.getPos)
+
+	if putPos >= getPos {
+		posCnt = putPos - getPos
+	} else {
+		posCnt = capMod + (putPos - getPos)
+	}
+
+	if posCnt < 1 {
+		runtime.Gosched()
+		return ElemInfo{}, false, int(posCnt)
+	}
+
+	getPosNew = getPos + 1
+	if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) {
+		runtime.Gosched()
+		return ElemInfo{}, false, int(posCnt)
+	}
+
+	cache = &(eqi.EsCaches[getPosNew&capMod])
+
+	for {
+		getNo := atomic.LoadUint32(&cache.getNo)
+		putNo := atomic.LoadUint32(&cache.putNo)
+		if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
+			val := cache.value
+			cache.value = ElemInfo{ShmId:0}
+			atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
+			return val, true, int(posCnt - 1)
+		} else {
+			runtime.Gosched()
+		}
+	}
+}
+
+//GetForce 寮哄埗鑾峰彇鏁版嵁锛屽け璐ュ垯缁х画鑾峰彇锛岀洿鍒版垚鍔熶负姝�
+func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
+	val, ok, qua := eqi.Get()
+	for !ok {
+		time.Sleep(TimePeriodPutOrGet)
+		val, ok, qua = eqi.Get()
+	}
+
+	return val, qua
+}
+
+//GetForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒惰幏鍙栨暟鎹紝澶辫触鍒欑户缁幏鍙栵紝鐩村埌鎴愬姛涓烘
+func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
+	val, ok, qua := eqi.Get()
+	if ok {
+		return val, ok, qua
+	}
+
+	to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
+	defer to.Stop()
+
+	for {
+		select {
+		case <-to.C:
+			return ElemInfo{},false, 0
+		default:
+			val, ok, qua = eqi.Get()
+			if ok {
+				return val, ok, qua
+			}
+			time.Sleep(TimePeriodPutOrGet)
+		}
+	}
+	return val, ok, qua
+}
+
+// round 鍒版渶杩戠殑2鐨勫�嶆暟
+func minQuantity(v uint32) uint32 {
+	v--
+	v |= v >> 1
+	v |= v >> 2
+	v |= v >> 4
+	v |= v >> 8
+	v |= v >> 16
+	v++
+	return v
+}
+
diff --git a/shmwrap.go b/shmwrap.go
new file mode 100644
index 0000000..87ea7a4
--- /dev/null
+++ b/shmwrap.go
@@ -0,0 +1,141 @@
+package shmqueue
+
+import (
+    "context"
+    "fmt"
+    "time"
+    "github.com/gen2brain/shm"
+)
+
+// NewBlock shm block with size
+func NewBlock(size int, key int) ([]byte, int, error) {
+    id, err := shm.Get(key, size, shm.IPC_CREAT|0666)
+    fmt.Println("Get:", id, err)
+    if err != nil || id == -1 {
+        return nil, -1, err
+    }
+
+    data, err2 := shm.At(id, 0, 0)
+    if err2 != nil {
+        return nil, -1, err2
+    }
+
+    return data, id, nil
+}
+
+// AttachBlock attach exist shm
+func AttachBlock(key int) ([]byte, int, error) {
+    id, err := shm.Get(key, 0, 0)
+    fmt.Println("Get:", id, err)
+    if err != nil || id == -1 { //no exist
+        return nil, -1, err
+    }
+
+    data, err2 := shm.At(id, 0, 0)
+    if err2 != nil {
+        return nil, -1, err2
+    }
+
+    return data, id, nil
+}
+
+// ReleaseBlock release shm block
+func ReleaseBlock(data []byte) {
+    if data != nil {
+        shm.Dt(data)
+    }
+}
+
+// CreateRawShm create raw shm block with size, only space, no padding, return data([]byte), id(int)
+// context for quit
+func CreateRawShm(ctx context.Context, size int, key int) ([]byte, int) {
+    data, id, err := AttachBlock(key)
+    fmt.Println("err:", err)
+    if err != nil {
+    loopB:
+        for {
+            select {
+            case <-ctx.Done():
+                return nil, -1
+            default:
+                if err == nil {
+                    break loopB
+                } else {
+                    fmt.Println("createShm error:", err)
+                }
+                time.Sleep(time.Millisecond)
+                data, id, err = NewBlock(size, key)
+            }
+        }
+    }
+    return data, id
+}
+
+//AttachRawShm don't create
+func AttachRawShm(ctx context.Context, key int) ([]byte, int) {
+    data, id, err := AttachBlock(key)
+    if err != nil {
+    loopB:
+        for {
+            select {
+            case <-ctx.Done():
+                return nil, -1
+            default:
+                if err == nil {
+                    break loopB
+                } else {
+                    fmt.Println("createShm error:", err)
+                }
+                time.Sleep(time.Millisecond)
+                data, id, err = AttachBlock(key)
+            }
+        }
+    }
+    return data, id
+}
+
+// CreatePaddingShm create padding shm block with size, return data-with-padding([]byte), id(int)
+// context for quit, padding for fill raw shm block
+func CreatePaddingShm(ctx context.Context, size int, key int, padding []byte) ([]byte, int) {
+    data, id, err := NewBlock(size, key)
+    if err != nil {
+    loopB:
+        for {
+            select {
+            case <-ctx.Done():
+                return nil, -1
+            default:
+                if err == nil {
+                    break loopB
+                } else {
+                    fmt.Println("createShm error:", err)
+                }
+                time.Sleep(time.Millisecond)
+                data, id, err = NewBlock(size, key)
+            }
+        }
+    }
+
+    copy(data, padding)
+    return data, id
+}
+
+// DestroyShm destroy
+func DestroyShm(data []byte) {
+    ReleaseBlock(data)
+}
+
+// Attach attach shmid get block
+func Attach(id int) ([]byte, error) {
+   return shm.At(id, 0, 0)
+}
+
+// Detach detach shm block
+func Detach(d []byte) error {
+   return shm.Dt(d)
+}
+
+// ReleaseShmID release shmid
+func ReleaseShmID(id int) error {
+   return shm.Rm(id)
+}

--
Gitblit v1.8.0