From ba4809966f2dd740219eca6c4d9208bd95d21d89 Mon Sep 17 00:00:00 2001 From: chenshijun <csj_sky@126.com> Date: 星期四, 12 三月 2020 17:17:17 +0800 Subject: [PATCH] shmqueue lib 多读多写,多协程,多shmkey均测试通过 --- shmqueue.go | 368 ++++++++++++++++++++++++++++++++++++++++ shmwrap.go | 141 +++++++++++++++ README.md | 1 3 files changed, 510 insertions(+), 0 deletions(-) diff --git a/README.md b/README.md index 9310489..a04ce6b 100644 --- a/README.md +++ b/README.md @@ -2,3 +2,4 @@ system v shm lock-free queue +澶氳澶氬啓锛屽鍗忕▼锛屽shmkey鍧囨祴璇曢�氳繃 \ No newline at end of file diff --git a/shmqueue.go b/shmqueue.go new file mode 100644 index 0000000..c819298 --- /dev/null +++ b/shmqueue.go @@ -0,0 +1,368 @@ +package shmqueue + +import ( + "context" + "fmt" + "github.com/gen2brain/shm" + "reflect" + "runtime" + "sync/atomic" + "time" + "unsafe" +) + +const ( + TimePeriodPutOrGet = time.Duration(10) * time.Microsecond +) + +//Element info +type ElemInfo struct { + ShmId int +} + +//queue info +type esCache struct { + putNo uint32 + getNo uint32 + value ElemInfo +} + +// lock free queue struct +type esQueue struct { + capacity uint32 + capMod uint32 + putPos uint32 + getPos uint32 + cache []esCache +} + +type EsQueueInfo struct { + EsCaches []esCache + ShmKey int + ShmID int + ShmData []byte + Queue *esQueue +} + +// ptr2esCache convert unsafe.Pointer to []esCache +func ptr2esCache(s unsafe.Pointer, size int) []esCache { + var x reflect.SliceHeader + x.Len = size + x.Cap = size + x.Data = uintptr(s) + return *(*[]esCache)(unsafe.Pointer(&x)) +} + +// bytes2shmEsQueue convert []byte to *esQueue +func bytes2shmEsQueue(b []byte) *esQueue { + return (*esQueue)(unsafe.Pointer( + (*reflect.SliceHeader)(unsafe.Pointer(&b)).Data, + )) +} + +// NewQueue new and attach a shm queue if no exist, otherwise return exist +func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo { + var shmLen uint32 + var datainfo esCache + var shmstruct esQueue + var eqi EsQueueInfo + + shmblocks := minQuantity(uint32(cap)) + shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct)) + + data, shmid := CreateRawShm(ctx, int(shmLen), key) + fmt.Println("shmid:", shmid) + fmt.Printf("data:%p\n", data) + q := bytes2shmEsQueue(data) + fmt.Printf("q:%p\n", q) + fmt.Println("esqueue:", q) + fmt.Printf("q.capacity:%p\n", &q.capacity) + fmt.Printf("q.capMod:%p\n", &q.capMod) + fmt.Printf("q.putPos:%p\n", &q.putPos) + fmt.Printf("q.getPos:%p\n", &q.getPos) + fmt.Printf("q.cache:%p\n", &q.cache) + //init parameters + q.capacity = shmblocks + q.capMod = q.capacity - 1 + q.putPos = 0 + q.getPos = 0 + + fmt.Println("int(q.capacity)",int(q.capacity)) + eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity)) + fmt.Printf("EsCaches:%p\n", &eqi.EsCaches) + fmt.Printf("EsCaches[0]:%p\n", &(eqi.EsCaches[0])) + fmt.Printf("EsCaches[1]:%p\n", &(eqi.EsCaches[1])) + + fmt.Println("int(q.capacity)",int(q.capacity)) + for i := 0; i < int(q.capacity); i++ { + fmt.Println("i", i) + fmt.Printf("EsCaches[%d]:%p\n", i, &(eqi.EsCaches[i])) + fmt.Printf("EsCaches[%d].getNo:%p\n", i, &(eqi.EsCaches[i].getNo)) + fmt.Printf("EsCaches[%d].putNo:%p\n", i, &(eqi.EsCaches[i].putNo)) + + eqi.EsCaches[i].getNo = uint32(i) + eqi.EsCaches[i].putNo = uint32(i) + fmt.Printf("cache.putNo:%p\n", &(eqi.EsCaches[i].putNo)) + fmt.Printf("cache.putNo:%p\n", &(eqi.EsCaches[i].putNo)) + fmt.Println("cache:", eqi.EsCaches[i]) + } + cache := &eqi.EsCaches[0] + cache.getNo = q.capacity + cache.putNo = q.capacity + fmt.Println("cache:", cache) + + cache0 := eqi.EsCaches[0] + fmt.Printf("cache0:%p\n", &(cache0)) + fmt.Printf("cache0.getNo:%p\n", &(cache0.getNo)) + fmt.Printf("cache0.putNo:%p\n", &(cache0.putNo)) + fmt.Println("cache0:", cache0) + + cache1 := eqi.EsCaches[1] + fmt.Printf("cache1:%p\n", &(cache1)) + fmt.Printf("cache1.getNo:%p\n", &(cache1.getNo)) + fmt.Printf("cache1.putNo:%p\n", &(cache1.putNo)) + fmt.Println("cache1:", cache1) + + fmt.Println("EsCaches:", eqi.EsCaches) + + eqi.ShmData = data + eqi.ShmID = shmid + eqi.ShmKey = key + eqi.Queue = q + time.Sleep(10*time.Second) + + return &eqi +} + +// AttachQueue attach an exist shm queue +func AttachQueue(ctx context.Context, key int) *EsQueueInfo { + + var eqi EsQueueInfo + data, shmid := AttachRawShm(ctx, key) + fmt.Println("shmid:", shmid) + shmdata := bytes2shmEsQueue(data) + fmt.Println("shmdata:", shmdata) + eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity)) + //fmt.Println("EsCaches:", eqi.EsCaches) + + eqi.Queue = shmdata + eqi.ShmKey = key + eqi.ShmID = shmid + eqi.ShmData = data + + return &eqi +} + +// ReleaseQueue detach an exist shm queue +func (eqi *EsQueueInfo) ReleaseQueue() { + fmt.Println("ReleaseQueue: key", eqi.ShmKey) + DestroyShm(eqi.ShmData) +} + +// RemoveShmId remove an exist shm queue (ipcrm -m shmid) +func (eqi *EsQueueInfo) RemoveShmId() error { + return shm.Rm(eqi.ShmID) +} + +// Status status of shm queue +func (eqi *EsQueueInfo) Status() string { + q := eqi.Queue + getPos := atomic.LoadUint32(&q.getPos) + putPos := atomic.LoadUint32(&q.putPos) + return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}", + eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches) +} + +// Capacity queue 瀹归噺锛屾�诲ぇ灏� +func (eqi *EsQueueInfo) QueueCapacity() int { + return int(eqi.Queue.capMod) +} + +// Size queue 瀹為檯澶у皬 +func (eqi *EsQueueInfo) QueueSize() int { + var putPos, getPos uint32 + var quantity uint32 + getPos = atomic.LoadUint32(&eqi.Queue.getPos) + putPos = atomic.LoadUint32(&eqi.Queue.putPos) + + if putPos >= getPos { + quantity = putPos - getPos + } else { + //quantity = q.capMod + (putPos - getPos) + quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod + } + + return int(quantity) +} + +// Put 鍗曟鍐欏叆锛屽彲鑳藉け璐� +func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) { + var putPos, putPosNew, getPos, posCnt uint32 + var cache *esCache + capMod := eqi.Queue.capMod + + getPos = atomic.LoadUint32(&eqi.Queue.getPos) + putPos = atomic.LoadUint32(&eqi.Queue.putPos) + + if putPos >= getPos { + posCnt = putPos - getPos + } else { + //posCnt = capMod + (putPos - getPos) + posCnt = (capMod + (putPos - getPos)) % capMod + } + + //todo + //if posCnt >= capMod-1 { + if posCnt >= capMod { + runtime.Gosched() + return false, int(posCnt) + } + + putPosNew = putPos + 1 + if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) { + fmt.Println("CompareAndSwapUint32 error") + runtime.Gosched() + return false, int(posCnt) + } + + cache = &(eqi.EsCaches[putPosNew&capMod]) + + for { + getNo := atomic.LoadUint32(&cache.getNo) + putNo := atomic.LoadUint32(&cache.putNo) + if putPosNew == putNo && getNo == putNo { + cache.value = val + atomic.AddUint32(&cache.putNo, eqi.Queue.capacity) + return true, int(posCnt + 1) + } else { + runtime.Gosched() + } + } +} + +//PutForce 寮哄埗鍐欏叆锛屽け璐ュ垯缁х画閲嶅啓锛岀洿鍒版垚鍔熶负姝� +func (eqi *EsQueueInfo) PutForce(val ElemInfo) int { + ok, qua := eqi.Put(val) + for !ok { + time.Sleep(TimePeriodPutOrGet) + ok, qua = eqi.Put(val) + } + + return qua +} + +//PutForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒跺啓鍏ワ紝澶辫触鍒欑户缁噸鍐欙紝鐩村埌鎴愬姛涓烘鎴栬�呰秴鏃堕��鍑� +func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) { + ok, qua := eqi.Put(val) + if ok { + return ok, qua + } + + to := time.NewTimer(time.Duration(timeout)*time.Millisecond) + defer to.Stop() + + for { + select { + case <-to.C: + return false, 0 + default: + ok, qua = eqi.Put(val) + if ok { + return ok, qua + } + time.Sleep(TimePeriodPutOrGet) + } + } + return ok, qua +} + +// Get 鍗曟鑾峰彇锛屽彲鑳藉け璐� +func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) { + var putPos, getPos, getPosNew, posCnt uint32 + var cache *esCache + capMod := eqi.Queue.capMod + + putPos = atomic.LoadUint32(&eqi.Queue.putPos) + getPos = atomic.LoadUint32(&eqi.Queue.getPos) + + if putPos >= getPos { + posCnt = putPos - getPos + } else { + posCnt = capMod + (putPos - getPos) + } + + if posCnt < 1 { + runtime.Gosched() + return ElemInfo{}, false, int(posCnt) + } + + getPosNew = getPos + 1 + if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) { + runtime.Gosched() + return ElemInfo{}, false, int(posCnt) + } + + cache = &(eqi.EsCaches[getPosNew&capMod]) + + for { + getNo := atomic.LoadUint32(&cache.getNo) + putNo := atomic.LoadUint32(&cache.putNo) + if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity { + val := cache.value + cache.value = ElemInfo{ShmId:0} + atomic.AddUint32(&cache.getNo, eqi.Queue.capacity) + return val, true, int(posCnt - 1) + } else { + runtime.Gosched() + } + } +} + +//GetForce 寮哄埗鑾峰彇鏁版嵁锛屽け璐ュ垯缁х画鑾峰彇锛岀洿鍒版垚鍔熶负姝� +func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) { + val, ok, qua := eqi.Get() + for !ok { + time.Sleep(TimePeriodPutOrGet) + val, ok, qua = eqi.Get() + } + + return val, qua +} + +//GetForceWithinTime 鍦╰imeout ms鐨勬椂闂村唴锛屽己鍒惰幏鍙栨暟鎹紝澶辫触鍒欑户缁幏鍙栵紝鐩村埌鎴愬姛涓烘 +func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) { + val, ok, qua := eqi.Get() + if ok { + return val, ok, qua + } + + to := time.NewTimer(time.Duration(timeout)*time.Millisecond) + defer to.Stop() + + for { + select { + case <-to.C: + return ElemInfo{},false, 0 + default: + val, ok, qua = eqi.Get() + if ok { + return val, ok, qua + } + time.Sleep(TimePeriodPutOrGet) + } + } + return val, ok, qua +} + +// round 鍒版渶杩戠殑2鐨勫�嶆暟 +func minQuantity(v uint32) uint32 { + v-- + v |= v >> 1 + v |= v >> 2 + v |= v >> 4 + v |= v >> 8 + v |= v >> 16 + v++ + return v +} + diff --git a/shmwrap.go b/shmwrap.go new file mode 100644 index 0000000..87ea7a4 --- /dev/null +++ b/shmwrap.go @@ -0,0 +1,141 @@ +package shmqueue + +import ( + "context" + "fmt" + "time" + "github.com/gen2brain/shm" +) + +// NewBlock shm block with size +func NewBlock(size int, key int) ([]byte, int, error) { + id, err := shm.Get(key, size, shm.IPC_CREAT|0666) + fmt.Println("Get:", id, err) + if err != nil || id == -1 { + return nil, -1, err + } + + data, err2 := shm.At(id, 0, 0) + if err2 != nil { + return nil, -1, err2 + } + + return data, id, nil +} + +// AttachBlock attach exist shm +func AttachBlock(key int) ([]byte, int, error) { + id, err := shm.Get(key, 0, 0) + fmt.Println("Get:", id, err) + if err != nil || id == -1 { //no exist + return nil, -1, err + } + + data, err2 := shm.At(id, 0, 0) + if err2 != nil { + return nil, -1, err2 + } + + return data, id, nil +} + +// ReleaseBlock release shm block +func ReleaseBlock(data []byte) { + if data != nil { + shm.Dt(data) + } +} + +// CreateRawShm create raw shm block with size, only space, no padding, return data([]byte), id(int) +// context for quit +func CreateRawShm(ctx context.Context, size int, key int) ([]byte, int) { + data, id, err := AttachBlock(key) + fmt.Println("err:", err) + if err != nil { + loopB: + for { + select { + case <-ctx.Done(): + return nil, -1 + default: + if err == nil { + break loopB + } else { + fmt.Println("createShm error:", err) + } + time.Sleep(time.Millisecond) + data, id, err = NewBlock(size, key) + } + } + } + return data, id +} + +//AttachRawShm don't create +func AttachRawShm(ctx context.Context, key int) ([]byte, int) { + data, id, err := AttachBlock(key) + if err != nil { + loopB: + for { + select { + case <-ctx.Done(): + return nil, -1 + default: + if err == nil { + break loopB + } else { + fmt.Println("createShm error:", err) + } + time.Sleep(time.Millisecond) + data, id, err = AttachBlock(key) + } + } + } + return data, id +} + +// CreatePaddingShm create padding shm block with size, return data-with-padding([]byte), id(int) +// context for quit, padding for fill raw shm block +func CreatePaddingShm(ctx context.Context, size int, key int, padding []byte) ([]byte, int) { + data, id, err := NewBlock(size, key) + if err != nil { + loopB: + for { + select { + case <-ctx.Done(): + return nil, -1 + default: + if err == nil { + break loopB + } else { + fmt.Println("createShm error:", err) + } + time.Sleep(time.Millisecond) + data, id, err = NewBlock(size, key) + } + } + } + + copy(data, padding) + return data, id +} + +// DestroyShm destroy +func DestroyShm(data []byte) { + ReleaseBlock(data) +} + +// Attach attach shmid get block +func Attach(id int) ([]byte, error) { + return shm.At(id, 0, 0) +} + +// Detach detach shm block +func Detach(d []byte) error { + return shm.Dt(d) +} + +// ReleaseShmID release shmid +func ReleaseShmID(id int) error { + return shm.Rm(id) +} -- Gitblit v1.8.0