chenshijun
2020-03-28 91f7d13bdf60a2df6bedff6467f550c7c643f3fc
给强制写入和读取增加context资源管理
2个文件已修改
57 ■■■■ 已修改文件
shmqueue.go 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shmwrap.go 11 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
shmqueue.go
@@ -12,7 +12,7 @@
)
const (
    TimePeriodPutOrGet = time.Duration(10) * time.Microsecond
    TimePeriodPutOrGet = time.Duration(1) * time.Microsecond
)
//Element info
@@ -218,18 +218,29 @@
}
//PutForce 强制写入,失败则继续重写,直到成功为止
func (eqi *EsQueueInfo) PutForce(val ElemInfo) int {
func (eqi *EsQueueInfo) PutForce(ctx context.Context, val ElemInfo) int {
    ok, qua := eqi.Put(val)
    for !ok {
        time.Sleep(TimePeriodPutOrGet)
    loopB:
        for {
            select {
            case <-ctx.Done():
                return 0
            default:
        ok, qua = eqi.Put(val)
                if ok {
                    break loopB
                }
                time.Sleep(TimePeriodPutOrGet)
            }
        }
    }
    return qua
}
//PutForceWithinTime 在timeout ms的时间内,强制写入,失败则继续重写,直到成功为止或者超时退出
func (eqi *EsQueueInfo) PutForceWithinTime(val ElemInfo, timeout int) (bool, int) {
func (eqi *EsQueueInfo) PutForceWithinTime(ctx context.Context, val ElemInfo, timeout int) (bool, int) {
    ok, qua := eqi.Put(val)
    if ok {
        return ok, qua
@@ -238,14 +249,17 @@
    to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
    defer to.Stop()
    loopB:
    for {
        select {
            case <-ctx.Done():
                return false, 0
        case <-to.C:
            return false, 0
        default:
            ok, qua = eqi.Put(val)
            if ok {
                return ok, qua
                    break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
        }
@@ -296,18 +310,29 @@
}
//GetForce 强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForce() (ElemInfo, int) {
func (eqi *EsQueueInfo) GetForce(ctx context.Context) (ElemInfo, int) {
    val, ok, qua := eqi.Get()
    for !ok {
        time.Sleep(TimePeriodPutOrGet)
    loopB:
        for {
            select {
            case <-ctx.Done():
                return ElemInfo{}, 0
            default:
        val, ok, qua = eqi.Get()
                if ok {
                    break loopB
                }
                time.Sleep(TimePeriodPutOrGet)
            }
        }
    }
    return val, qua
}
//GetForceWithinTime 在timeout ms的时间内,强制获取数据,失败则继续获取,直到成功为止
func (eqi *EsQueueInfo) GetForceWithinTime(timeout int) (ElemInfo, bool, int) {
func (eqi *EsQueueInfo) GetForceWithinTime(ctx context.Context, timeout int) (ElemInfo, bool, int) {
    val, ok, qua := eqi.Get()
    if ok {
        return val, ok, qua
@@ -316,14 +341,17 @@
    to := time.NewTimer(time.Duration(timeout) * time.Millisecond)
    defer to.Stop()
    loopB:
    for {
        select {
            case <-ctx.Done():
                return ElemInfo{}, false, 0
        case <-to.C:
            return ElemInfo{}, false, 0
        default:
            val, ok, qua = eqi.Get()
            if ok {
                return val, ok, qua
                    break loopB
            }
            time.Sleep(TimePeriodPutOrGet)
        }
shmwrap.go
@@ -10,13 +10,14 @@
// NewBlock shm block with size
func NewBlock(size int, key int) ([]byte, int, error) {
    id, err := shm.Get(key, size, shm.IPC_CREAT|0666)
    fmt.Println("Get:", id, err)
    if err != nil || id == -1 {
        fmt.Println("NewBlock Get:", key, err)
        return nil, -1, err
    }
    data, err2 := shm.At(id, 0, 0)
    if err2 != nil {
        fmt.Println("NewBlock At:", key, err2)
        return nil, -1, err2
    }
@@ -26,13 +27,14 @@
// AttachBlock attach exist shm
func AttachBlock(key int) ([]byte, int, error) {
    id, err := shm.Get(key, 0, 0)
    fmt.Println("Get:", id, err)
    if err != nil || id == -1 { //no exist
        fmt.Println("AttachBlock Get:", key, err)
        return nil, -1, err
    }
    data, err2 := shm.At(id, 0, 0)
    if err2 != nil {
        fmt.Println("AttachBlock At:", key, err2)
        return nil, -1, err2
    }
@@ -50,7 +52,6 @@
// context for quit
func CreateRawShm(ctx context.Context, size int, key int) ([]byte, int) {
    data, id, err := AttachBlock(key)
    fmt.Println("err:", err)
    if err != nil {
    loopB:
        for {
@@ -60,8 +61,6 @@
            default:
                if err == nil {
                    break loopB
                } else {
                    fmt.Println("createShm error:", err)
                }
                time.Sleep(time.Millisecond)
                data, id, err = NewBlock(size, key)
@@ -83,8 +82,6 @@
            default:
                if err == nil {
                    break loopB
                } else {
                    fmt.Println("createShm error:", err)
                }
                time.Sleep(time.Millisecond)
                data, id, err = AttachBlock(key)