chenshijun
2020-04-02 1644fdbc4aea52a63e4152ad0f5ec6a2b3d7e6ec
shmqueue.go
@@ -12,7 +12,7 @@
)
const (
   TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
   TimePeriodPutOrGet = time.Duration(1) * time.Microsecond
)
//Element info
@@ -139,13 +139,17 @@
// Capacity queue 容量,总大小
func (eqi *EsQueueInfo) QueueCapacity() int {
   return int(eqi.Queue.capMod)
   var capMod uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   return int(capMod)
}
// Size queue 实际大小
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos uint32
   var putPos, getPos, capMod uint32
   var quantity uint32
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
@@ -153,7 +157,7 @@
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
      quantity = (capMod + (putPos - getPos)) % capMod
   }
   return int(quantity)
@@ -164,7 +168,8 @@
   var elems []ElemInfo
   for i := 0; i < len(eqi.EsCaches); i++ {
      if eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
      if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 &&
         eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
         elems = append(elems, eqi.EsCaches[i].value)
      }
   }
@@ -174,10 +179,11 @@
// Put 单次写入,可能失败
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
   var putPos, putPosNew, getPos, posCnt uint32
   var putPos, putPosNew, getPos, posCnt, capMod ,capacity uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
@@ -209,7 +215,7 @@
      putNo := atomic.LoadUint32(&cache.putNo)
      if putPosNew == putNo && getNo == putNo {
         cache.value = val
         atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
         atomic.AddUint32(&cache.putNo, capacity)
         return true, int(posCnt + 1)
      } else {
         runtime.Gosched()
@@ -218,18 +224,29 @@
}
//PutForce 强制写入,失败则继续重写,直到成功为止
func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int {
   ok, qua := eqi.Put(val)
   for !ok {
      time.Sleep(TimePeriodPutOrGet)
      ok, qua = eqi.Put(val)
   loopB:
      for {
         select {
         case <-ctx.Done():
            return 0
         default:
            ok, qua = eqi.Put(val)
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
      }
   }
   return qua
}
//PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出
func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) {
   ok, qua := eqi.Put(val)
   if ok {
      return ok, qua
@@ -238,27 +255,31 @@
   to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
   defer to.Stop()
   for {
      select {
      case <-to.C:
         return false, 0
      default:
         ok, qua = eqi.Put(val)
         if ok {
            return ok, qua
   loopB:
      for {
         select {
         case <-ctx.Done():
            return false, 0
         case <-to.C:
            return false, 0
         default:
            ok, qua = eqi.Put(val)
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
         time.Sleep(TimePeriodPutOrGet)
      }
   }
   return ok, qua
}
// Get 单次获取,可能失败
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
   var putPos, getPos, getPosNew, posCnt uint32
   var putPos, getPos, getPosNew, posCnt, capMod, capacity uint32
   var cache *esCache
   capMod := eqi.Queue.capMod
   capacity = atomic.LoadUint32(&eqi.Queue.capacity)
   capMod = atomic.LoadUint32(&eqi.Queue.capMod)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
@@ -284,10 +305,10 @@
   for {
      getNo := atomic.LoadUint32(&cache.getNo)
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
      if getPosNew == getNo && getNo == putNo-capacity {
         val := cache.value
         cache.value = ElemInfo{PicId: 0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         atomic.AddUint32(&cache.getNo, capacity)
         return val, true, int(posCnt - 1)
      } else {
         runtime.Gosched()
@@ -296,18 +317,29 @@
}
//GetForce 强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) {
   val, ok, qua := eqi.Get()
   for !ok {
      time.Sleep(TimePeriodPutOrGet)
      val, ok, qua = eqi.Get()
   loopB:
      for {
         select {
         case <-ctx.Done():
            return ElemInfo{}, 0
         default:
            val, ok, qua = eqi.Get()
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
      }
   }
   return val, qua
}
//GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) {
   val, ok, qua := eqi.Get()
   if ok {
      return val, ok, qua
@@ -316,18 +348,21 @@
   to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
   defer to.Stop()
   for {
      select {
      case <-to.C:
         return ElemInfo{}, false, 0
      default:
         val, ok, qua = eqi.Get()
         if ok {
            return val, ok, qua
   loopB:
      for {
         select {
         case <-ctx.Done():
            return ElemInfo{}, false, 0
         case <-to.C:
            return ElemInfo{}, false, 0
         default:
            val, ok, qua = eqi.Get()
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
         time.Sleep(TimePeriodPutOrGet)
      }
   }
   return val, ok, qua
}