From f18b1d4daeeb42ea026dfad2df506c08a9148796 Mon Sep 17 00:00:00 2001
From: chenshijun <chenshijun@aiotlink.com>
Date: 星期五, 17 四月 2020 10:18:59 +0800
Subject: [PATCH] 将满队列判断修改成capMod而不是capMod-1
---
shmqueue.go | 105 +++++++++++++++++++++++++++++++++-------------------
1 files changed, 66 insertions(+), 39 deletions(-)
diff --git a/shmqueue.go b/shmqueue.go
index c01257f..efd2a40 100644
--- a/shmqueue.go
+++ b/shmqueue.go
@@ -12,7 +12,7 @@
)
const (
- TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
+ TimePeriodPutOrGet = time.Duration(5) * time.Microsecond
)
//Element info
@@ -89,13 +89,12 @@
cache.getNo = q.capacity
cache.putNo = q.capacity
- fmt.Println("NewQueue EsCaches:", eqi.EsCaches)
+ //fmt.Println("NewQueue EsCaches:", eqi.EsCaches)
eqi.ShmData = data
eqi.ShmID = shmid
eqi.ShmKey = key
eqi.Queue = q
- time.Sleep(10 * time.Second)
return &eqi
}
@@ -107,7 +106,7 @@
data, shmid := AttachRawShm(ctx, key)
shmdata := bytes2shmEsQueue(data)
eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
- fmt.Println("AttachQueue EsCaches:", eqi.EsCaches)
+ //fmt.Println("AttachQueue EsCaches:", eqi.EsCaches)
eqi.Queue = shmdata
eqi.ShmKey = key
@@ -120,7 +119,7 @@
// ReleaseQueue detach an exist shm queue
func (eqi *EsQueueInfo) ReleaseQueue() {
fmt.Printf("ReleaseQueue: key=%x\n", eqi.ShmKey)
- DestroyShm(eqi.ShmData)
+ DetachShm(eqi.ShmData)
}
// RemoveShmId remove an exist shm queue (ipcrm -m shmid)
@@ -147,14 +146,14 @@
func (eqi *EsQueueInfo) QueueSize() int {
var putPos, getPos uint32
var quantity uint32
+
getPos = atomic.LoadUint32(&eqi.Queue.getPos)
putPos = atomic.LoadUint32(&eqi.Queue.putPos)
if putPos >= getPos {
quantity = putPos - getPos
} else {
- //quantity = q.capMod + (putPos - getPos)
- quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
+ quantity = eqi.Queue.capMod + (putPos - getPos)
}
return int(quantity)
@@ -165,7 +164,8 @@
var elems []ElemInfo
for i := 0; i < len(eqi.EsCaches); i++ {
- if eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
+ if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 &&
+ eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
elems = append(elems, eqi.EsCaches[i].value)
}
}
@@ -185,8 +185,7 @@
if putPos >= getPos {
posCnt = putPos - getPos
} else {
- //posCnt = capMod + (putPos - getPos)
- posCnt = (capMod + (putPos - getPos)) % capMod
+ posCnt = capMod + (putPos - getPos)
}
//todo
@@ -219,18 +218,29 @@
}
//PutForce 寮哄埗鍐欏叆锛屽け璐ュ垯缁х画閲嶅啓锛岀洿鍒版垚鍔熶负姝�
-func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
+func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int {
ok, qua := eqi.Put(val)
for !ok {
- time.Sleep(TimePeriodPutOrGet)
- ok, qua = eqi.Put(val)
+ loopB:
+ for {
+ select {
+ case <-ctx.Done():
+ return 0
+ default:
+ ok, qua = eqi.Put(val)
+ if ok {
+ break loopB
+ }
+ time.Sleep(TimePeriodPutOrGet)
+ }
+ }
}
return qua
}
//PutForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒跺啓鍏ワ紝澶辫触鍒欑户缁噸鍐欙紝鐩村埌鎴愬姛涓烘鎴栬�呰秴鏃堕��鍑�
-func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
+func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) {
ok, qua := eqi.Put(val)
if ok {
return ok, qua
@@ -239,18 +249,21 @@
to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
defer to.Stop()
- for {
- select {
- case <-to.C:
- return false, 0
- default:
- ok, qua = eqi.Put(val)
- if ok {
- return ok, qua
+ loopB:
+ for {
+ select {
+ case <-ctx.Done():
+ return false, 0
+ case <-to.C:
+ return false, 0
+ default:
+ ok, qua = eqi.Put(val)
+ if ok {
+ break loopB
+ }
+ time.Sleep(TimePeriodPutOrGet)
}
- time.Sleep(TimePeriodPutOrGet)
}
- }
return ok, qua
}
@@ -287,7 +300,7 @@
putNo := atomic.LoadUint32(&cache.putNo)
if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
val := cache.value
- cache.value = ElemInfo{PicId: 0}
+ cache.value = ElemInfo{PicId: 0, InfoId:0}
atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
return val, true, int(posCnt - 1)
} else {
@@ -297,18 +310,29 @@
}
//GetForce 寮哄埗鑾峰彇鏁版嵁锛屽け璐ュ垯缁х画鑾峰彇锛岀洿鍒版垚鍔熶负姝�
-func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
+func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) {
val, ok, qua := eqi.Get()
for !ok {
- time.Sleep(TimePeriodPutOrGet)
- val, ok, qua = eqi.Get()
+ loopB:
+ for {
+ select {
+ case <-ctx.Done():
+ return ElemInfo{}, 0
+ default:
+ val, ok, qua = eqi.Get()
+ if ok {
+ break loopB
+ }
+ time.Sleep(TimePeriodPutOrGet)
+ }
+ }
}
return val, qua
}
//GetForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒惰幏鍙栨暟鎹紝澶辫触鍒欑户缁幏鍙栵紝鐩村埌鎴愬姛涓烘
-func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
+func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) {
val, ok, qua := eqi.Get()
if ok {
return val, ok, qua
@@ -317,18 +341,21 @@
to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
defer to.Stop()
- for {
- select {
- case <-to.C:
- return ElemInfo{}, false, 0
- default:
- val, ok, qua = eqi.Get()
- if ok {
- return val, ok, qua
+ loopB:
+ for {
+ select {
+ case <-ctx.Done():
+ return ElemInfo{}, false, 0
+ case <-to.C:
+ return ElemInfo{}, false, 0
+ default:
+ val, ok, qua = eqi.Get()
+ if ok {
+ break loopB
+ }
+ time.Sleep(TimePeriodPutOrGet)
}
- time.Sleep(TimePeriodPutOrGet)
}
- }
return val, ok, qua
}
--
Gitblit v1.8.0