New file |
| | |
| | | package boltcache |
| | | |
| | | import ( |
| | | "errors" |
| | | "github.com/boltdb/bolt" |
| | | "time" |
| | | ) |
| | | |
// dbFileMode is the permission mode handed to bolt.Open. It only takes
// effect when the database file is missing and has to be created.
const dbFileMode = 0600
| | | |
| | | var ( |
| | | // Bucket names we perform transactions in |
| | | dbConf = []byte(confBucket) |
| | | |
| | | // An error indicating a given key does not exist |
| | | ErrKeyNotFound = errors.New("not found") |
| | | ErrLogNotFound = errors.New("log not found") |
| | | ErrBucketNotFound = errors.New("bucket not found") |
| | | ) |
| | | |
| | | // BoltStore provides access to BoltDB for Raft to store and retrieve |
| | | // log entries. It also provides key/value storage, and can be used as |
| | | // a LogStore and StableStore. |
| | | type BoltStore struct { |
| | | // conn is the underlying handle to the db. |
| | | conn *bolt.DB |
| | | |
| | | // The path to the Bolt database file |
| | | path string |
| | | } |
| | | |
| | | // Options contains all the configuration used to open the BoltDB |
| | | type Options struct { |
| | | // Path is the file path to the BoltDB to use |
| | | Path string |
| | | |
| | | // BoltOptions contains any specific BoltDB options you might |
| | | // want to specify [e.g. open timeout] |
| | | BoltOptions *bolt.Options |
| | | |
| | | // NoSync causes the database to skip fsync calls after each |
| | | // write to the log. This is unsafe, so it should be used |
| | | // with caution. |
| | | NoSync bool |
| | | } |
| | | |
| | | // readOnly returns true if the contained bolt options say to open |
| | | // the DB in readOnly mode [this can be useful to tools that want |
| | | // to examine the log] |
| | | func (o *Options) readOnly() bool { |
| | | return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly |
| | | } |
| | | |
| | | // NewBoltStore takes a file path and returns a connected Raft backend. |
| | | func NewBoltStore(path string) (*BoltStore, error) { |
| | | return New(Options{Path: path}) |
| | | } |
| | | |
| | | // New uses the supplied options to open the BoltDB and prepare it for use as a raft backend. |
| | | func New(options Options) (*BoltStore, error) { |
| | | // Try to connect |
| | | handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | handle.NoSync = options.NoSync |
| | | |
| | | // Create the new store |
| | | store := &BoltStore{ |
| | | conn: handle, |
| | | path: options.Path, |
| | | } |
| | | |
| | | // If the store was opened read-only, don't try and create buckets |
| | | if !options.readOnly() { |
| | | // Set up our buckets |
| | | if err := store.initialize(); err != nil { |
| | | store.Close() |
| | | return nil, err |
| | | } |
| | | } |
| | | return store, nil |
| | | } |
| | | |
| | | // initialize is used to set up all of the buckets. |
| | | func (b *BoltStore) initialize() error { |
| | | tx, err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil { |
| | | return err |
| | | } |
| | | |
| | | return tx.Commit() |
| | | } |
| | | |
| | | // Close is used to gracefully close the DB connection. |
| | | func (b *BoltStore) Close() error { |
| | | return b.conn.Close() |
| | | } |
| | | |
| | | // FirstIndex returns the first known index from the Raft log. |
| | | func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return 0, err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket ==nil { |
| | | return 0, ErrBucketNotFound |
| | | } |
| | | |
| | | curs := bucket.Cursor() |
| | | if first, _ := curs.First(); first == nil { |
| | | return 0, nil |
| | | } else { |
| | | return bytesToUint64(first), nil |
| | | } |
| | | } |
| | | |
| | | // LastIndex returns the last known index from the Raft log. |
| | | func (b *BoltStore) LastIndex(bucketName string) (uint64, error) { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return 0, err |
| | | } |
| | | defer tx.Rollback() |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket ==nil { |
| | | return 0, ErrBucketNotFound |
| | | } |
| | | curs := bucket.Cursor() |
| | | if last, _ := curs.Last(); last == nil { |
| | | return 0, nil |
| | | } else { |
| | | return bytesToUint64(last), nil |
| | | } |
| | | } |
| | | |
| | | // GetLog is used to retrieve a log from BoltDB at a given index. |
| | | func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket == nil { |
| | | return ErrBucketNotFound |
| | | } |
| | | val := bucket.Get(uint64ToBytes(idx)) |
| | | |
| | | if val == nil { |
| | | return ErrLogNotFound |
| | | } |
| | | return decodeMsgPack(val, log) |
| | | } |
| | | |
| | | func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket([]byte(confBucket)) |
| | | val := bucket.Get(uint64ToBytes(idx)) |
| | | |
| | | if val == nil { |
| | | return ErrLogNotFound |
| | | } |
| | | return decodeMsgPack(val, clog) |
| | | } |
| | | |
| | | // StoreLog is used to store a single raft log |
| | | func (b *BoltStore) StoreLog(bucketName string, log *Log) error { |
| | | return b.StoreLogs(bucketName, []*Log{log}) |
| | | } |
| | | |
| | | // StoreLogs is used to store a set of raft logs |
| | | func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error { |
| | | tx, err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | for _, log := range logs { |
| | | key := uint64ToBytes(log.Index) |
| | | val, err := encodeMsgPack(log) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket == nil { |
| | | bucket,err = tx.CreateBucket([]byte(bucketName)) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | } |
| | | if err := bucket.Put(key, val.Bytes()); err != nil { |
| | | return err |
| | | } |
| | | } |
| | | |
| | | return tx.Commit() |
| | | } |
| | | |
| | | // DeleteRange is used to delete logs within a given range inclusively. |
| | | func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error { |
| | | minKey := uint64ToBytes(min) |
| | | |
| | | tx, err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | curs := tx.Bucket([]byte(bucketName)).Cursor() |
| | | for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() { |
| | | // Handle out-of-range log index |
| | | if bytesToUint64(k) > max { |
| | | break |
| | | } |
| | | |
| | | // Delete in-range log index |
| | | if err := curs.Delete(); err != nil { |
| | | return err |
| | | } |
| | | } |
| | | |
| | | return tx.Commit() |
| | | } |
| | | |
| | | func (b *BoltStore) Delete(lc *LogCon) error { |
| | | tx,err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | bucket := tx.Bucket([]byte(lc.conf.BucketName)) |
| | | dc := false |
| | | if bucket != nil { |
| | | if lc.Log != nil { |
| | | if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil { |
| | | return dErr |
| | | } |
| | | } |
| | | |
| | | size := 0 |
| | | bucket.ForEach(func(k, v []byte) error { |
| | | size++ |
| | | return nil |
| | | }) |
| | | if size == 0 { |
| | | dc = true |
| | | } |
| | | } else { |
| | | dc = true |
| | | } |
| | | if dc { |
| | | cb := tx.Bucket([]byte(confBucket)) |
| | | if cb != nil { |
| | | cbSize := 0 |
| | | cb.ForEach(func(k, v []byte) error { |
| | | cbSize++ |
| | | return nil |
| | | }) |
| | | if cbSize > 1 { |
| | | if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil { |
| | | return dErr |
| | | } |
| | | } |
| | | } |
| | | } |
| | | tx.Commit() |
| | | return nil |
| | | } |
| | | |
| | | func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error { |
| | | tx,err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | bucket := tx.Bucket([]byte(confBucket)) |
| | | if bucket != nil { |
| | | flag := false |
| | | bucket.ForEach(func(k, v []byte) error { |
| | | cLog := &confLog{} |
| | | if de := decodeMsgPack(v, cLog); de == nil { |
| | | if cLog.BucketName == bucketName { |
| | | cLog.UpdateTime = t |
| | | cLog.Index = int(bytesToUint64(k)) |
| | | val,err := encodeMsgPack(cLog) |
| | | if err ==nil { |
| | | if bucket.Put(k, val.Bytes()) ==nil { |
| | | flag = true |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return nil |
| | | }) |
| | | if !flag { |
| | | return errors.New("conf表中未找到"+bucketName) |
| | | } |
| | | } else { |
| | | return ErrBucketNotFound |
| | | } |
| | | tx.Commit() |
| | | return nil |
| | | } |
| | | |
| | | func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) { |
| | | type kConfLog struct { |
| | | k []byte |
| | | log *confLog |
| | | } |
| | | tx,err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return nil, nil, err |
| | | } |
| | | defer tx.Rollback() |
| | | bucket := tx.Bucket([]byte(confBucket)) |
| | | var delBucketNames []string |
| | | var delErrBucketNames []string |
| | | if bucket != nil { |
| | | var allCLogs []*kConfLog |
| | | bucket.ForEach(func(k, v []byte) error { |
| | | cLog := &confLog{} |
| | | if e :=decodeMsgPack(v, cLog);e ==nil { |
| | | allCLogs = append(allCLogs, &kConfLog{ |
| | | k:k, |
| | | log:cLog, |
| | | }) |
| | | } |
| | | return nil |
| | | }) |
| | | var leftClogs []*kConfLog |
| | | if len(allCLogs) > conf.bucketLimit { |
| | | arr := allCLogs[:len(allCLogs)-conf.bucketLimit] |
| | | leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:] |
| | | for _,a := range arr { |
| | | if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { |
| | | delErrBucketNames = append(delErrBucketNames, a.log.BucketName) |
| | | } else { |
| | | bucket.Delete(a.k) |
| | | delBucketNames = append(delBucketNames, a.log.BucketName) |
| | | } |
| | | } |
| | | } else { |
| | | leftClogs = allCLogs |
| | | } |
| | | if len(leftClogs) >1 { |
| | | leftClogs = leftClogs[:len(leftClogs)-1] |
| | | for _,a := range leftClogs { |
| | | if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 { |
| | | if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { |
| | | delErrBucketNames = append(delErrBucketNames, a.log.BucketName) |
| | | } else { |
| | | bucket.Delete(a.k) |
| | | delBucketNames = append(delBucketNames, a.log.BucketName) |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | } else { |
| | | return nil, nil, errors.New("bucket not found") |
| | | } |
| | | tx.Commit() |
| | | return delBucketNames, delErrBucketNames, nil |
| | | } |
| | | |
| | | // Set is used to set a key/value set outside of the raft log |
| | | func (b *BoltStore) Set(k, v []byte) error { |
| | | tx, err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket(dbConf) |
| | | if err := bucket.Put(k, v); err != nil { |
| | | return err |
| | | } |
| | | |
| | | return tx.Commit() |
| | | } |
| | | |
| | | func (b *BoltStore) Size(bucketName string) (int,error) { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return 0, err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket == nil { |
| | | return 0, ErrBucketNotFound |
| | | } |
| | | size := 0 |
| | | bucket.ForEach(func(k, v []byte) error { |
| | | size++ |
| | | return nil |
| | | }) |
| | | return size, nil |
| | | } |
| | | |
| | | // Get is used to retrieve a value from the k/v store by key |
| | | func (b *BoltStore) Get(k []byte) ([]byte, error) { |
| | | tx, err := b.conn.Begin(false) |
| | | if err != nil { |
| | | return nil, err |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | bucket := tx.Bucket(dbConf) |
| | | val := bucket.Get(k) |
| | | |
| | | if val == nil { |
| | | return nil, ErrKeyNotFound |
| | | } |
| | | return append([]byte(nil), val...), nil |
| | | } |
| | | |
| | | // SetUint64 is like Set, but handles uint64 values |
| | | func (b *BoltStore) SetUint64(key []byte, val uint64) error { |
| | | return b.Set(key, uint64ToBytes(val)) |
| | | } |
| | | |
| | | // GetUint64 is like Get, but handles uint64 values |
| | | func (b *BoltStore) GetUint64(key []byte) (uint64, error) { |
| | | val, err := b.Get(key) |
| | | if err != nil { |
| | | return 0, err |
| | | } |
| | | return bytesToUint64(val), nil |
| | | } |
| | | |
| | | // Sync performs an fsync on the database file handle. This is not necessary |
| | | // under normal operation unless NoSync is enabled, in which this forces the |
| | | // database file to sync against the disk. |
| | | func (b *BoltStore) Sync() error { |
| | | return b.conn.Sync() |
| | | } |