chenshijun
2020-03-12 ba4809966f2dd740219eca6c4d9208bd95d21d89
shmqueue lib 多读多写,多协程,多shmkey均测试通过
2个文件已添加
1个文件已修改
510 ■■■■■ 已修改文件
README.md 1 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shmqueue.go 368 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shmwrap.go 141 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
README.md
@@ -2,3 +2,4 @@
system v shm lock-free queue
多读多写,多协程,多shmkey均测试通过
shmqueue.go
New file
@@ -0,0 +1,368 @@
package shmqueue
import (
    "context"
    "fmt"
    "github.com/gen2brain/shm"
    "reflect"
    "runtime"
    "sync/atomic"
    "time"
    "unsafe"
)
const (
    TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
)
//Element info
type ElemInfo struct {
    ShmId int
}
//queue info
type esCache struct {
    putNo uint32
    getNo uint32
    value ElemInfo
}
// lock free queue struct
type esQueue struct {
    capacity uint32
    capMod   uint32
    putPos   uint32
    getPos   uint32
    cache    []esCache
}
type EsQueueInfo struct {
    EsCaches []esCache
    ShmKey int
    ShmID int
    ShmData []byte
    Queue *esQueue
}
// ptr2esCache convert unsafe.Pointer to []esCache
func ptr2esCache(s unsafe.Pointer, size int) []esCache {
    var x reflect.SliceHeader
    x.Len = size
    x.Cap = size
    x.Data = uintptr(s)
    return *(*[]esCache)(unsafe.Pointer(&x))
}
// bytes2shmEsQueue convert []byte to *esQueue
func bytes2shmEsQueue(b []byte) *esQueue {
    return (*esQueue)(unsafe.Pointer(
        (*reflect.SliceHeader)(unsafe.Pointer(&b)).Data,
    ))
}
// NewQueue new and attach a shm queue if no exist, otherwise return exist
func NewQueue(ctx context.Context, key int, cap int) *EsQueueInfo {
    var shmLen uint32
    var datainfo esCache
    var shmstruct esQueue
    var eqi EsQueueInfo
    shmblocks := minQuantity(uint32(cap))
    shmLen = shmblocks*uint32(unsafe.Sizeof(datainfo)) + uint32(unsafe.Sizeof(shmstruct))
    data, shmid := CreateRawShm(ctx, int(shmLen), key)
    fmt.Println("shmid:", shmid)
    fmt.Printf("data:%p\n", data)
    q := bytes2shmEsQueue(data)
    fmt.Printf("q:%p\n", q)
    fmt.Println("esqueue:", q)
    fmt.Printf("q.capacity:%p\n", &q.capacity)
    fmt.Printf("q.capMod:%p\n", &q.capMod)
    fmt.Printf("q.putPos:%p\n", &q.putPos)
    fmt.Printf("q.getPos:%p\n", &q.getPos)
    fmt.Printf("q.cache:%p\n", &q.cache)
    //init parameters
    q.capacity = shmblocks
    q.capMod = q.capacity - 1
    q.putPos = 0
    q.getPos = 0
    fmt.Println("int(q.capacity)",int(q.capacity))
    eqi.EsCaches = ptr2esCache(unsafe.Pointer(&q.cache), int(q.capacity))
    fmt.Printf("EsCaches:%p\n", &eqi.EsCaches)
    fmt.Printf("EsCaches[0]:%p\n", &(eqi.EsCaches[0]))
    fmt.Printf("EsCaches[1]:%p\n", &(eqi.EsCaches[1]))
    fmt.Println("int(q.capacity)",int(q.capacity))
    for i := 0; i < int(q.capacity); i++ {
        fmt.Println("i", i)
        fmt.Printf("EsCaches[%d]:%p\n", i,  &(eqi.EsCaches[i]))
        fmt.Printf("EsCaches[%d].getNo:%p\n", i,  &(eqi.EsCaches[i].getNo))
        fmt.Printf("EsCaches[%d].putNo:%p\n", i,  &(eqi.EsCaches[i].putNo))
        eqi.EsCaches[i].getNo = uint32(i)
        eqi.EsCaches[i].putNo = uint32(i)
        fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
        fmt.Printf("cache.putNo:%p\n",  &(eqi.EsCaches[i].putNo))
        fmt.Println("cache:", eqi.EsCaches[i])
    }
    cache := &eqi.EsCaches[0]
    cache.getNo = q.capacity
    cache.putNo = q.capacity
    fmt.Println("cache:", cache)
    cache0 := eqi.EsCaches[0]
    fmt.Printf("cache0:%p\n",   &(cache0))
    fmt.Printf("cache0.getNo:%p\n", &(cache0.getNo))
    fmt.Printf("cache0.putNo:%p\n", &(cache0.putNo))
    fmt.Println("cache0:", cache0)
    cache1 := eqi.EsCaches[1]
    fmt.Printf("cache1:%p\n",   &(cache1))
    fmt.Printf("cache1.getNo:%p\n", &(cache1.getNo))
    fmt.Printf("cache1.putNo:%p\n", &(cache1.putNo))
    fmt.Println("cache1:", cache1)
    fmt.Println("EsCaches:", eqi.EsCaches)
    eqi.ShmData = data
    eqi.ShmID = shmid
    eqi.ShmKey = key
    eqi.Queue = q
    time.Sleep(10*time.Second)
    return &eqi
}
// AttachQueue attach an exist shm queue
func AttachQueue(ctx context.Context, key int) *EsQueueInfo {
    var eqi EsQueueInfo
    data, shmid := AttachRawShm(ctx, key)
    fmt.Println("shmid:", shmid)
    shmdata := bytes2shmEsQueue(data)
    fmt.Println("shmdata:", shmdata)
    eqi.EsCaches = ptr2esCache(unsafe.Pointer(&shmdata.cache), int(shmdata.capacity))
    //fmt.Println("EsCaches:", eqi.EsCaches)
    eqi.Queue = shmdata
    eqi.ShmKey = key
    eqi.ShmID = shmid
    eqi.ShmData = data
    return &eqi
}
// ReleaseQueue detach an exist shm queue
func (eqi *EsQueueInfo) ReleaseQueue() {
    fmt.Println("ReleaseQueue: key", eqi.ShmKey)
    DestroyShm(eqi.ShmData)
}
// RemoveShmId remove an exist shm queue (ipcrm -m shmid)
func (eqi *EsQueueInfo) RemoveShmId() error {
    return shm.Rm(eqi.ShmID)
}
// Status status of shm queue
func (eqi *EsQueueInfo) Status() string {
    q := eqi.Queue
    getPos := atomic.LoadUint32(&q.getPos)
    putPos := atomic.LoadUint32(&q.putPos)
    return fmt.Sprintf("Queue{Capacity: %v, putPos: %v, getPos: %v, size:%v, cache: %v}",
        eqi.QueueCapacity(), putPos, getPos, eqi.QueueSize(), eqi.EsCaches)
}
// Capacity queue 容量,总大小
func (eqi *EsQueueInfo) QueueCapacity() int {
    return int(eqi.Queue.capMod)
}
// Size queue 实际大小
func (eqi *EsQueueInfo) QueueSize() int {
    var putPos, getPos uint32
    var quantity uint32
    getPos = atomic.LoadUint32(&eqi.Queue.getPos)
    putPos = atomic.LoadUint32(&eqi.Queue.putPos)
    if putPos >= getPos {
        quantity = putPos - getPos
    } else {
        //quantity = q.capMod + (putPos - getPos)
        quantity = (eqi.Queue.capMod + (putPos - getPos)) % eqi.Queue.capMod
    }
    return int(quantity)
}
// Put 单次写入,可能失败
func (eqi *EsQueueInfo) Put(val ElemInfo) (bool, int) {
    var putPos, putPosNew, getPos, posCnt uint32
    var cache *esCache
    capMod := eqi.Queue.capMod
    getPos = atomic.LoadUint32(&eqi.Queue.getPos)
    putPos = atomic.LoadUint32(&eqi.Queue.putPos)
    if putPos >= getPos {
        posCnt = putPos - getPos
    } else {
        //posCnt = capMod + (putPos - getPos)
        posCnt = (capMod + (putPos - getPos)) % capMod
    }
    //todo
    //if posCnt >= capMod-1 {
    if posCnt >= capMod {
        runtime.Gosched()
        return false, int(posCnt)
    }
    putPosNew = putPos + 1
    if !atomic.CompareAndSwapUint32(&eqi.Queue.putPos, putPos, putPosNew) {
        fmt.Println("CompareAndSwapUint32 error")
        runtime.Gosched()
        return false, int(posCnt)
    }
    cache = &(eqi.EsCaches[putPosNew&capMod])
    for {
        getNo := atomic.LoadUint32(&cache.getNo)
        putNo := atomic.LoadUint32(&cache.putNo)
        if putPosNew == putNo && getNo == putNo {
            cache.value = val
            atomic.AddUint32(&cache.putNo, eqi.Queue.capacity)
            return true, int(posCnt + 1)
        } else {
            runtime.Gosched()
        }
    }
}
//PutForce 强制写入,失败则继续重写,直到成功为止
func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
    ok, qua := eqi.Put(val)
    for !ok {
        time.Sleep(TimePeriodPutOrGet)
        ok, qua = eqi.Put(val)
    }
    return qua
}
//PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出
func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
    ok, qua := eqi.Put(val)
    if ok {
        return ok, qua
    }
    to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
    defer to.Stop()
    for {
        select {
        case <-to.C:
            return false, 0
        default:
            ok, qua = eqi.Put(val)
            if ok {
                return ok, qua
            }
            time.Sleep(TimePeriodPutOrGet)
        }
    }
    return ok, qua
}
// Get 单次获取,可能失败
func (eqi *EsQueueInfo) Get() (ElemInfo, bool, int) {
    var putPos, getPos, getPosNew, posCnt uint32
    var cache *esCache
    capMod := eqi.Queue.capMod
    putPos = atomic.LoadUint32(&eqi.Queue.putPos)
    getPos = atomic.LoadUint32(&eqi.Queue.getPos)
    if putPos >= getPos {
        posCnt = putPos - getPos
    } else {
        posCnt = capMod + (putPos - getPos)
    }
    if posCnt < 1 {
        runtime.Gosched()
        return ElemInfo{}, false, int(posCnt)
    }
    getPosNew = getPos + 1
    if !atomic.CompareAndSwapUint32(&eqi.Queue.getPos, getPos, getPosNew) {
        runtime.Gosched()
        return ElemInfo{}, false, int(posCnt)
    }
    cache = &(eqi.EsCaches[getPosNew&capMod])
    for {
        getNo := atomic.LoadUint32(&cache.getNo)
        putNo := atomic.LoadUint32(&cache.putNo)
        if getPosNew == getNo && getNo == putNo-eqi.Queue.capacity {
            val := cache.value
            cache.value = ElemInfo{ShmId:0}
            atomic.AddUint32(&cache.getNo, eqi.Queue.capacity)
            return val, true, int(posCnt - 1)
        } else {
            runtime.Gosched()
        }
    }
}
//GetForce 强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
    val, ok, qua := eqi.Get()
    for !ok {
        time.Sleep(TimePeriodPutOrGet)
        val, ok, qua = eqi.Get()
    }
    return val, qua
}
//GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
    val, ok, qua := eqi.Get()
    if ok {
        return val, ok, qua
    }
    to := time.NewTimer(time.Duration(timeout)*time.Millisecond)
    defer to.Stop()
    for {
        select {
        case <-to.C:
            return ElemInfo{},false, 0
        default:
            val, ok, qua = eqi.Get()
            if ok {
                return val, ok, qua
            }
            time.Sleep(TimePeriodPutOrGet)
        }
    }
    return val, ok, qua
}
// round 到最近的2的倍数
func minQuantity(v uint32) uint32 {
    v--
    v |= v >> 1
    v |= v >> 2
    v |= v >> 4
    v |= v >> 8
    v |= v >> 16
    v++
    return v
}
shmwrap.go
New file
@@ -0,0 +1,141 @@
package shmqueue
import (
    "context"
    "fmt"
    "time"
    "github.com/gen2brain/shm"
)
// NewBlock shm block with size
func NewBlock(size int, key int) ([]byte, int, error) {
    id, err := shm.Get(key, size, shm.IPC_CREAT|0666)
    fmt.Println("Get:", id, err)
    if err != nil || id == -1 {
        return nil, -1, err
    }
    data, err2 := shm.At(id, 0, 0)
    if err2 != nil {
        return nil, -1, err2
    }
    return data, id, nil
}
// AttachBlock attach exist shm
func AttachBlock(key int) ([]byte, int, error) {
    id, err := shm.Get(key, 0, 0)
    fmt.Println("Get:", id, err)
    if err != nil || id == -1 { //no exist
        return nil, -1, err
    }
    data, err2 := shm.At(id, 0, 0)
    if err2 != nil {
        return nil, -1, err2
    }
    return data, id, nil
}
// ReleaseBlock release shm block
func ReleaseBlock(data []byte) {
    if data != nil {
        shm.Dt(data)
    }
}
// CreateRawShm create raw shm block with size, only space, no padding, return data([]byte), id(int)
// context for quit
func CreateRawShm(ctx context.Context, size int, key int) ([]byte, int) {
    data, id, err := AttachBlock(key)
    fmt.Println("err:", err)
    if err != nil {
    loopB:
        for {
            select {
            case <-ctx.Done():
                return nil, -1
            default:
                if err == nil {
                    break loopB
                } else {
                    fmt.Println("createShm error:", err)
                }
                time.Sleep(time.Millisecond)
                data, id, err = NewBlock(size, key)
            }
        }
    }
    return data, id
}
//AttachRawShm don't create
func AttachRawShm(ctx context.Context, key int) ([]byte, int) {
    data, id, err := AttachBlock(key)
    if err != nil {
    loopB:
        for {
            select {
            case <-ctx.Done():
                return nil, -1
            default:
                if err == nil {
                    break loopB
                } else {
                    fmt.Println("createShm error:", err)
                }
                time.Sleep(time.Millisecond)
                data, id, err = AttachBlock(key)
            }
        }
    }
    return data, id
}
// CreatePaddingShm create padding shm block with size, return data-with-padding([]byte), id(int)
// context for quit, padding for fill raw shm block
func CreatePaddingShm(ctx context.Context, size int, key int, padding []byte) ([]byte, int) {
    data, id, err := NewBlock(size, key)
    if err != nil {
    loopB:
        for {
            select {
            case <-ctx.Done():
                return nil, -1
            default:
                if err == nil {
                    break loopB
                } else {
                    fmt.Println("createShm error:", err)
                }
                time.Sleep(time.Millisecond)
                data, id, err = NewBlock(size, key)
            }
        }
    }
    copy(data, padding)
    return data, id
}
// DestroyShm destroy
func DestroyShm(data []byte) {
    ReleaseBlock(data)
}
// Attach attach shmid get block
func Attach(id int) ([]byte, error) {
   return shm.At(id, 0, 0)
}
// Detach detach shm block
func Detach(d []byte) error {
   return shm.Dt(d)
}
// ReleaseShmID release shmid
func ReleaseShmID(id int) error {
   return shm.Rm(id)
}