package shmqueue
|
|
import (
|
"context"
|
"fmt"
|
shm "basic.com/valib/goshm.git"
|
"reflect"
|
"runtime"
|
"sync/atomic"
|
"time"
|
"unsafe"
|
)
|
|
// TimePeriodPutOrGet is the interval (5 milliseconds) that the blocking
// Put/Get helpers wait between two consecutive attempts.
const TimePeriodPutOrGet = 5 * time.Millisecond
|
|
// ElemInfo is the payload carried by one queue slot: a pair of integer
// identifiers.
//
// Get resets a consumed slot to the zero ElemInfo, and QueueElements skips
// entries whose PicId or InfoId is 0 or -1, so those values cannot be told
// apart from an empty slot when the queue contents are listed.
type ElemInfo struct {
	PicId, InfoId int
}
|
|
//queue info
|
type esCache struct {
|
putNo uint32
|
getNo uint32
|
value ElemInfo
|
}
|
|
// lock free queue struct
|
type esQueue struct {
|
capacity uint32
|
capMod uint32
|
putPos uint32
|
getPos uint32
|
cache []esCache
|
}
|
|
type EsQueueInfo struct {
|
EsCaches []esCache
|
ShmKey int
|
ShmID int
|
ShmData []byte
|
Queue *esQueue
|
}
|
|
// ptr2esCache convert unsafe.Pointer to []esCache
|
func ptr2esCache(s unsafe.Pointer, size int) []esCache {
|
var x reflect.SliceHeader
|
x.Len = size
|
x.Cap = size
|
x.Data = uintptr(s)
|
return *(*[]esCache)(unsafe.Pointer(&x))
|
}
|
|
// bytes2shmEsQueue convert []byte to *esQueue
|
func bytes2shmEsQueue(b []byte) *esQueue {
|
return (*esQueue)(unsafe.Pointer(
|
(*reflect.SliceHeader)(unsafe.Pointer(&b)).Data,
|
))
|
}
|
|
// NewQueue new and attach a shm queue if no exist, otherwise return exist
|
func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo {
|
var shmLen uint32
|
var datainfo esCache
|
var shmstruct esQueue
|
var eqi EsQueueInfo
|
|
shmblocks := minQuantity(uint32(cap))
|
shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct))
|
|
data, shmid := CreateRawShm(ctx, int(shmLen), key)
|
q := bytes2shmEsQueue(data)
|
//init parameters
|
q.capacity = shmblocks
|
q.capMod = q.capacity - 1
|
q.putPos = 0
|
q.getPos = 0
|
|
eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity))
|
|
for i := 0; i < int(q.capacity); i++ {
|
eqi.EsCaches[i].getNo = uint32(i)
|
eqi.EsCaches[i].putNo = uint32(i)
|
}
|
cache := &eqi.EsCaches[0]
|
cache.getNo = q.capacity
|
cache.putNo = q.capacity
|
|
//fmt.Println("NewQueue EsCaches:", eqi.EsCaches)
|
|
eqi.ShmData = data
|
eqi.ShmID = shmid
|
eqi.ShmKey = key
|
eqi.Queue = q
|
|
return &eqi
|
}
|
|
// AttachQueue attach an exist shm queue
|
func AttachQueue(ctx context.Context, key int) *EsQueueInfo {
|
|
var eqi EsQueueInfo
|
data, shmid := AttachRawShm(ctx, key)
|
shmdata := bytes2shmEsQueue(data)
|
eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
|
//fmt.Println("AttachQueue EsCaches:", eqi.EsCaches)
|
|
eqi.Queue = shmdata
|
eqi.ShmKey = key
|
eqi.ShmID = shmid
|
eqi.ShmData = data
|
|
return &eqi
|
}
|
|
// ReleaseQueue detach an exist shm queue
|
func (eqi *EsQueueInfo) ReleaseQueue() {
|
fmt.Printf("ReleaseQueue: key=%x\n", eqi.ShmKey)
|
DetachShm(eqi.ShmData)
|
}
|
|
// RemoveShmId remove an exist shm queue (ipcrm -m shmid)
|
func (eqi *EsQueueInfo) RemoveShmId() error {
|
fmt.Printf("RemoveShmId: key=%x\n", eqi.ShmKey)
|
return shm.Rm(eqi.ShmID)
|
}
|
|
// Status status of shm queue
|
func (eqi *EsQueueInfo) Status() string {
|
q := eqi.Queue
|
getPos := atomic.LoadUint32(&q.getPos)
|
putPos := atomic.LoadUint32(&q.putPos)
|
return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}",
|
eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches)
|
}
|
|
// Capacity queue 容量,总大小
|
func (eqi *EsQueueInfo) QueueCapacity() int {
|
return int(eqi.Queue.capMod)
|
}
|
|
// Size queue 实际大小
|
func (eqi *EsQueueInfo) QueueSize() int {
|
var putPos, getPos uint32
|
var quantity uint32
|
|
getPos = atomic.LoadUint32(&eqi.Queue.getPos)
|
putPos = atomic.LoadUint32(&eqi.Queue.putPos)
|
|
if putPos >= getPos {
|
quantity = putPos - getPos
|
} else {
|
quantity = eqi.Queue.capMod + (putPos - getPos)
|
}
|
|
return int(quantity)
|
}
|
|
// Status status of shm queue
|
func (eqi *EsQueueInfo) QueueElements() []ElemInfo {
|
var elems []ElemInfo
|
|
for i := 0; i < len(eqi.EsCaches); i++ {
|
if eqi.EsCaches[i].value.PicId != -1 && eqi.EsCaches[i].value.InfoId != -1 &&
|
eqi.EsCaches[i].value.PicId != 0 && eqi.EsCaches[i].value.InfoId != 0 {
|
elems = append(elems, eqi.EsCaches[i].value)
|
}
|
}
|
|
return elems
|
}
|
|
// Put 单次写入,可能失败
|
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
|
var putPos, putPosNew, getPos, posCnt uint32
|
var cache *esCache
|
capMod := eqi.Queue.capMod
|
|
getPos = atomic.LoadUint32(&eqi.Queue.getPos)
|
putPos = atomic.LoadUint32(&eqi.Queue.putPos)
|
|
if putPos >= getPos {
|
posCnt = putPos - getPos
|
} else {
|
posCnt = capMod + (putPos - getPos)
|
}
|
|
//todo
|
//if posCnt >= capMod-1 {
|
if posCnt >= capMod {
|
runtime.Gosched()
|
return false, int(posCnt)
|
}
|
|
putPosNew = putPos + 1
|
if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) {
|
fmt.Println("CompareAndSwapUint32 error")
|
runtime.Gosched()
|
return false, int(posCnt)
|
}
|
|
cache = &(eqi.EsCaches[putPosNew&capMod])
|
|
for {
|
getNo := atomic.LoadUint32(&cache.getNo)
|
putNo := atomic.LoadUint32(&cache.putNo)
|
if putPosNew == putNo && getNo == putNo {
|
cache.value = val
|
atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
|
return true, int(posCnt + 1)
|
} else {
|
runtime.Gosched()
|
}
|
}
|
}
|
|
//PutForce 强制写入,失败则继续重写,直到成功为止
|
func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int {
|
ok, qua := eqi.Put(val)
|
for !ok {
|
loopB:
|
for {
|
select {
|
case <-ctx.Done():
|
return 0
|
default:
|
ok, qua = eqi.Put(val)
|
if ok {
|
break loopB
|
}
|
time.Sleep(TimePeriodPutOrGet)
|
}
|
}
|
}
|
|
return qua
|
}
|
|
//PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出
|
func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) {
|
ok, qua := eqi.Put(val)
|
if ok {
|
return ok, qua
|
}
|
|
to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
|
defer to.Stop()
|
|
loopB:
|
for {
|
select {
|
case <-ctx.Done():
|
return false, 0
|
case <-to.C:
|
return false, 0
|
default:
|
ok, qua = eqi.Put(val)
|
if ok {
|
break loopB
|
}
|
time.Sleep(TimePeriodPutOrGet)
|
}
|
}
|
return ok, qua
|
}
|
|
// Get 单次获取,可能失败
|
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
|
var putPos, getPos, getPosNew, posCnt uint32
|
var cache *esCache
|
capMod := eqi.Queue.capMod
|
|
putPos = atomic.LoadUint32(&eqi.Queue.putPos)
|
getPos = atomic.LoadUint32(&eqi.Queue.getPos)
|
|
if putPos >= getPos {
|
posCnt = putPos - getPos
|
} else {
|
posCnt = capMod + (putPos - getPos)
|
}
|
|
if posCnt < 1 {
|
runtime.Gosched()
|
return ElemInfo{}, false, int(posCnt)
|
}
|
|
getPosNew = getPos + 1
|
if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) {
|
runtime.Gosched()
|
return ElemInfo{}, false, int(posCnt)
|
}
|
|
cache = &(eqi.EsCaches[getPosNew&capMod])
|
|
for {
|
getNo := atomic.LoadUint32(&cache.getNo)
|
putNo := atomic.LoadUint32(&cache.putNo)
|
if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
|
val := cache.value
|
cache.value = ElemInfo{PicId: 0, InfoId:0}
|
atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
|
return val, true, int(posCnt - 1)
|
} else {
|
runtime.Gosched()
|
}
|
}
|
}
|
|
//GetForce 强制获取数据,失败则继续获取,直到成功为止
|
func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) {
|
val, ok, qua := eqi.Get()
|
for !ok {
|
loopB:
|
for {
|
select {
|
case <-ctx.Done():
|
return ElemInfo{}, 0
|
default:
|
val, ok, qua = eqi.Get()
|
if ok {
|
break loopB
|
}
|
time.Sleep(TimePeriodPutOrGet)
|
}
|
}
|
}
|
|
return val, qua
|
}
|
|
//GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止
|
func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) {
|
val, ok, qua := eqi.Get()
|
if ok {
|
return val, ok, qua
|
}
|
|
to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
|
defer to.Stop()
|
|
loopB:
|
for {
|
select {
|
case <-ctx.Done():
|
return ElemInfo{}, false, 0
|
case <-to.C:
|
return ElemInfo{}, false, 0
|
default:
|
val, ok, qua = eqi.Get()
|
if ok {
|
break loopB
|
}
|
time.Sleep(TimePeriodPutOrGet)
|
}
|
}
|
return val, ok, qua
|
}
|
|
// minQuantity rounds v up to the nearest power of two, with a minimum of 2.
//
// Values above 1<<31 have no larger power of two that fits in a uint32; for
// those the result saturates at 1<<31 instead of wrapping around to 0.
func minQuantity(v uint32) uint32 {
	const maxPow2 = 1 << 31

	switch {
	case v < 2:
		return 2
	case v > maxPow2:
		return maxPow2
	}

	// Smear the highest set bit of v-1 into every lower bit, then add one.
	v--
	for shift := uint(1); shift < 32; shift <<= 1 {
		v |= v >> shift
	}
	return v + 1
}
|