chenshijun
2020-04-24 b4f186a5d496fd2085a2bf1405a6384cd7802236
shmqueue.go
@@ -12,7 +12,7 @@
)
const (
   TimePeriodPutOrGet = time.Duration(1) * time.Microsecond
   TimePeriodPutOrGet = time.Duration(5)*time.Millisecond //ms
)
//Element info
@@ -139,25 +139,21 @@
// Capacity queue 容量,总大小
func (eqi *EsQueueInfo) QueueCapacity() int {
   var capMod uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   return int(capMod)
   return int(eqi.Queue.capMod)
}
// Size queue 实际大小
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos, capMod uint32
   var putPos, getPos uint32
   var quantity uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (capMod + (putPos - getPos)) % capMod
      quantity = eqi.Queue.capMod + (putPos - getPos)
   }
   return int(quantity)
@@ -179,19 +175,17 @@
// Put 单次写入,可能失败
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
   var putPos, putPosNew, getPos, posCnt, capMod ,capacity uint32
   var putPos, putPosNew, getPos, posCnt uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      posCnt = putPos - getPos
   } else {
      //posCnt = capMod + (putPos - getPos)
      posCnt = (capMod + (putPos - getPos)) % capMod
      posCnt = capMod + (putPos - getPos)
   }
   //todo
@@ -210,22 +204,15 @@
   cache = &(eqi.EsCaches[putPosNew&capMod])
   tryMax := 100
   tryCount := 0
   for {
      getNo := atomic.LoadUint32(&cache.getNo)
      putNo := atomic.LoadUint32(&cache.putNo)
      if putPosNew == putNo && getNo == putNo {
         cache.value = val
         atomic.AddUint32(&cache.putNo, capacity)
         atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
         return true, int(posCnt + 1)
      } else {
         runtime.Gosched()
      }
      tryCount++
      if tryCount >= tryMax {
         fmt.Println("Put tryCount:", tryCount)
         return false, int(posCnt)
      }
   }
}
@@ -282,11 +269,10 @@
// Get 单次获取,可能失败
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
   var putPos, getPos, getPosNew, posCnt, capMod, capacity uint32
   var putPos, getPos, getPosNew, posCnt uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
@@ -309,23 +295,16 @@
   cache = &(eqi.EsCaches[getPosNew&capMod])
   tryMax := 100
   tryCount := 0
   for {
      getNo := atomic.LoadUint32(&cache.getNo)
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-capacity {
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
         val := cache.value
         cache.value = ElemInfo{PicId: 0}
         atomic.AddUint32(&cache.getNo, capacity)
         cache.value = ElemInfo{PicId: 0, InfoId:0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         return val, true, int(posCnt - 1)
      } else {
         runtime.Gosched()
      }
      tryCount++
      if tryCount >= tryMax {
         fmt.Println("Get tryCount:", tryCount)
         return ElemInfo{}, false, int(posCnt)
      }
   }
}