liuxiaolong
2020-11-18 15c091bccbe9ee6e5e72dc93d413a2b67c13579c
first push
9个文件已添加
820 ■■■■■ 已修改文件
api.go 59 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
bolt_store.go 457 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 41 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 8 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.sum 10 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
state.go 21 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
store.go 149 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
store_test.go 38 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util.go 37 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
api.go
New file
@@ -0,0 +1,59 @@
package boltcache
//添加一条日志
func (ls *LogStore) ApplyLog(logData []byte) {
    lastLogIndex := ls.getLastLog()
    ls.applyCh <- &Log {
        Index: lastLogIndex+1,
        Data: logData,
    }
    ls.setLastLog(lastLogIndex + 1)
}
type LogCon struct {
    conf *confLog
    Log *Log
}
//获取缓存的数据
func (lc *LogCon) GetData() []byte {
    return lc.Log.Data
}
//提供给外层使用,按顺序获取缓存的日志
func (ls *LogStore) Get() *LogCon {
    idx, _ := ls.store.FirstIndex(confBucket)
    cLog := &confLog{}
    if err := ls.store.GetConfLog(idx, cLog);err ==nil {
        u, _ := ls.store.FirstIndex(cLog.BucketName)
        log := &Log{}
        if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
            log.Index = u
            cLog.Index = int(idx)
            return &LogCon{
                conf:cLog,
                Log: log,
            }
        } else {
            if size,_ := ls.store.Size(confBucket);size >1 {
                ls.Delete(&LogCon{
                    conf: cLog,
                    Log: nil,
                })
            }
            ls.printLog("Get log err:", err)
        }
    } else {
        ls.printLog("Get conf idx:", idx, "err:", err)
    }
    return nil
}
//提供给外层使用,删除日志
func (ls *LogStore) Delete(lc *LogCon) error {
    return ls.store.Delete(lc)
}
func (ls *LogStore) Close() error {
    return ls.store.Close()
}
bolt_store.go
New file
@@ -0,0 +1,457 @@
package boltcache
import (
    "errors"
    "github.com/boltdb/bolt"
    "time"
)
const (
    // Permissions to use on the db file. This is only used if the
    // database file does not exist and needs to be created.
    dbFileMode = 0600
)
var (
    // Bucket names we perform transactions in
    dbConf = []byte(confBucket)
    // An error indicating a given key does not exist
    ErrKeyNotFound = errors.New("not found")
    ErrLogNotFound = errors.New("log not found")
    ErrBucketNotFound = errors.New("bucket not found")
)
// BoltStore provides access to BoltDB for Raft to store and retrieve
// log entries. It also provides key/value storage, and can be used as
// a LogStore and StableStore.
type BoltStore struct {
    // conn is the underlying handle to the db.
    conn *bolt.DB
    // The path to the Bolt database file
    path string
}
// Options contains all the configuration used to open the BoltDB
type Options struct {
    // Path is the file path to the BoltDB to use
    Path string
    // BoltOptions contains any specific BoltDB options you might
    // want to specify [e.g. open timeout]
    BoltOptions *bolt.Options
    // NoSync causes the database to skip fsync calls after each
    // write to the log. This is unsafe, so it should be used
    // with caution.
    NoSync bool
}
// readOnly returns true if the contained bolt options say to open
// the DB in readOnly mode [this can be useful to tools that want
// to examine the log]
func (o *Options) readOnly() bool {
    return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly
}
// NewBoltStore takes a file path and returns a connected Raft backend.
func NewBoltStore(path string) (*BoltStore, error) {
    return New(Options{Path: path})
}
// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend.
func New(options Options) (*BoltStore, error) {
    // Try to connect
    handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions)
    if err != nil {
        return nil, err
    }
    handle.NoSync = options.NoSync
    // Create the new store
    store := &BoltStore{
        conn: handle,
        path: options.Path,
    }
    // If the store was opened read-only, don't try and create buckets
    if !options.readOnly() {
        // Set up our buckets
        if err := store.initialize(); err != nil {
            store.Close()
            return nil, err
        }
    }
    return store, nil
}
// initialize is used to set up all of the buckets.
func (b *BoltStore) initialize() error {
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil {
        return err
    }
    return tx.Commit()
}
// Close is used to gracefully close the DB connection.
func (b *BoltStore) Close() error {
    return b.conn.Close()
}
// FirstIndex returns the first known index from the Raft log.
func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return 0, err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket ==nil {
        return 0, ErrBucketNotFound
    }
    curs := bucket.Cursor()
    if first, _ := curs.First(); first == nil {
        return 0, nil
    } else {
        return bytesToUint64(first), nil
    }
}
// LastIndex returns the last known index from the Raft log.
func (b *BoltStore) LastIndex(bucketName string) (uint64, error) {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return 0, err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket ==nil {
        return 0, ErrBucketNotFound
    }
    curs := bucket.Cursor()
    if last, _ := curs.Last(); last == nil {
        return 0, nil
    } else {
        return bytesToUint64(last), nil
    }
}
// GetLog is used to retrieve a log from BoltDB at a given index.
func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket == nil {
        return ErrBucketNotFound
    }
    val := bucket.Get(uint64ToBytes(idx))
    if val == nil {
        return ErrLogNotFound
    }
    return decodeMsgPack(val, log)
}
func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(confBucket))
    val := bucket.Get(uint64ToBytes(idx))
    if val == nil {
        return ErrLogNotFound
    }
    return decodeMsgPack(val, clog)
}
// StoreLog is used to store a single raft log
func (b *BoltStore) StoreLog(bucketName string, log *Log) error {
    return b.StoreLogs(bucketName, []*Log{log})
}
// StoreLogs is used to store a set of raft logs
func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error {
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    for _, log := range logs {
        key := uint64ToBytes(log.Index)
        val, err := encodeMsgPack(log)
        if err != nil {
            return err
        }
        bucket := tx.Bucket([]byte(bucketName))
        if bucket == nil {
            bucket,err = tx.CreateBucket([]byte(bucketName))
            if err != nil {
                return err
            }
        }
        if err := bucket.Put(key, val.Bytes()); err != nil {
            return err
        }
    }
    return tx.Commit()
}
// DeleteRange is used to delete logs within a given range inclusively.
func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error {
    minKey := uint64ToBytes(min)
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    curs := tx.Bucket([]byte(bucketName)).Cursor()
    for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() {
        // Handle out-of-range log index
        if bytesToUint64(k) > max {
            break
        }
        // Delete in-range log index
        if err := curs.Delete(); err != nil {
            return err
        }
    }
    return tx.Commit()
}
func (b *BoltStore) Delete(lc *LogCon) error {
    tx,err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(lc.conf.BucketName))
    dc := false
    if bucket != nil {
        if lc.Log != nil {
            if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
                return dErr
            }
        }
        size := 0
        bucket.ForEach(func(k, v []byte) error {
            size++
            return nil
        })
        if size == 0 {
            dc = true
        }
    } else {
        dc = true
    }
    if dc {
        cb := tx.Bucket([]byte(confBucket))
        if cb != nil {
            cbSize := 0
            cb.ForEach(func(k, v []byte) error {
                cbSize++
                return nil
            })
            if cbSize > 1 {
                if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
                    return dErr
                }
            }
        }
    }
    tx.Commit()
    return nil
}
func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
    tx,err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(confBucket))
    if bucket != nil {
        flag := false
        bucket.ForEach(func(k, v []byte) error {
            cLog := &confLog{}
            if de := decodeMsgPack(v, cLog); de == nil {
                if cLog.BucketName == bucketName {
                    cLog.UpdateTime = t
                    cLog.Index = int(bytesToUint64(k))
                    val,err := encodeMsgPack(cLog)
                    if err ==nil {
                        if bucket.Put(k, val.Bytes()) ==nil {
                            flag = true
                        }
                    }
                }
            }
            return nil
        })
        if !flag {
            return errors.New("conf表中未找到"+bucketName)
        }
    } else {
        return ErrBucketNotFound
    }
    tx.Commit()
    return nil
}
func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
    type kConfLog struct {
        k []byte
        log *confLog
    }
    tx,err := b.conn.Begin(true)
    if err != nil {
        return nil, nil, err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(confBucket))
    var delBucketNames []string
    var delErrBucketNames []string
    if bucket != nil {
        var allCLogs []*kConfLog
        bucket.ForEach(func(k, v []byte) error {
            cLog := &confLog{}
            if e :=decodeMsgPack(v, cLog);e ==nil {
                allCLogs = append(allCLogs, &kConfLog{
                    k:k,
                    log:cLog,
                })
            }
            return nil
        })
        var leftClogs []*kConfLog
        if len(allCLogs) > conf.bucketLimit {
            arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
            leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
            for _,a := range arr {
                if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
                    delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
                } else {
                    bucket.Delete(a.k)
                    delBucketNames = append(delBucketNames, a.log.BucketName)
                }
            }
        } else {
            leftClogs = allCLogs
        }
        if len(leftClogs) >1 {
            leftClogs = leftClogs[:len(leftClogs)-1]
            for _,a := range leftClogs {
                if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
                    if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
                        delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
                    } else {
                        bucket.Delete(a.k)
                        delBucketNames = append(delBucketNames, a.log.BucketName)
                    }
                }
            }
        }
    } else {
        return nil, nil, errors.New("bucket not found")
    }
    tx.Commit()
    return delBucketNames, delErrBucketNames, nil
}
// Set is used to set a key/value set outside of the raft log
func (b *BoltStore) Set(k, v []byte) error {
    tx, err := b.conn.Begin(true)
    if err != nil {
        return err
    }
    defer tx.Rollback()
    bucket := tx.Bucket(dbConf)
    if err := bucket.Put(k, v); err != nil {
        return err
    }
    return tx.Commit()
}
func (b *BoltStore) Size(bucketName string) (int,error) {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return 0, err
    }
    defer tx.Rollback()
    bucket := tx.Bucket([]byte(bucketName))
    if bucket == nil {
        return 0, ErrBucketNotFound
    }
    size := 0
    bucket.ForEach(func(k, v []byte) error {
        size++
        return nil
    })
    return size, nil
}
// Get is used to retrieve a value from the k/v store by key
func (b *BoltStore) Get(k []byte) ([]byte, error) {
    tx, err := b.conn.Begin(false)
    if err != nil {
        return nil, err
    }
    defer tx.Rollback()
    bucket := tx.Bucket(dbConf)
    val := bucket.Get(k)
    if val == nil {
        return nil, ErrKeyNotFound
    }
    return append([]byte(nil), val...), nil
}
// SetUint64 is like Set, but handles uint64 values
func (b *BoltStore) SetUint64(key []byte, val uint64) error {
    return b.Set(key, uint64ToBytes(val))
}
// GetUint64 is like Get, but handles uint64 values
func (b *BoltStore) GetUint64(key []byte) (uint64, error) {
    val, err := b.Get(key)
    if err != nil {
        return 0, err
    }
    return bytesToUint64(val), nil
}
// Sync performs an fsync on the database file handle. This is not necessary
// under normal operation unless NoSync is enabled, in which this forces the
// database file to sync against the disk.
func (b *BoltStore) Sync() error {
    return b.conn.Sync()
}
config.go
New file
@@ -0,0 +1,41 @@
package boltcache
import (
    "fmt"
    "time"
)
type Config struct {
    path                string
    capacity            int             //单个桶最多放多少条数据
    bucketLimit         int             //保留的bucket的最大个数
    keepDays            int             //最多保留多少天的数据
    cleanDuration         time.Duration   //清理周期
    logPrint func(v ...interface{})
}
func NewDefaultConfig() *Config {
    return &Config{
        path:          "./push.db",
        capacity:      50,
        bucketLimit:   10,
        keepDays:      1,
        cleanDuration: 10 * time.Second,
        logPrint: func(v ...interface{}) {
            fmt.Println(v...)
        },
    }
}
func NewConfig(path string, capacity int, bucketLimit int, keepDays int,logPrint func(v ...interface{})) *Config {
    return &Config{
        path:          path,
        capacity:      capacity,
        bucketLimit:   bucketLimit,
        keepDays:      keepDays,
        cleanDuration: time.Second * 10,
        logPrint:      logPrint,
    }
}
go.mod
New file
@@ -0,0 +1,8 @@
module boltcache
go 1.14
require (
    github.com/boltdb/bolt v1.3.1
    github.com/hashicorp/go-msgpack v1.1.5
)
go.sum
New file
@@ -0,0 +1,10 @@
github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20190424220101-1e8e1cfdf96b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
state.go
New file
@@ -0,0 +1,21 @@
package boltcache
import "sync"
type storeState struct {
    lastLock sync.Mutex
    lastLogIndex   uint64
}
func (s *storeState) getLastLog() (index uint64) {
    s.lastLock.Lock()
    index = s.lastLogIndex
    s.lastLock.Unlock()
    return
}
func (s *storeState) setLastLog(index uint64) {
    s.lastLock.Lock()
    s.lastLogIndex = index
    s.lastLock.Unlock()
}
store.go
New file
@@ -0,0 +1,149 @@
package boltcache
import (
    "strconv"
    "strings"
    "time"
)
const (
    bucketPre = "bucket-"
    confBucket = "conf"
)
type Log struct {
    Index         uint64
    Data         []byte
}
type confLog struct {
    Index              int           //index
    UpdateTime         time.Time     //创建时间
    BucketName         string        //分桶名称
}
type LogStore struct {
    storeState
    store          *BoltStore
    conf           *Config
    applyCh        chan *Log
    curBucketName  string //当前存储的bucket
    curBucketIndex uint64 //bucket在conf表中的index
}
func (ls *LogStore) printLog(v ...interface{}) {
    if ls.conf.logPrint != nil {
        ls.conf.logPrint(v...)
    }
}
func Init(conf *Config) (*LogStore, error) {
    store, err := NewBoltStore(conf.path)
    if err != nil {
        return nil, err
    }
    ls := &LogStore{
        store: store,
        conf: conf,
        applyCh: make(chan *Log, 1024),
    }
    ls.setCurBucketAndIdx()
    go ls.dealLogs()
    go ls.clean()
    return ls, nil
}
//初始化当前表以及index
func (ls *LogStore) setCurBucketAndIdx() {
    idx, _ := ls.store.LastIndex(confBucket)
    lg := &confLog{}
    err := ls.store.GetConfLog(idx, lg)
    if err == ErrLogNotFound {
        ls.curBucketName = bucketPre+"0"
        ls.setLastLog(0)
        cLog := &confLog{
            UpdateTime: time.Now(),
            BucketName: ls.curBucketName,
            Index:      0,
        }
        val,err := encodeMsgPack(cLog)
        if err != nil {
            ls.printLog("encodeMsgPack err:", err)
        } else {
            ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
        }
    } else {
        ls.curBucketName = lg.BucketName
        ls.curBucketIndex = idx
        ls.printLog("curBucketName:", lg.BucketName)
        lastK, lErr := ls.store.LastIndex(ls.curBucketName)
        if lErr != nil {
            ls.printLog("lErr:", lErr)
            ls.setLastLog(0)
        } else {
            lastLog := &Log{}
            e := ls.store.GetLog(ls.curBucketName, lastK, lastLog)
            if e == ErrLogNotFound || e == ErrBucketNotFound {
                ls.setLastLog(0)
            } else {
                ls.setLastLog(lastLog.Index)
            }
        }
    }
}
func (ls *LogStore) dealLogs() {
    for {
        select {
            case log :=<-ls.applyCh:
                lastLogIndex := ls.getLastLog()
                if lastLogIndex > uint64(ls.conf.capacity) { //当前bucket存储已达上限,需要重新创建一个bucket
                    ls.printLog(ls.curBucketName+"容量达到上限,将新建一个bucket")
                    suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1)
                    num,_ := strconv.Atoi(suffix)
                    newBucketIdx := num+1
                    ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx)
                    ls.setLastLog(0)
                    log.Index = 1
                    ls.curBucketIndex = uint64(newBucketIdx)
                    cLog := &confLog{
                        UpdateTime: time.Now(),
                        BucketName: ls.curBucketName,
                        Index:      newBucketIdx,
                    }
                    val,err := encodeMsgPack(cLog)
                    if err != nil {
                        ls.printLog("encodeMsgPack err:", err)
                    } else {
                        ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
                    }
                }
                err := ls.store.StoreLog(ls.curBucketName, log)
                if err == nil {
                    ls.setLastLog(log.Index)
                    ls.store.UpdateTime(ls.curBucketName, time.Now())
                    ls.printLog(ls.curBucketName + " stored a log,index:", log.Index)
                } else {
                    ls.printLog("store log err:", err, "bucketName:", ls.curBucketName)
                }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
func (ls *LogStore) clean() {
    tk := time.NewTicker(ls.conf.cleanDuration)
    defer tk.Stop()
    for {
        select {
        case <-tk.C:
            ls.printLog("start clean...")
            st := time.Now()
            delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf)
            ls.printLog("clean done!!! 耗时:", time.Since(st), "已清理:",delBuckets, "清理失败:", delErrBuckets , "err:", e)
        default:
            time.Sleep(100 * time.Millisecond)
        }
    }
}
store_test.go
New file
@@ -0,0 +1,38 @@
package boltcache
import (
    "strconv"
    "testing"
    "time"
)
func TestInit(t *testing.T) {
    conf := NewDefaultConfig()
    ls, err := Init(conf)
    if err != nil {
        ls.printLog("Init err:", err)
        return
    }
    defer ls.Close()
    go consume(ls)
    for i:=0; i<10000; i++{
        ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
        time.Sleep(time.Second * 1)
    }
}
func consume(ls *LogStore) {
    for {
        lc := ls.Get()
        if lc != nil {
            ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
            ls.Delete(lc)
        }
        time.Sleep(10 * time.Second)
    }
}
util.go
New file
@@ -0,0 +1,37 @@
package boltcache
import (
    "bytes"
    "encoding/binary"
    "github.com/hashicorp/go-msgpack/codec"
)
// Decode reverses the encode operation on a byte slice input
func decodeMsgPack(buf []byte, out interface{}) error {
    r := bytes.NewBuffer(buf)
    hd := codec.MsgpackHandle{}
    dec := codec.NewDecoder(r, &hd)
    return dec.Decode(out)
}
// Encode writes an encoded object to a new bytes buffer
func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
    buf := bytes.NewBuffer(nil)
    hd := codec.MsgpackHandle{}
    enc := codec.NewEncoder(buf, &hd)
    err := enc.Encode(in)
    return buf, err
}
// Converts bytes to an integer
func bytesToUint64(b []byte) uint64 {
    return binary.BigEndian.Uint64(b)
}
// Converts a uint to a byte slice
func uint64ToBytes(u uint64) []byte {
    buf := make([]byte, 8)
    binary.BigEndian.PutUint64(buf, u)
    return buf
}