zhangmeng
2021-09-24 82a262ef5cf721e5a236c8b1d2ab3ff92ca33122
shmqueue.go
@@ -1,9 +1,9 @@
package shmqueue
import (
   shm "basic.com/valib/goshm.git"
   "context"
   "fmt"
   "github.com/gen2brain/shm"
   "reflect"
   "runtime"
   "sync/atomic"
@@ -12,12 +12,13 @@
)
const (
   TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
   TimePeriodPutOrGet = time.Duration(5) * time.Millisecond //ms
)
//Element info
type ElemInfo struct {
   ShmId int
   PicId  int
   InfoId int
}
//queue info
@@ -38,10 +39,10 @@
type EsQueueInfo struct {
   EsCaches []esCache
   ShmKey int
   ShmID int
   ShmData []byte
   Queue *esQueue
   ShmKey   int
   ShmID    int
   ShmData  []byte
   Queue    *esQueue
}
// ptr2esCache convert unsafe.Pointer to []esCache
@@ -71,65 +72,32 @@
   shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct))
   data, shmid := CreateRawShm(ctx, int(shmLen), key)
   fmt.Println("shmid:", shmid)
   fmt.Printf("data:%p\n", data)
   if shmid == -1 {
      return nil
   }
   q := bytes2shmEsQueue(data)
   fmt.Printf("q:%p\n", q)
   fmt.Println("esqueue:", q)
   fmt.Printf("q.capacity:%p\n", &q.capacity)
   fmt.Printf("q.capMod:%p\n", &q.capMod)
   fmt.Printf("q.putPos:%p\n", &q.putPos)
   fmt.Printf("q.getPos:%p\n", &q.getPos)
   fmt.Printf("q.cache:%p\n", &q.cache)
   //init parameters
   q.capacity = shmblocks
   q.capMod = q.capacity - 1
   q.putPos = 0
   q.getPos = 0
   fmt.Println("int(q.capacity)",int(q.capacity))
   eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity))
   fmt.Printf("EsCaches:%p\n", &eqi.EsCaches)
   fmt.Printf("EsCaches[0]:%p\n", &(eqi.EsCaches[0]))
   fmt.Printf("EsCaches[1]:%p\n", &(eqi.EsCaches[1]))
   fmt.Println("int(q.capacity)",int(q.capacity))
   for i := 0; i < int(q.capacity); i++ {
      fmt.Println("i", i)
      fmt.Printf("EsCaches[%d]:%p\n", i,  &(eqi.EsCaches[i]))
      fmt.Printf("EsCaches[%d].getNo:%p\n", i,  &(eqi.EsCaches[i].getNo))
      fmt.Printf("EsCaches[%d].putNo:%p\n", i,  &(eqi.EsCaches[i].putNo))
      eqi.EsCaches[i].getNo = uint32(i)
      eqi.EsCaches[i].putNo = uint32(i)
      fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
      fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
      fmt.Println("cache:", eqi.EsCaches[i])
   }
   cache := &eqi.EsCaches[0]
   cache.getNo = q.capacity
   cache.putNo = q.capacity
   fmt.Println("cache:", cache)
   cache0 := eqi.EsCaches[0]
   fmt.Printf("cache0:%p\n",   &(cache0))
   fmt.Printf("cache0.getNo:%p\n", &(cache0.getNo))
   fmt.Printf("cache0.putNo:%p\n", &(cache0.putNo))
   fmt.Println("cache0:", cache0)
   cache1 := eqi.EsCaches[1]
   fmt.Printf("cache1:%p\n",   &(cache1))
   fmt.Printf("cache1.getNo:%p\n", &(cache1.getNo))
   fmt.Printf("cache1.putNo:%p\n", &(cache1.putNo))
   fmt.Println("cache1:", cache1)
   fmt.Println("EsCaches:", eqi.EsCaches)
   //fmt.Println("NewQueue EsCaches:", eqi.EsCaches)
   eqi.ShmData = data
   eqi.ShmID = shmid
   eqi.ShmKey = key
   eqi.Queue = q
   time.Sleep(10*time.Second)
   return &eqi
}
@@ -138,12 +106,13 @@
func AttachQueue(ctx context.Context, key int) *EsQueueInfo {
   var eqi EsQueueInfo
   data, shmid := AttachRawShm(ctx, key)
   fmt.Println("shmid:", shmid)
   data, shmid := AttachRawShmTimeout(ctx, key)
   if shmid == -1 {
      return nil
   }
   shmdata := bytes2shmEsQueue(data)
   fmt.Println("shmdata:", shmdata)
   eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
   //fmt.Println("EsCaches:", eqi.EsCaches)
   //fmt.Println("AttachQueue EsCaches:", eqi.EsCaches)
   eqi.Queue = shmdata
   eqi.ShmKey = key
@@ -155,12 +124,13 @@
// ReleaseQueue detach an exist shm queue
func (eqi *EsQueueInfo) ReleaseQueue() {
   fmt.Println("ReleaseQueue: key", eqi.ShmKey)
   DestroyShm(eqi.ShmData)
   fmt.Printf("ReleaseQueue: key=%x\n", eqi.ShmKey)
   DetachShm(eqi.ShmData)
}
// RemoveShmId remove an exist shm queue (ipcrm -m shmid)
func (eqi *EsQueueInfo) RemoveShmId() error {
   fmt.Printf("RemoveShmId: key=%x\n", eqi.ShmKey)
   return shm.Rm(eqi.ShmID)
}
@@ -182,17 +152,31 @@
func (eqi *EsQueueInfo) QueueSize() int {
   var putPos, getPos uint32
   var quantity uint32
   getPos = atomic.LoadUint32(&eqi.Queue.getPos)
   putPos = atomic.LoadUint32(&eqi.Queue.putPos)
   if putPos >= getPos {
      quantity = putPos - getPos
   } else {
      //quantity = q.capMod + (putPos - getPos)
      quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
      quantity = eqi.Queue.capMod + (putPos - getPos)
   }
   return int(quantity)
}
// Status status of shm queue
func (eqi *EsQueueInfo) QueueElements() []ElemInfo {
   var elems []ElemInfo
   for i := 0; i < len(eqi.EsCaches); i++ {
      if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 &&
         eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
         elems = append(elems, eqi.EsCaches[i].value)
      }
   }
   return elems
}
// Put 单次写入,可能失败
@@ -207,8 +191,7 @@
   if putPos >= getPos {
      posCnt = putPos - getPos
   } else {
      //posCnt = capMod + (putPos - getPos)
      posCnt = (capMod + (putPos - getPos)) % capMod
      posCnt = capMod + (putPos - getPos)
   }
   //todo
@@ -241,39 +224,62 @@
}
//PutForce 强制写入,失败则继续重写,直到成功为止
func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int {
   ok, qua := eqi.Put(val)
   for !ok {
      time.Sleep(TimePeriodPutOrGet)
      ok, qua = eqi.Put(val)
   loopB:
      for {
         select {
         case <-ctx.Done():
            return 0
         default:
            ok, qua = eqi.Put(val)
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
      }
   }
   return qua
}
//PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出
func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) {
   ok, qua := eqi.Put(val)
   if ok {
      return ok, qua
   }
   to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
   defer to.Stop()
   for {
      select {
      case <-to.C:
         return false, 0
      default:
         ok, qua = eqi.Put(val)
         if ok {
            return ok, qua
         }
         time.Sleep(TimePeriodPutOrGet)
   count := timeout/int(TimePeriodPutOrGet) + 1
   for i := 0; i < count; i++ {
      if ok, qua := eqi.Put(val); ok {
         return ok, qua
      }
      time.Sleep(TimePeriodPutOrGet)
   }
   return ok, qua
   return false, 0
   //    to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
   //    defer to.Stop()
   // loopB:
   //    for {
   //       select {
   //       case <-ctx.Done():
   //          return false, 0
   //       case <-to.C:
   //          return false, 0
   //       default:
   //          ok, qua = eqi.Put(val)
   //          if ok {
   //             break loopB
   //          }
   //          time.Sleep(TimePeriodPutOrGet)
   //       }
   //    }
   //    return ok, qua
}
// Get 单次获取,可能失败
@@ -309,7 +315,7 @@
      putNo := atomic.LoadUint32(&cache.putNo)
      if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
         val := cache.value
         cache.value = ElemInfo{ShmId:0}
         cache.value = ElemInfo{PicId: 0, InfoId: 0}
         atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
         return val, true, int(posCnt - 1)
      } else {
@@ -319,43 +325,68 @@
}
//GetForce 强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) {
   val, ok, qua := eqi.Get()
   for !ok {
      time.Sleep(TimePeriodPutOrGet)
      val, ok, qua = eqi.Get()
   loopB:
      for {
         select {
         case <-ctx.Done():
            return ElemInfo{}, 0
         default:
            val, ok, qua = eqi.Get()
            if ok {
               break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
         }
      }
   }
   return val, qua
}
//GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) {
   val, ok, qua := eqi.Get()
   if ok {
      return val, ok, qua
   }
   to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
   defer to.Stop()
   for {
      select {
      case <-to.C:
         return ElemInfo{},false, 0
      default:
         val, ok, qua = eqi.Get()
         if ok {
            return val, ok, qua
         }
         time.Sleep(TimePeriodPutOrGet)
   count := timeout/int(TimePeriodPutOrGet) + 1
   for i := 0; i < count; i++ {
      if val, ok, qua := eqi.Get(); ok {
         return val, ok, qua
      }
      time.Sleep(TimePeriodPutOrGet)
   }
   return val, ok, qua
   return ElemInfo{}, false, 0
   //    to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
   //    defer to.Stop()
   // loopB:
   //    for {
   //       select {
   //       case <-ctx.Done():
   //          return ElemInfo{}, false, 0
   //       case <-to.C:
   //          return ElemInfo{}, false, 0
   //       default:
   //          val, ok, qua = eqi.Get()
   //          if ok {
   //             break loopB
   //          }
   //          time.Sleep(TimePeriodPutOrGet)
   //       }
   //    }
   //    return val, ok, qua
}
// round 到最近的2的倍数
func minQuantity(v uint32) uint32 {
   if v < 2 {
      return 2
   }
   v--
   v |= v >> 1
   v |= v >> 2
@@ -365,4 +396,3 @@
   v++
   return v
}