chenshijun
2020-04-02 1644fdbc4aea52a63e4152ad0f5ec6a2b3d7e6ec
shmqueue.go
@@ -139,13 +139,17 @@
// Capacity queue 容量,总大小
func (eqi *EsQueueInfo) QueueCapacity() int {
   return int(eqi.Queue.capMod)
   var capMod uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   return int(capMod)
}
// Size queue 实际大小
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos uint32
   var putPos, getPos, capMod uint32
   var quantity uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
@@ -153,7 +157,7 @@
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
      quantity = (capMod + (putPos - getPos)) % capMod
   }
   return int(quantity)
@@ -175,10 +179,11 @@
// Put 单次写入,可能失败
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
   var putPos, putPosNew, getPos, posCnt uint32
   var putPos, putPosNew, getPos, posCnt, capMod ,capacity uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
@@ -210,7 +215,7 @@
      putNo := atomic.LoadUint32(&cache.putNo)
      if putPosNew == putNo && getNo == putNo {
         cache.value = val
         atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
         atomic.AddUint32(&cache.putNo, capacity)
         return true, int(posCnt + 1)
      } else {
         runtime.Gosched()
@@ -270,10 +275,11 @@
// Get 单次获取,可能失败
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
   var putPos, getPos, getPosNew, posCnt uint32
   var putPos, getPos, getPosNew, posCnt, capMod, capacity uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
@@ -299,10 +305,10 @@
   for {
      getNo := atomic.LoadUint32(&cache.getNo)
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
      if getPosNew == getNo && getNo == putNo-capacity {
         val := cache.value
         cache.value = ElemInfo{PicId: 0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         atomic.AddUint32(&cache.getNo, capacity)
         return val, true, int(posCnt - 1)
      } else {
         runtime.Gosched()