chenshijun
2020-10-09 1df33ffacf98f09a33969a8c8ed4078247524d29
shmqueue.go
@@ -3,7 +3,7 @@
import (
   "context"
   "fmt"
   "github.com/gen2brain/shm"
   shm "basic.com/valib/goshm.git"
   "reflect"
   "runtime"
   "sync/atomic"
@@ -12,7 +12,7 @@
)
const (
   TimePeriodPutOrGet = time.Duration(1) * time.Microsecond
   TimePeriodPutOrGet = time.Duration(5)*time.Millisecond //ms
)
//Element info
@@ -72,6 +72,9 @@
   shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct))
   data, shmid := CreateRawShm(ctx, int(shmLen), key)
   if shmid == -1 {
      return nil
   }
   q := bytes2shmEsQueue(data)
   //init parameters
   q.capacity = shmblocks
@@ -103,7 +106,10 @@
func AttachQueue(ctx context.Context, key int) *EsQueueInfo {
   var eqi EsQueueInfo
   data, shmid := AttachRawShm(ctx, key)
   data, shmid := AttachRawShmTimeout(ctx, key)
   if shmid == -1 {
      return nil
   }
   shmdata := bytes2shmEsQueue(data)
   eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
   //fmt.Println("AttachQueue EsCaches:", eqi.EsCaches)
@@ -146,14 +152,14 @@
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos uint32
   var quantity uint32
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
      quantity = eqi.Queue.capMod + (putPos - getPos)
   }
   return int(quantity)
@@ -164,7 +170,8 @@
   var elems []ElemInfo
   for i := 0; i < len(eqi.EsCaches); i++ {
      if eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
      if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 &&
         eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
         elems = append(elems, eqi.EsCaches[i].value)
      }
   }
@@ -184,8 +191,7 @@
   if putPos >= getPos {
      posCnt = putPos - getPos
   } else {
      //posCnt = capMod + (putPos - getPos)
      posCnt = (capMod + (putPos - getPos)) % capMod
      posCnt = capMod + (putPos - getPos)
   }
   //todo
@@ -300,7 +306,7 @@
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
         val := cache.value
         cache.value = ElemInfo{PicId: 0}
         cache.value = ElemInfo{PicId: 0, InfoId:0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         return val, true, int(posCnt - 1)
      } else {