package boltcache import ( "errors" "github.com/boltdb/bolt" "strconv" "time" ) const ( // Permissions to use on the db file. This is only used if the // database file does not exist and needs to be created. dbFileMode = 0600 ) var ( // Bucket names we perform transactions in dbConf = []byte(confBucket) // An error indicating a given key does not exist ErrKeyNotFound = errors.New("not found") ErrLogNotFound = errors.New("log not found") ErrBucketNotFound = errors.New("bucket not found") ) // BoltStore provides access to BoltDB for Raft to store and retrieve // log entries. It also provides key/value storage, and can be used as // a LogStore and StableStore. type BoltStore struct { // conn is the underlying handle to the db. conn *bolt.DB // The path to the Bolt database file path string } // Options contains all the configuration used to open the BoltDB type Options struct { // Path is the file path to the BoltDB to use Path string // BoltOptions contains any specific BoltDB options you might // want to specify [e.g. open timeout] BoltOptions *bolt.Options // NoSync causes the database to skip fsync calls after each // write to the log. This is unsafe, so it should be used // with caution. NoSync bool } // readOnly returns true if the contained bolt options say to open // the DB in readOnly mode [this can be useful to tools that want // to examine the log] func (o *Options) readOnly() bool { return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly } // NewBoltStore takes a file path and returns a connected Raft backend. func NewBoltStore(path string) (*BoltStore, error) { return New(Options{Path: path}) } // New uses the supplied options to open the BoltDB and prepare it for use as a raft backend. func New(options Options) (*BoltStore, error) { // Try to connect handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions) if err != nil { return nil, err } handle.NoSync = options.NoSync // Create the new store store := &BoltStore{ conn: handle, path: options.Path, } // If the store was opened read-only, don't try and create buckets if !options.readOnly() { // Set up our buckets if err := store.initialize(); err != nil { store.Close() return nil, err } } return store, nil } // initialize is used to set up all of the buckets. func (b *BoltStore) initialize() error { tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil { return err } return tx.Commit() } // Close is used to gracefully close the DB connection. func (b *BoltStore) Close() error { return b.conn.Close() } // FirstIndex returns the first known index from the Raft log. func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) { tx, err := b.conn.Begin(false) if err != nil { return 0, err } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { return 0, ErrBucketNotFound } curs := bucket.Cursor() if first, _ := curs.First(); first == nil { return 0, nil } else { return bytesToUint64(first), nil } } // LastIndex returns the last known index from the Raft log. func (b *BoltStore) LastIndex(bucketName string) (uint64, error) { tx, err := b.conn.Begin(false) if err != nil { return 0, err } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { return 0, ErrBucketNotFound } curs := bucket.Cursor() if last, _ := curs.Last(); last == nil { return 0, nil } else { return bytesToUint64(last), nil } } // GetLog is used to retrieve a log from BoltDB at a given index. func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error { tx, err := b.conn.Begin(false) if err != nil { return err } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { return ErrBucketNotFound } val := bucket.Get(uint64ToBytes(idx)) if val == nil { return ErrLogNotFound } return decodeMsgPack(val, log) } func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error { tx, err := b.conn.Begin(false) if err != nil { return err } defer tx.Rollback() bucket := tx.Bucket([]byte(confBucket)) val := bucket.Get(uint64ToBytes(idx)) if val == nil { return ErrLogNotFound } return decodeMsgPack(val, clog) } // StoreLog is used to store a single raft log func (b *BoltStore) StoreLog(bucketName string, log *Log) error { return b.StoreLogs(bucketName, []*Log{log}) } // StoreLogs is used to store a set of raft logs func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error { tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() for _, log := range logs { key := uint64ToBytes(log.Index) val, err := encodeMsgPack(log) if err != nil { return err } bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { bucket, err = tx.CreateBucket([]byte(bucketName)) if err != nil { return err } } if err := bucket.Put(key, val.Bytes()); err != nil { return err } } return tx.Commit() } // DeleteRange is used to delete logs within a given range inclusively. func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error { minKey := uint64ToBytes(min) tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() curs := tx.Bucket([]byte(bucketName)).Cursor() for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() { // Handle out-of-range log index if bytesToUint64(k) > max { break } // Delete in-range log index if err := curs.Delete(); err != nil { return err } } return tx.Commit() } func (b *BoltStore) ForEach(f func(v []byte) error) { tx, err := b.conn.Begin(true) if err != nil { return } defer tx.Rollback() start, _ := b.FirstIndex(confBucket) end, _ := b.LastIndex(confBucket) for ; start <= end; start++ { bucketName := bucketPre + strconv.Itoa(int(start)) bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { return } bucket.ForEach(func(k, v []byte) error { log := &Log{} err := decodeMsgPack(v, log) f(log.Data) return err }) } } func (b *BoltStore) Delete(lc *LogCon) error { tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() bucket := tx.Bucket([]byte(lc.conf.BucketName)) dc := false if bucket != nil { if lc.Log != nil { if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil { return dErr } } size := 0 bucket.ForEach(func(k, v []byte) error { size++ return nil }) if size == 0 { dc = true } } else { dc = true } if dc { cb := tx.Bucket([]byte(confBucket)) if cb != nil { cbSize := 0 cb.ForEach(func(k, v []byte) error { cbSize++ return nil }) if cbSize > 1 { if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil { return dErr } } } } tx.Commit() return nil } func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error { tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() bucket := tx.Bucket([]byte(confBucket)) if bucket != nil { flag := false bucket.ForEach(func(k, v []byte) error { cLog := &confLog{} if de := decodeMsgPack(v, cLog); de == nil { if cLog.BucketName == bucketName { cLog.UpdateTime = t cLog.Index = int(bytesToUint64(k)) val, err := encodeMsgPack(cLog) if err == nil { if bucket.Put(k, val.Bytes()) == nil { flag = true } } } } return nil }) if !flag { return errors.New("conf表中未找到" + bucketName) } } else { return ErrBucketNotFound } tx.Commit() return nil } func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) { type kConfLog struct { k []byte log *confLog } tx, err := b.conn.Begin(true) if err != nil { return nil, nil, err } defer tx.Rollback() bucket := tx.Bucket([]byte(confBucket)) var delBucketNames []string var delErrBucketNames []string if bucket != nil { var allCLogs []*kConfLog bucket.ForEach(func(k, v []byte) error { cLog := &confLog{} if e := decodeMsgPack(v, cLog); e == nil { allCLogs = append(allCLogs, &kConfLog{ k: k, log: cLog, }) } return nil }) var leftClogs []*kConfLog if len(allCLogs) > conf.bucketLimit { arr := allCLogs[:len(allCLogs)-conf.bucketLimit] leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:] for _, a := range arr { if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) delBucketNames = append(delBucketNames, a.log.BucketName) } } } else { leftClogs = allCLogs } if len(leftClogs) > 1 { leftClogs = leftClogs[:len(leftClogs)-1] for _, a := range leftClogs { if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 { if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) delBucketNames = append(delBucketNames, a.log.BucketName) } } } } } else { return nil, nil, errors.New("bucket not found") } tx.Commit() return delBucketNames, delErrBucketNames, nil } // Set is used to set a key/value set outside of the raft log func (b *BoltStore) Set(k, v []byte) error { tx, err := b.conn.Begin(true) if err != nil { return err } defer tx.Rollback() bucket := tx.Bucket(dbConf) if err := bucket.Put(k, v); err != nil { return err } return tx.Commit() } func (b *BoltStore) Size(bucketName string) (int, error) { tx, err := b.conn.Begin(false) if err != nil { return 0, err } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { return 0, ErrBucketNotFound } size := 0 bucket.ForEach(func(k, v []byte) error { size++ return nil }) return size, nil } // Get is used to retrieve a value from the k/v store by key func (b *BoltStore) Get(k []byte) ([]byte, error) { tx, err := b.conn.Begin(false) if err != nil { return nil, err } defer tx.Rollback() bucket := tx.Bucket(dbConf) val := bucket.Get(k) if val == nil { return nil, ErrKeyNotFound } return append([]byte(nil), val...), nil } // SetUint64 is like Set, but handles uint64 values func (b *BoltStore) SetUint64(key []byte, val uint64) error { return b.Set(key, uint64ToBytes(val)) } // GetUint64 is like Get, but handles uint64 values func (b *BoltStore) GetUint64(key []byte) (uint64, error) { val, err := b.Get(key) if err != nil { return 0, err } return bytesToUint64(val), nil } // Sync performs an fsync on the database file handle. This is not necessary // under normal operation unless NoSync is enabled, in which this forces the // database file to sync against the disk. func (b *BoltStore) Sync() error { return b.conn.Sync() }