package shmqueue import ( "context" "fmt" "github.com/gen2brain/shm" "reflect" "runtime" "sync/atomic" "time" "unsafe" ) const ( TimePeriodPutOrGet = time.Duration(1) * time.Microsecond ) //Element info type ElemInfo struct { PicId int InfoId int } //queue info type esCache struct { putNo uint32 getNo uint32 value ElemInfo } // lock free queue struct type esQueue struct { capacity uint32 capMod uint32 putPos uint32 getPos uint32 cache []esCache } type EsQueueInfo struct { EsCaches []esCache ShmKey int ShmID int ShmData []byte Queue *esQueue } // ptr2esCache convert unsafe.Pointer to []esCache func ptr2esCache(s unsafe.Pointer, size int) []esCache { var x reflect.SliceHeader x.Len = size x.Cap = size x.Data = uintptr(s) return *(*[]esCache)(unsafe.Pointer(&x)) } // bytes2shmEsQueue convert []byte to *esQueue func bytes2shmEsQueue(b []byte) *esQueue { return (*esQueue)(unsafe.Pointer( (*reflect.SliceHeader)(unsafe.Pointer(&b)).Data, )) } // NewQueue new and attach a shm queue if no exist, otherwise return exist func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo { var shmLen uint32 var datainfo esCache var shmstruct esQueue var eqi EsQueueInfo shmblocks := minQuantity(uint32(cap)) shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct)) data, shmid := CreateRawShm(ctx, int(shmLen), key) q := bytes2shmEsQueue(data) //init parameters q.capacity = shmblocks q.capMod = q.capacity - 1 q.putPos = 0 q.getPos = 0 eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity)) for i := 0; i < int(q.capacity); i++ { eqi.EsCaches[i].getNo = uint32(i) eqi.EsCaches[i].putNo = uint32(i) } cache := &eqi.EsCaches[0] cache.getNo = q.capacity cache.putNo = q.capacity //fmt.Println("NewQueue EsCaches:", eqi.EsCaches) eqi.ShmData = data eqi.ShmID = shmid eqi.ShmKey = key eqi.Queue = q return &eqi } // AttachQueue attach an exist shm queue func AttachQueue(ctx context.Context, key int) *EsQueueInfo { var eqi EsQueueInfo data, shmid := AttachRawShm(ctx, key) shmdata := bytes2shmEsQueue(data) eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity)) //fmt.Println("AttachQueue EsCaches:", eqi.EsCaches) eqi.Queue = shmdata eqi.ShmKey = key eqi.ShmID = shmid eqi.ShmData = data return &eqi } // ReleaseQueue detach an exist shm queue func (eqi *EsQueueInfo) ReleaseQueue() { fmt.Printf("ReleaseQueue: key=%x\n", eqi.ShmKey) DetachShm(eqi.ShmData) } // RemoveShmId remove an exist shm queue (ipcrm -m shmid) func (eqi *EsQueueInfo) RemoveShmId() error { fmt.Printf("RemoveShmId: key=%x\n", eqi.ShmKey) return shm.Rm(eqi.ShmID) } // Status status of shm queue func (eqi *EsQueueInfo) Status() string { q := eqi.Queue getPos := atomic.LoadUint32(&q.getPos) putPos := atomic.LoadUint32(&q.putPos) return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}", eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches) } // Capacity queue 容量,总大小 func (eqi *EsQueueInfo) QueueCapacity() int { var capMod uint32 capMod = atomic.LoadUint32(&eqi.Queue.capMod) return int(capMod) } // Size queue 实际大小 func (eqi *EsQueueInfo) QueueSize() int { var putPos, getPos, capMod uint32 var quantity uint32 capMod = atomic.LoadUint32(&eqi.Queue.capMod) getPos = atomic.LoadUint32(&eqi.Queue.getPos) putPos = atomic.LoadUint32(&eqi.Queue.putPos) if putPos >= getPos { quantity = putPos - getPos } else { //quantity = q.capMod + (putPos - getPos) quantity = (capMod + (putPos - getPos)) % capMod } return int(quantity) } // Status status of shm queue func (eqi *EsQueueInfo) QueueElements() []ElemInfo { var elems []ElemInfo for i := 0; i < len(eqi.EsCaches); i++ { if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 && eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 { elems = append(elems, eqi.EsCaches[i].value) } } return elems } // Put 单次写入,可能失败 func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) { var putPos, putPosNew, getPos, posCnt, capMod ,capacity uint32 var cache *esCache capacity = atomic.LoadUint32(&eqi.Queue.capacity) capMod = atomic.LoadUint32(&eqi.Queue.capMod) getPos = atomic.LoadUint32(&eqi.Queue.getPos) putPos = atomic.LoadUint32(&eqi.Queue.putPos) if putPos >= getPos { posCnt = putPos - getPos } else { //posCnt = capMod + (putPos - getPos) posCnt = (capMod + (putPos - getPos)) % capMod } //todo //if posCnt >= capMod-1 { if posCnt >= capMod { runtime.Gosched() return false, int(posCnt) } putPosNew = putPos + 1 if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) { fmt.Println("CompareAndSwapUint32 error") runtime.Gosched() return false, int(posCnt) } cache = &(eqi.EsCaches[putPosNew&capMod]) for { getNo := atomic.LoadUint32(&cache.getNo) putNo := atomic.LoadUint32(&cache.putNo) if putPosNew == putNo && getNo == putNo { cache.value = val atomic.AddUint32(&cache.putNo, capacity) return true, int(posCnt + 1) } else { runtime.Gosched() } } } //PutForce 强制写入,失败则继续重写,直到成功为止 func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int { ok, qua := eqi.Put(val) for !ok { loopB: for { select { case <-ctx.Done(): return 0 default: ok, qua = eqi.Put(val) if ok { break loopB } time.Sleep(TimePeriodPutOrGet) } } } return qua } //PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出 func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) { ok, qua := eqi.Put(val) if ok { return ok, qua } to := time.NewTimer(time.Duration(timeout) * time.Millisecond) defer to.Stop() loopB: for { select { case <-ctx.Done(): return false, 0 case <-to.C: return false, 0 default: ok, qua = eqi.Put(val) if ok { break loopB } time.Sleep(TimePeriodPutOrGet) } } return ok, qua } // Get 单次获取,可能失败 func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) { var putPos, getPos, getPosNew, posCnt, capMod, capacity uint32 var cache *esCache capacity = atomic.LoadUint32(&eqi.Queue.capacity) capMod = atomic.LoadUint32(&eqi.Queue.capMod) putPos = atomic.LoadUint32(&eqi.Queue.putPos) getPos = atomic.LoadUint32(&eqi.Queue.getPos) if putPos >= getPos { posCnt = putPos - getPos } else { posCnt = capMod + (putPos - getPos) } if posCnt < 1 { runtime.Gosched() return ElemInfo{}, false, int(posCnt) } getPosNew = getPos + 1 if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) { runtime.Gosched() return ElemInfo{}, false, int(posCnt) } cache = &(eqi.EsCaches[getPosNew&capMod]) for { getNo := atomic.LoadUint32(&cache.getNo) putNo := atomic.LoadUint32(&cache.putNo) if getPosNew == getNo && getNo == putNo-capacity { val := cache.value cache.value = ElemInfo{PicId: 0} atomic.AddUint32(&cache.getNo, capacity) return val, true, int(posCnt - 1) } else { runtime.Gosched() } } } //GetForce 强制获取数据,失败则继续获取,直到成功为止 func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) { val, ok, qua := eqi.Get() for !ok { loopB: for { select { case <-ctx.Done(): return ElemInfo{}, 0 default: val, ok, qua = eqi.Get() if ok { break loopB } time.Sleep(TimePeriodPutOrGet) } } } return val, qua } //GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止 func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) { val, ok, qua := eqi.Get() if ok { return val, ok, qua } to := time.NewTimer(time.Duration(timeout) * time.Millisecond) defer to.Stop() loopB: for { select { case <-ctx.Done(): return ElemInfo{}, false, 0 case <-to.C: return ElemInfo{}, false, 0 default: val, ok, qua = eqi.Get() if ok { break loopB } time.Sleep(TimePeriodPutOrGet) } } return val, ok, qua } // round 到最近的2的倍数 func minQuantity(v uint32) uint32 { if v < 2 { return 2 } v-- v |= v >> 1 v |= v >> 2 v |= v >> 4 v |= v >> 8 v |= v >> 16 v++ return v }