zhangzengfei
2021-11-12 33cc2473a3d6ca941717c557f8e72b9904fbdc4f
add foreach, size api
3个文件已修改
149 ■■■■ 已修改文件
api.go 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bolt_store.go 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
store_test.go 26 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api.go
@@ -1,18 +1,20 @@
package boltcache
import "strconv"
//添加一条日志
func (ls *LogStore) ApplyLog(logData []byte) {
    lastLogIndex := ls.getLastLog()
    ls.applyCh <- &Log {
        Index: lastLogIndex+1,
        Data: logData,
    ls.applyCh <- &Log{
        Index: lastLogIndex + 1,
        Data:  logData,
    }
    ls.setLastLog(lastLogIndex + 1)
}
type LogCon struct {
    conf *confLog
    Log *Log
    Log  *Log
}
//获取缓存的数据
@@ -24,21 +26,21 @@
func (ls *LogStore) Get() *LogCon {
    idx, _ := ls.store.FirstIndex(confBucket)
    cLog := &confLog{}
    if err := ls.store.GetConfLog(idx, cLog);err ==nil {
    if err := ls.store.GetConfLog(idx, cLog); err == nil {
        u, _ := ls.store.FirstIndex(cLog.BucketName)
        log := &Log{}
        if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
        if err = ls.store.GetLog(cLog.BucketName, u, log); err == nil {
            log.Index = u
            cLog.Index = int(idx)
            return &LogCon{
                conf:cLog,
                Log: log,
                conf: cLog,
                Log:  log,
            }
        } else {
            if size,_ := ls.store.Size(confBucket);size >1 {
            if size, _ := ls.store.Size(confBucket); size > 1 {
                ls.Delete(&LogCon{
                    conf: cLog,
                    Log: nil,
                    Log:  nil,
                })
            }
            ls.printLog("Get log err:", err)
@@ -49,6 +51,28 @@
    return nil
}
// 遍历数据, 输出原始数据
func (ls *LogStore) ForEach(f func(v []byte) error) {
    ls.store.ForEach(f)
}
func (ls *LogStore) Size() int {
    start, _ := ls.store.FirstIndex(confBucket)
    end, _ := ls.store.LastIndex(confBucket)
    totalSize := 0
    for ; start <= end; start++ {
        bucketName := bucketPre + strconv.Itoa(int(start))
        bucketSize, err := ls.store.Size(bucketName)
        if err == nil {
            totalSize = totalSize + bucketSize
        }
    }
    return totalSize
}
//提供给外层使用,删除日志
func (ls *LogStore) Delete(lc *LogCon) error {
    return ls.store.Delete(lc)
@@ -56,4 +80,4 @@
func (ls *LogStore) Close() error {
    return ls.store.Close()
}
}
bolt_store.go
@@ -3,6 +3,7 @@
import (
    "errors"
    "github.com/boltdb/bolt"
    "strconv"
    "time"
)
@@ -17,8 +18,8 @@
    dbConf = []byte(confBucket)
    // An error indicating a given key does not exist
    ErrKeyNotFound = errors.New("not found")
    ErrLogNotFound = errors.New("log not found")
    ErrKeyNotFound    = errors.New("not found")
    ErrLogNotFound    = errors.New("log not found")
    ErrBucketNotFound = errors.New("bucket not found")
)
@@ -115,7 +116,7 @@
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket ==nil {
    if bucket == nil {
        return 0, ErrBucketNotFound
    }
@@ -135,7 +136,7 @@
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket ==nil {
    if bucket == nil {
        return 0, ErrBucketNotFound
    }
    curs := bucket.Cursor()
@@ -203,7 +204,7 @@
        }
        bucket := tx.Bucket([]byte(bucketName))
        if bucket == nil {
            bucket,err = tx.CreateBucket([]byte(bucketName))
            bucket, err = tx.CreateBucket([]byte(bucketName))
            if err != nil {
                return err
            }
@@ -242,8 +243,34 @@
    return tx.Commit()
}
func (b *BoltStore) ForEach(f func(v []byte) error) {
    tx, err := b.conn.Begin(true)
    if err != nil {
        return
    }
    defer tx.Rollback()
    start, _ := b.FirstIndex(confBucket)
    end, _ := b.LastIndex(confBucket)
    for ; start <= end; start++ {
        bucketName := bucketPre + strconv.Itoa(int(start))
        bucket := tx.Bucket([]byte(bucketName))
        if bucket == nil {
            return
        }
        bucket.ForEach(func(k, v []byte) error {
            log := &Log{}
            err := decodeMsgPack(v, log)
            f(log.Data)
            return err
        })
    }
}
func (b *BoltStore) Delete(lc *LogCon) error {
    tx,err := b.conn.Begin(true)
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
@@ -252,7 +279,7 @@
    dc := false
    if bucket != nil {
        if lc.Log != nil {
            if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
            if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil {
                return dErr
            }
        }
@@ -277,7 +304,7 @@
                return nil
            })
            if cbSize > 1 {
                if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
                if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil {
                    return dErr
                }
            }
@@ -288,7 +315,7 @@
}
func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
    tx,err := b.conn.Begin(true)
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
@@ -302,9 +329,9 @@
                if cLog.BucketName == bucketName {
                    cLog.UpdateTime = t
                    cLog.Index = int(bytesToUint64(k))
                    val,err := encodeMsgPack(cLog)
                    if err ==nil {
                        if bucket.Put(k, val.Bytes()) ==nil {
                    val, err := encodeMsgPack(cLog)
                    if err == nil {
                        if bucket.Put(k, val.Bytes()) == nil {
                            flag = true
                        }
                    }
@@ -313,7 +340,7 @@
            return nil
        })
        if !flag {
            return errors.New("conf表中未找到"+bucketName)
            return errors.New("conf表中未找到" + bucketName)
        }
    } else {
        return ErrBucketNotFound
@@ -324,10 +351,10 @@
func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
    type kConfLog struct {
        k []byte
        k   []byte
        log *confLog
    }
    tx,err := b.conn.Begin(true)
    tx, err := b.conn.Begin(true)
    if err != nil {
        return nil, nil, err
    }
@@ -339,10 +366,10 @@
        var allCLogs []*kConfLog
        bucket.ForEach(func(k, v []byte) error {
            cLog := &confLog{}
            if e :=decodeMsgPack(v, cLog);e ==nil {
            if e := decodeMsgPack(v, cLog); e == nil {
                allCLogs = append(allCLogs, &kConfLog{
                    k:k,
                    log:cLog,
                    k:   k,
                    log: cLog,
                })
            }
            return nil
@@ -351,8 +378,8 @@
        if len(allCLogs) > conf.bucketLimit {
            arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
            leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
            for _,a := range arr {
                if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
            for _, a := range arr {
                if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
                    delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
                } else {
                    bucket.Delete(a.k)
@@ -362,11 +389,11 @@
        } else {
            leftClogs = allCLogs
        }
        if len(leftClogs) >1 {
        if len(leftClogs) > 1 {
            leftClogs = leftClogs[:len(leftClogs)-1]
            for _,a := range leftClogs {
                if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
                    if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
            for _, a := range leftClogs {
                if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 {
                    if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
                        delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
                    } else {
                        bucket.Delete(a.k)
@@ -399,7 +426,7 @@
    return tx.Commit()
}
func (b *BoltStore) Size(bucketName string) (int,error) {
func (b *BoltStore) Size(bucketName string) (int, error) {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return 0, err
store_test.go
@@ -16,23 +16,33 @@
    defer ls.Close()
    go consume(ls)
    for i:=0; i<10000; i++{
        ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
        time.Sleep(time.Second * 1)
    //go consume(ls)
    //
    for i := 0; i < 100; i++ {
        ls.ApplyLog([]byte("hello world " + strconv.Itoa(i)))
        time.Sleep(time.Millisecond * 100)
    }
    ls.printLog("Size = ", ls.Size())
    ls.ForEach(func(v []byte) error {
        ls.printLog("val=:", string(v))
        return nil
    })
    consume(ls)
}
func consume(ls *LogStore) {
    for {
        lc := ls.Get()
        if lc != nil {
            ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
            ls.printLog(lc.conf.BucketName, " send old log:", string(lc.Log.Data))
            ls.Delete(lc)
        } else {
            return
        }
        time.Sleep(10 * time.Second)
        time.Sleep(10 * time.Millisecond)
    }
}
}