chenshijun
2020-06-04 47a19e070f57d92aadf52c62361da6ab5397bd4c
shmqueue.go
@@ -3,7 +3,7 @@
import (
   "context"
   "fmt"
   "github.com/gen2brain/shm"
   shm "basic.com/valib/goshm.git"
   "reflect"
   "runtime"
   "sync/atomic"
@@ -12,7 +12,7 @@
)
const (
   TimePeriodPutOrGet = time.Duration(1) * time.Microsecond
   TimePeriodPutOrGet = time.Duration(5)*time.Millisecond //ms
)
//Element info
@@ -139,25 +139,21 @@
// Capacity queue 容量,总大小
func (eqi *EsQueueInfo) QueueCapacity() int {
   var capMod uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   return int(capMod)
   return int(eqi.Queue.capMod)
}
// Size queue 实际大小
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos, capMod uint32
   var putPos, getPos uint32
   var quantity uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (capMod + (putPos - getPos)) % capMod
      quantity = eqi.Queue.capMod + (putPos - getPos)
   }
   return int(quantity)
@@ -179,19 +175,17 @@
// Put 单次写入,可能失败
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
   var putPos, putPosNew, getPos, posCnt, capMod ,capacity uint32
   var putPos, putPosNew, getPos, posCnt uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      posCnt = putPos - getPos
   } else {
      //posCnt = capMod + (putPos - getPos)
      posCnt = (capMod + (putPos - getPos)) % capMod
      posCnt = capMod + (putPos - getPos)
   }
   //todo
@@ -215,7 +209,7 @@
      putNo := atomic.LoadUint32(&cache.putNo)
      if putPosNew == putNo && getNo == putNo {
         cache.value = val
         atomic.AddUint32(&cache.putNo, capacity)
         atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
         return true, int(posCnt + 1)
      } else {
         runtime.Gosched()
@@ -275,11 +269,10 @@
// Get 单次获取,可能失败
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
   var putPos, getPos, getPosNew, posCnt, capMod, capacity uint32
   var putPos, getPos, getPosNew, posCnt uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
@@ -305,10 +298,10 @@
   for {
      getNo := atomic.LoadUint32(&cache.getNo)
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-capacity {
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
         val := cache.value
         cache.value = ElemInfo{PicId: 0}
         atomic.AddUint32(&cache.getNo, capacity)
         cache.value = ElemInfo{PicId: 0, InfoId:0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         return val, true, int(posCnt - 1)
      } else {
         runtime.Gosched()