New file |
| | |
| | | package shmqueue |
| | | |
| | | import ( |
| | | "context" |
| | | "fmt" |
| | | "github.com/gen2brain/shm" |
| | | "reflect" |
| | | "runtime" |
| | | "sync/atomic" |
| | | "time" |
| | | "unsafe" |
| | | ) |
| | | |
| | | const ( |
| | | TimePeriodPutOrGet = time.Duration(10) * time.Microsecond |
| | | ) |
| | | |
| | | //Element info |
| | | type ElemInfo struct { |
| | | ShmId int |
| | | } |
| | | |
| | | //queue info |
| | | type esCache struct { |
| | | putNo uint32 |
| | | getNo uint32 |
| | | value ElemInfo |
| | | } |
| | | |
| | | // lock free queue struct |
| | | type esQueue struct { |
| | | capacity uint32 |
| | | capMod uint32 |
| | | putPos uint32 |
| | | getPos uint32 |
| | | cache []esCache |
| | | } |
| | | |
| | | type EsQueueInfo struct { |
| | | EsCaches []esCache |
| | | ShmKey int |
| | | ShmID int |
| | | ShmData []byte |
| | | Queue *esQueue |
| | | } |
| | | |
| | | // ptr2esCache convert unsafe.Pointer to []esCache |
| | | func ptr2esCache(s unsafe.Pointer, size int) []esCache { |
| | | var x reflect.SliceHeader |
| | | x.Len = size |
| | | x.Cap = size |
| | | x.Data = uintptr(s) |
| | | return *(*[]esCache)(unsafe.Pointer(&x)) |
| | | } |
| | | |
| | | // bytes2shmEsQueue convert []byte to *esQueue |
| | | func bytes2shmEsQueue(b []byte) *esQueue { |
| | | return (*esQueue)(unsafe.Pointer( |
| | | (*reflect.SliceHeader)(unsafe.Pointer(&b)).Data, |
| | | )) |
| | | } |
| | | |
| | | // NewQueue new and attach a shm queue if no exist, otherwise return exist |
| | | func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo { |
| | | var shmLen uint32 |
| | | var datainfo esCache |
| | | var shmstruct esQueue |
| | | var eqi EsQueueInfo |
| | | |
| | | shmblocks := minQuantity(uint32(cap)) |
| | | shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct)) |
| | | |
| | | data, shmid := CreateRawShm(ctx, int(shmLen), key) |
| | | fmt.Println("shmid:", shmid) |
| | | fmt.Printf("data:%p\n", data) |
| | | q := bytes2shmEsQueue(data) |
| | | fmt.Printf("q:%p\n", q) |
| | | fmt.Println("esqueue:", q) |
| | | fmt.Printf("q.capacity:%p\n", &q.capacity) |
| | | fmt.Printf("q.capMod:%p\n", &q.capMod) |
| | | fmt.Printf("q.putPos:%p\n", &q.putPos) |
| | | fmt.Printf("q.getPos:%p\n", &q.getPos) |
| | | fmt.Printf("q.cache:%p\n", &q.cache) |
| | | //init parameters |
| | | q.capacity = shmblocks |
| | | q.capMod = q.capacity - 1 |
| | | q.putPos = 0 |
| | | q.getPos = 0 |
| | | |
| | | fmt.Println("int(q.capacity)",int(q.capacity)) |
| | | eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity)) |
| | | fmt.Printf("EsCaches:%p\n", &eqi.EsCaches) |
| | | fmt.Printf("EsCaches[0]:%p\n", &(eqi.EsCaches[0])) |
| | | fmt.Printf("EsCaches[1]:%p\n", &(eqi.EsCaches[1])) |
| | | |
| | | fmt.Println("int(q.capacity)",int(q.capacity)) |
| | | for i := 0; i < int(q.capacity); i++ { |
| | | fmt.Println("i", i) |
| | | fmt.Printf("EsCaches[%d]:%p\n", i, &(eqi.EsCaches[i])) |
| | | fmt.Printf("EsCaches[%d].getNo:%p\n", i, &(eqi.EsCaches[i].getNo)) |
| | | fmt.Printf("EsCaches[%d].putNo:%p\n", i, &(eqi.EsCaches[i].putNo)) |
| | | |
| | | eqi.EsCaches[i].getNo = uint32(i) |
| | | eqi.EsCaches[i].putNo = uint32(i) |
| | | fmt.Printf("cache.putNo:%p\n", &(eqi.EsCaches[i].putNo)) |
| | | fmt.Printf("cache.putNo:%p\n", &(eqi.EsCaches[i].putNo)) |
| | | fmt.Println("cache:", eqi.EsCaches[i]) |
| | | } |
| | | cache := &eqi.EsCaches[0] |
| | | cache.getNo = q.capacity |
| | | cache.putNo = q.capacity |
| | | fmt.Println("cache:", cache) |
| | | |
| | | cache0 := eqi.EsCaches[0] |
| | | fmt.Printf("cache0:%p\n", &(cache0)) |
| | | fmt.Printf("cache0.getNo:%p\n", &(cache0.getNo)) |
| | | fmt.Printf("cache0.putNo:%p\n", &(cache0.putNo)) |
| | | fmt.Println("cache0:", cache0) |
| | | |
| | | cache1 := eqi.EsCaches[1] |
| | | fmt.Printf("cache1:%p\n", &(cache1)) |
| | | fmt.Printf("cache1.getNo:%p\n", &(cache1.getNo)) |
| | | fmt.Printf("cache1.putNo:%p\n", &(cache1.putNo)) |
| | | fmt.Println("cache1:", cache1) |
| | | |
| | | fmt.Println("EsCaches:", eqi.EsCaches) |
| | | |
| | | eqi.ShmData = data |
| | | eqi.ShmID = shmid |
| | | eqi.ShmKey = key |
| | | eqi.Queue = q |
| | | time.Sleep(10*time.Second) |
| | | |
| | | return &eqi |
| | | } |
| | | |
| | | // AttachQueue attach an exist shm queue |
| | | func AttachQueue(ctx context.Context, key int) *EsQueueInfo { |
| | | |
| | | var eqi EsQueueInfo |
| | | data, shmid := AttachRawShm(ctx, key) |
| | | fmt.Println("shmid:", shmid) |
| | | shmdata := bytes2shmEsQueue(data) |
| | | fmt.Println("shmdata:", shmdata) |
| | | eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity)) |
| | | //fmt.Println("EsCaches:", eqi.EsCaches) |
| | | |
| | | eqi.Queue = shmdata |
| | | eqi.ShmKey = key |
| | | eqi.ShmID = shmid |
| | | eqi.ShmData = data |
| | | |
| | | return &eqi |
| | | } |
| | | |
| | | // ReleaseQueue detach an exist shm queue |
| | | func (eqi *EsQueueInfo) ReleaseQueue() { |
| | | fmt.Println("ReleaseQueue: key", eqi.ShmKey) |
| | | DestroyShm(eqi.ShmData) |
| | | } |
| | | |
| | | // RemoveShmId remove an exist shm queue (ipcrm -m shmid) |
| | | func (eqi *EsQueueInfo) RemoveShmId() error { |
| | | return shm.Rm(eqi.ShmID) |
| | | } |
| | | |
| | | // Status status of shm queue |
| | | func (eqi *EsQueueInfo) Status() string { |
| | | q := eqi.Queue |
| | | getPos := atomic.LoadUint32(&q.getPos) |
| | | putPos := atomic.LoadUint32(&q.putPos) |
| | | return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}", |
| | | eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches) |
| | | } |
| | | |
| | | // Capacity queue 容量,总大小 |
| | | func (eqi *EsQueueInfo) QueueCapacity() int { |
| | | return int(eqi.Queue.capMod) |
| | | } |
| | | |
| | | // Size queue 实际大小 |
| | | func (eqi *EsQueueInfo) QueueSize() int { |
| | | var putPos, getPos uint32 |
| | | var quantity uint32 |
| | | getPos = atomic.LoadUint32(&eqi.Queue.getPos) |
| | | putPos = atomic.LoadUint32(&eqi.Queue.putPos) |
| | | |
| | | if putPos >= getPos { |
| | | quantity = putPos - getPos |
| | | } else { |
| | | //quantity = q.capMod + (putPos - getPos) |
| | | quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod |
| | | } |
| | | |
| | | return int(quantity) |
| | | } |
| | | |
| | | // Put 单次写入,可能失败 |
| | | func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) { |
| | | var putPos, putPosNew, getPos, posCnt uint32 |
| | | var cache *esCache |
| | | capMod := eqi.Queue.capMod |
| | | |
| | | getPos = atomic.LoadUint32(&eqi.Queue.getPos) |
| | | putPos = atomic.LoadUint32(&eqi.Queue.putPos) |
| | | |
| | | if putPos >= getPos { |
| | | posCnt = putPos - getPos |
| | | } else { |
| | | //posCnt = capMod + (putPos - getPos) |
| | | posCnt = (capMod + (putPos - getPos)) % capMod |
| | | } |
| | | |
| | | //todo |
| | | //if posCnt >= capMod-1 { |
| | | if posCnt >= capMod { |
| | | runtime.Gosched() |
| | | return false, int(posCnt) |
| | | } |
| | | |
| | | putPosNew = putPos + 1 |
| | | if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) { |
| | | fmt.Println("CompareAndSwapUint32 error") |
| | | runtime.Gosched() |
| | | return false, int(posCnt) |
| | | } |
| | | |
| | | cache = &(eqi.EsCaches[putPosNew&capMod]) |
| | | |
| | | for { |
| | | getNo := atomic.LoadUint32(&cache.getNo) |
| | | putNo := atomic.LoadUint32(&cache.putNo) |
| | | if putPosNew == putNo && getNo == putNo { |
| | | cache.value = val |
| | | atomic.AddUint32(&cache.putNo, eqi.Queue.capacity) |
| | | return true, int(posCnt + 1) |
| | | } else { |
| | | runtime.Gosched() |
| | | } |
| | | } |
| | | } |
| | | |
| | | //PutForce 强制写入,失败则继续重写,直到成功为止 |
| | | func (eqi *EsQueueInfo) PutForce(val ElemInfo) int { |
| | | ok, qua := eqi.Put(val) |
| | | for !ok { |
| | | time.Sleep(TimePeriodPutOrGet) |
| | | ok, qua = eqi.Put(val) |
| | | } |
| | | |
| | | return qua |
| | | } |
| | | |
| | | //PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出 |
| | | func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) { |
| | | ok, qua := eqi.Put(val) |
| | | if ok { |
| | | return ok, qua |
| | | } |
| | | |
| | | to := time.NewTimer(time.Duration(timeout)*time.Millisecond) |
| | | defer to.Stop() |
| | | |
| | | for { |
| | | select { |
| | | case <-to.C: |
| | | return false, 0 |
| | | default: |
| | | ok, qua = eqi.Put(val) |
| | | if ok { |
| | | return ok, qua |
| | | } |
| | | time.Sleep(TimePeriodPutOrGet) |
| | | } |
| | | } |
| | | return ok, qua |
| | | } |
| | | |
| | | // Get 单次获取,可能失败 |
| | | func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) { |
| | | var putPos, getPos, getPosNew, posCnt uint32 |
| | | var cache *esCache |
| | | capMod := eqi.Queue.capMod |
| | | |
| | | putPos = atomic.LoadUint32(&eqi.Queue.putPos) |
| | | getPos = atomic.LoadUint32(&eqi.Queue.getPos) |
| | | |
| | | if putPos >= getPos { |
| | | posCnt = putPos - getPos |
| | | } else { |
| | | posCnt = capMod + (putPos - getPos) |
| | | } |
| | | |
| | | if posCnt < 1 { |
| | | runtime.Gosched() |
| | | return ElemInfo{}, false, int(posCnt) |
| | | } |
| | | |
| | | getPosNew = getPos + 1 |
| | | if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) { |
| | | runtime.Gosched() |
| | | return ElemInfo{}, false, int(posCnt) |
| | | } |
| | | |
| | | cache = &(eqi.EsCaches[getPosNew&capMod]) |
| | | |
| | | for { |
| | | getNo := atomic.LoadUint32(&cache.getNo) |
| | | putNo := atomic.LoadUint32(&cache.putNo) |
| | | if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity { |
| | | val := cache.value |
| | | cache.value = ElemInfo{ShmId:0} |
| | | atomic.AddUint32(&cache.getNo, eqi.Queue.capacity) |
| | | return val, true, int(posCnt - 1) |
| | | } else { |
| | | runtime.Gosched() |
| | | } |
| | | } |
| | | } |
| | | |
| | | //GetForce 强制获取数据,失败则继续获取,直到成功为止 |
| | | func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) { |
| | | val, ok, qua := eqi.Get() |
| | | for !ok { |
| | | time.Sleep(TimePeriodPutOrGet) |
| | | val, ok, qua = eqi.Get() |
| | | } |
| | | |
| | | return val, qua |
| | | } |
| | | |
| | | //GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止 |
| | | func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) { |
| | | val, ok, qua := eqi.Get() |
| | | if ok { |
| | | return val, ok, qua |
| | | } |
| | | |
| | | to := time.NewTimer(time.Duration(timeout)*time.Millisecond) |
| | | defer to.Stop() |
| | | |
| | | for { |
| | | select { |
| | | case <-to.C: |
| | | return ElemInfo{},false, 0 |
| | | default: |
| | | val, ok, qua = eqi.Get() |
| | | if ok { |
| | | return val, ok, qua |
| | | } |
| | | time.Sleep(TimePeriodPutOrGet) |
| | | } |
| | | } |
| | | return val, ok, qua |
| | | } |
| | | |
| | | // round 到最近的2的倍数 |
| | | func minQuantity(v uint32) uint32 { |
| | | v-- |
| | | v |= v >> 1 |
| | | v |= v >> 2 |
| | | v |= v >> 4 |
| | | v |= v >> 8 |
| | | v |= v >> 16 |
| | | v++ |
| | | return v |
| | | } |
| | | |