package boltcache
|
|
import (
|
"errors"
|
"github.com/boltdb/bolt"
|
"strconv"
|
"time"
|
)
|
|
// dbFileMode is the permission mode applied to the database file. It only
// matters when the file does not exist yet and has to be created.
const dbFileMode = 0600
|
|
var (
|
// Bucket names we perform transactions in
|
dbConf = []byte(confBucket)
|
|
// An error indicating a given key does not exist
|
ErrKeyNotFound = errors.New("not found")
|
ErrLogNotFound = errors.New("log not found")
|
ErrBucketNotFound = errors.New("bucket not found")
|
)
|
|
// BoltStore provides access to BoltDB for Raft to store and retrieve
|
// log entries. It also provides key/value storage, and can be used as
|
// a LogStore and StableStore.
|
type BoltStore struct {
|
// conn is the underlying handle to the db.
|
conn *bolt.DB
|
|
// The path to the Bolt database file
|
path string
|
}
|
|
// Options contains all the configuration used to open the BoltDB
|
type Options struct {
|
// Path is the file path to the BoltDB to use
|
Path string
|
|
// BoltOptions contains any specific BoltDB options you might
|
// want to specify [e.g. open timeout]
|
BoltOptions *bolt.Options
|
|
// NoSync causes the database to skip fsync calls after each
|
// write to the log. This is unsafe, so it should be used
|
// with caution.
|
NoSync bool
|
}
|
|
// readOnly returns true if the contained bolt options say to open
|
// the DB in readOnly mode [this can be useful to tools that want
|
// to examine the log]
|
func (o *Options) readOnly() bool {
|
return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly
|
}
|
|
// NewBoltStore takes a file path and returns a connected Raft backend.
|
func NewBoltStore(path string) (*BoltStore, error) {
|
return New(Options{Path: path})
|
}
|
|
// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend.
|
func New(options Options) (*BoltStore, error) {
|
// Try to connect
|
handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions)
|
if err != nil {
|
return nil, err
|
}
|
handle.NoSync = options.NoSync
|
|
// Create the new store
|
store := &BoltStore{
|
conn: handle,
|
path: options.Path,
|
}
|
|
// If the store was opened read-only, don't try and create buckets
|
if !options.readOnly() {
|
// Set up our buckets
|
if err := store.initialize(); err != nil {
|
store.Close()
|
return nil, err
|
}
|
}
|
return store, nil
|
}
|
|
// initialize is used to set up all of the buckets.
|
func (b *BoltStore) initialize() error {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil {
|
return err
|
}
|
|
return tx.Commit()
|
}
|
|
// Close is used to gracefully close the DB connection.
|
func (b *BoltStore) Close() error {
|
return b.conn.Close()
|
}
|
|
// FirstIndex returns the first known index from the Raft log.
|
func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return 0, err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
return 0, ErrBucketNotFound
|
}
|
|
curs := bucket.Cursor()
|
if first, _ := curs.First(); first == nil {
|
return 0, nil
|
} else {
|
return bytesToUint64(first), nil
|
}
|
}
|
|
// LastIndex returns the last known index from the Raft log.
|
func (b *BoltStore) LastIndex(bucketName string) (uint64, error) {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return 0, err
|
}
|
defer tx.Rollback()
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
return 0, ErrBucketNotFound
|
}
|
curs := bucket.Cursor()
|
if last, _ := curs.Last(); last == nil {
|
return 0, nil
|
} else {
|
return bytesToUint64(last), nil
|
}
|
}
|
|
// GetLog is used to retrieve a log from BoltDB at a given index.
|
func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
return ErrBucketNotFound
|
}
|
val := bucket.Get(uint64ToBytes(idx))
|
|
if val == nil {
|
return ErrLogNotFound
|
}
|
return decodeMsgPack(val, log)
|
}
|
|
func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket([]byte(confBucket))
|
val := bucket.Get(uint64ToBytes(idx))
|
|
if val == nil {
|
return ErrLogNotFound
|
}
|
return decodeMsgPack(val, clog)
|
}
|
|
// StoreLog is used to store a single raft log
|
func (b *BoltStore) StoreLog(bucketName string, log *Log) error {
|
return b.StoreLogs(bucketName, []*Log{log})
|
}
|
|
// StoreLogs is used to store a set of raft logs
|
func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
for _, log := range logs {
|
key := uint64ToBytes(log.Index)
|
val, err := encodeMsgPack(log)
|
if err != nil {
|
return err
|
}
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
bucket, err = tx.CreateBucket([]byte(bucketName))
|
if err != nil {
|
return err
|
}
|
}
|
if err := bucket.Put(key, val.Bytes()); err != nil {
|
return err
|
}
|
}
|
|
return tx.Commit()
|
}
|
|
// DeleteRange is used to delete logs within a given range inclusively.
|
func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error {
|
minKey := uint64ToBytes(min)
|
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
curs := tx.Bucket([]byte(bucketName)).Cursor()
|
for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() {
|
// Handle out-of-range log index
|
if bytesToUint64(k) > max {
|
break
|
}
|
|
// Delete in-range log index
|
if err := curs.Delete(); err != nil {
|
return err
|
}
|
}
|
|
return tx.Commit()
|
}
|
|
func (b *BoltStore) ForEach(f func(v []byte) error) {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return
|
}
|
defer tx.Rollback()
|
|
start, _ := b.FirstIndex(confBucket)
|
end, _ := b.LastIndex(confBucket)
|
|
for ; start <= end; start++ {
|
bucketName := bucketPre + strconv.Itoa(int(start))
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
return
|
}
|
|
bucket.ForEach(func(k, v []byte) error {
|
log := &Log{}
|
err := decodeMsgPack(v, log)
|
f(log.Data)
|
return err
|
})
|
}
|
}
|
|
func (b *BoltStore) Delete(lc *LogCon) error {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
bucket := tx.Bucket([]byte(lc.conf.BucketName))
|
dc := false
|
if bucket != nil {
|
if lc.Log != nil {
|
if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil {
|
return dErr
|
}
|
}
|
|
size := 0
|
bucket.ForEach(func(k, v []byte) error {
|
size++
|
return nil
|
})
|
if size == 0 {
|
dc = true
|
}
|
} else {
|
dc = true
|
}
|
if dc {
|
cb := tx.Bucket([]byte(confBucket))
|
if cb != nil {
|
cbSize := 0
|
cb.ForEach(func(k, v []byte) error {
|
cbSize++
|
return nil
|
})
|
if cbSize > 1 {
|
if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil {
|
return dErr
|
}
|
}
|
}
|
}
|
tx.Commit()
|
return nil
|
}
|
|
func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
bucket := tx.Bucket([]byte(confBucket))
|
if bucket != nil {
|
flag := false
|
bucket.ForEach(func(k, v []byte) error {
|
cLog := &confLog{}
|
if de := decodeMsgPack(v, cLog); de == nil {
|
if cLog.BucketName == bucketName {
|
cLog.UpdateTime = t
|
cLog.Index = int(bytesToUint64(k))
|
val, err := encodeMsgPack(cLog)
|
if err == nil {
|
if bucket.Put(k, val.Bytes()) == nil {
|
flag = true
|
}
|
}
|
}
|
}
|
return nil
|
})
|
if !flag {
|
return errors.New("conf表中未找到" + bucketName)
|
}
|
} else {
|
return ErrBucketNotFound
|
}
|
tx.Commit()
|
return nil
|
}
|
|
func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
|
type kConfLog struct {
|
k []byte
|
log *confLog
|
}
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return nil, nil, err
|
}
|
defer tx.Rollback()
|
bucket := tx.Bucket([]byte(confBucket))
|
var delBucketNames []string
|
var delErrBucketNames []string
|
if bucket != nil {
|
var allCLogs []*kConfLog
|
bucket.ForEach(func(k, v []byte) error {
|
cLog := &confLog{}
|
if e := decodeMsgPack(v, cLog); e == nil {
|
allCLogs = append(allCLogs, &kConfLog{
|
k: k,
|
log: cLog,
|
})
|
}
|
return nil
|
})
|
var leftClogs []*kConfLog
|
if len(allCLogs) > conf.bucketLimit {
|
arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
|
leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
|
for _, a := range arr {
|
if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
|
delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
|
} else {
|
bucket.Delete(a.k)
|
delBucketNames = append(delBucketNames, a.log.BucketName)
|
}
|
}
|
} else {
|
leftClogs = allCLogs
|
}
|
if len(leftClogs) > 1 {
|
leftClogs = leftClogs[:len(leftClogs)-1]
|
for _, a := range leftClogs {
|
if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 {
|
if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
|
delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
|
} else {
|
bucket.Delete(a.k)
|
delBucketNames = append(delBucketNames, a.log.BucketName)
|
}
|
}
|
}
|
}
|
|
} else {
|
return nil, nil, errors.New("bucket not found")
|
}
|
tx.Commit()
|
return delBucketNames, delErrBucketNames, nil
|
}
|
|
// Set is used to set a key/value set outside of the raft log
|
func (b *BoltStore) Set(k, v []byte) error {
|
tx, err := b.conn.Begin(true)
|
if err != nil {
|
return err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket(dbConf)
|
if err := bucket.Put(k, v); err != nil {
|
return err
|
}
|
|
return tx.Commit()
|
}
|
|
func (b *BoltStore) Size(bucketName string) (int, error) {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return 0, err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket([]byte(bucketName))
|
if bucket == nil {
|
return 0, ErrBucketNotFound
|
}
|
size := 0
|
bucket.ForEach(func(k, v []byte) error {
|
size++
|
return nil
|
})
|
return size, nil
|
}
|
|
// Get is used to retrieve a value from the k/v store by key
|
func (b *BoltStore) Get(k []byte) ([]byte, error) {
|
tx, err := b.conn.Begin(false)
|
if err != nil {
|
return nil, err
|
}
|
defer tx.Rollback()
|
|
bucket := tx.Bucket(dbConf)
|
val := bucket.Get(k)
|
|
if val == nil {
|
return nil, ErrKeyNotFound
|
}
|
return append([]byte(nil), val...), nil
|
}
|
|
// SetUint64 is like Set, but handles uint64 values
|
func (b *BoltStore) SetUint64(key []byte, val uint64) error {
|
return b.Set(key, uint64ToBytes(val))
|
}
|
|
// GetUint64 is like Get, but handles uint64 values
|
func (b *BoltStore) GetUint64(key []byte) (uint64, error) {
|
val, err := b.Get(key)
|
if err != nil {
|
return 0, err
|
}
|
return bytesToUint64(val), nil
|
}
|
|
// Sync performs an fsync on the database file handle. This is not necessary
|
// under normal operation unless NoSync is enabled, in which this forces the
|
// database file to sync against the disk.
|
func (b *BoltStore) Sync() error {
|
return b.conn.Sync()
|
}
|