zhangzengfei
2021-11-12 33cc2473a3d6ca941717c557f8e72b9904fbdc4f
add foreach, size api
3个文件已修改
71 ■■■■■ 已修改文件
api.go 24 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bolt_store.go 27 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
store_test.go 20 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api.go
@@ -1,5 +1,7 @@
package boltcache
import "strconv"
//添加一条日志
func (ls *LogStore) ApplyLog(logData []byte) {
    lastLogIndex := ls.getLastLog()
@@ -49,6 +51,28 @@
    return nil
}
// 遍历数据, 输出原始数据
func (ls *LogStore) ForEach(f func(v []byte) error) {
    ls.store.ForEach(f)
}
func (ls *LogStore) Size() int {
    start, _ := ls.store.FirstIndex(confBucket)
    end, _ := ls.store.LastIndex(confBucket)
    totalSize := 0
    for ; start <= end; start++ {
        bucketName := bucketPre + strconv.Itoa(int(start))
        bucketSize, err := ls.store.Size(bucketName)
        if err == nil {
            totalSize = totalSize + bucketSize
        }
    }
    return totalSize
}
//提供给外层使用,删除日志
func (ls *LogStore) Delete(lc *LogCon) error {
    return ls.store.Delete(lc)
bolt_store.go
@@ -3,6 +3,7 @@
import (
    "errors"
    "github.com/boltdb/bolt"
    "strconv"
    "time"
)
@@ -242,6 +243,32 @@
    return tx.Commit()
}
func (b *BoltStore) ForEach(f func(v []byte) error) {
    tx, err := b.conn.Begin(true)
    if err != nil {
        return
    }
    defer tx.Rollback()
    start, _ := b.FirstIndex(confBucket)
    end, _ := b.LastIndex(confBucket)
    for ; start <= end; start++ {
        bucketName := bucketPre + strconv.Itoa(int(start))
        bucket := tx.Bucket([]byte(bucketName))
        if bucket == nil {
            return
        }
        bucket.ForEach(func(k, v []byte) error {
            log := &Log{}
            err := decodeMsgPack(v, log)
            f(log.Data)
            return err
        })
    }
}
func (b *BoltStore) Delete(lc *LogCon) error {
    tx,err := b.conn.Begin(true)
    if err != nil {
store_test.go
@@ -16,13 +16,21 @@
    defer ls.Close()
    go consume(ls)
    for i:=0; i<10000; i++{
    //go consume(ls)
    //
    for i := 0; i < 100; i++ {
        ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
        time.Sleep(time.Second * 1)
        time.Sleep(time.Millisecond * 100)
    }
    ls.printLog("Size = ", ls.Size())
    ls.ForEach(func(v []byte) error {
        ls.printLog("val=:", string(v))
        return nil
    })
    consume(ls)
}
func consume(ls *LogStore) {
@@ -31,8 +39,10 @@
        if lc != nil {
            ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
            ls.Delete(lc)
        } else {
            return
        }
        time.Sleep(10 * time.Second)
        time.Sleep(10 * time.Millisecond)
    }
}