From 911e6eb6a1a1ab5dd979a1917b79a5465da88181 Mon Sep 17 00:00:00 2001 From: liuxiaolong <liuxiaolong@aiotlink.com> Date: 星期三, 18 十一月 2020 14:33:28 +0800 Subject: [PATCH] 修改gitignore --- bolt_store.go | 457 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 457 insertions(+), 0 deletions(-) diff --git a/bolt_store.go b/bolt_store.go new file mode 100644 index 0000000..0977102 --- /dev/null +++ b/bolt_store.go @@ -0,0 +1,457 @@ +package boltcache + +import ( + "errors" + "github.com/boltdb/bolt" + "time" +) + +const ( + // Permissions to use on the db file. This is only used if the + // database file does not exist and needs to be created. + dbFileMode = 0600 +) + +var ( + // Bucket names we perform transactions in + dbConf = []byte(confBucket) + + // An error indicating a given key does not exist + ErrKeyNotFound = errors.New("not found") + ErrLogNotFound = errors.New("log not found") + ErrBucketNotFound = errors.New("bucket not found") +) + +// BoltStore provides access to BoltDB for Raft to store and retrieve +// log entries. It also provides key/value storage, and can be used as +// a LogStore and StableStore. +type BoltStore struct { + // conn is the underlying handle to the db. + conn *bolt.DB + + // The path to the Bolt database file + path string +} + +// Options contains all the configuration used to open the BoltDB +type Options struct { + // Path is the file path to the BoltDB to use + Path string + + // BoltOptions contains any specific BoltDB options you might + // want to specify [e.g. open timeout] + BoltOptions *bolt.Options + + // NoSync causes the database to skip fsync calls after each + // write to the log. This is unsafe, so it should be used + // with caution. + NoSync bool +} + +// readOnly returns true if the contained bolt options say to open +// the DB in readOnly mode [this can be useful to tools that want +// to examine the log] +func (o *Options) readOnly() bool { + return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly +} + +// NewBoltStore takes a file path and returns a connected Raft backend. +func NewBoltStore(path string) (*BoltStore, error) { + return New(Options{Path: path}) +} + +// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend. +func New(options Options) (*BoltStore, error) { + // Try to connect + handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions) + if err != nil { + return nil, err + } + handle.NoSync = options.NoSync + + // Create the new store + store := &BoltStore{ + conn: handle, + path: options.Path, + } + + // If the store was opened read-only, don't try and create buckets + if !options.readOnly() { + // Set up our buckets + if err := store.initialize(); err != nil { + store.Close() + return nil, err + } + } + return store, nil +} + +// initialize is used to set up all of the buckets. +func (b *BoltStore) initialize() error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil { + return err + } + + return tx.Commit() +} + +// Close is used to gracefully close the DB connection. +func (b *BoltStore) Close() error { + return b.conn.Close() +} + +// FirstIndex returns the first known index from the Raft log. +func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(bucketName)) + if bucket ==nil { + return 0, ErrBucketNotFound + } + + curs := bucket.Cursor() + if first, _ := curs.First(); first == nil { + return 0, nil + } else { + return bytesToUint64(first), nil + } +} + +// LastIndex returns the last known index from the Raft log. +func (b *BoltStore) LastIndex(bucketName string) (uint64, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + bucket := tx.Bucket([]byte(bucketName)) + if bucket ==nil { + return 0, ErrBucketNotFound + } + curs := bucket.Cursor() + if last, _ := curs.Last(); last == nil { + return 0, nil + } else { + return bytesToUint64(last), nil + } +} + +// GetLog is used to retrieve a log from BoltDB at a given index. +func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error { + tx, err := b.conn.Begin(false) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(bucketName)) + if bucket == nil { + return ErrBucketNotFound + } + val := bucket.Get(uint64ToBytes(idx)) + + if val == nil { + return ErrLogNotFound + } + return decodeMsgPack(val, log) +} + +func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error { + tx, err := b.conn.Begin(false) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(confBucket)) + val := bucket.Get(uint64ToBytes(idx)) + + if val == nil { + return ErrLogNotFound + } + return decodeMsgPack(val, clog) +} + +// StoreLog is used to store a single raft log +func (b *BoltStore) StoreLog(bucketName string, log *Log) error { + return b.StoreLogs(bucketName, []*Log{log}) +} + +// StoreLogs is used to store a set of raft logs +func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + for _, log := range logs { + key := uint64ToBytes(log.Index) + val, err := encodeMsgPack(log) + if err != nil { + return err + } + bucket := tx.Bucket([]byte(bucketName)) + if bucket == nil { + bucket,err = tx.CreateBucket([]byte(bucketName)) + if err != nil { + return err + } + } + if err := bucket.Put(key, val.Bytes()); err != nil { + return err + } + } + + return tx.Commit() +} + +// DeleteRange is used to delete logs within a given range inclusively. +func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error { + minKey := uint64ToBytes(min) + + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + curs := tx.Bucket([]byte(bucketName)).Cursor() + for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() { + // Handle out-of-range log index + if bytesToUint64(k) > max { + break + } + + // Delete in-range log index + if err := curs.Delete(); err != nil { + return err + } + } + + return tx.Commit() +} + +func (b *BoltStore) Delete(lc *LogCon) error { + tx,err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + bucket := tx.Bucket([]byte(lc.conf.BucketName)) + dc := false + if bucket != nil { + if lc.Log != nil { + if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil { + return dErr + } + } + + size := 0 + bucket.ForEach(func(k, v []byte) error { + size++ + return nil + }) + if size == 0 { + dc = true + } + } else { + dc = true + } + if dc { + cb := tx.Bucket([]byte(confBucket)) + if cb != nil { + cbSize := 0 + cb.ForEach(func(k, v []byte) error { + cbSize++ + return nil + }) + if cbSize > 1 { + if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil { + return dErr + } + } + } + } + tx.Commit() + return nil +} + +func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error { + tx,err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + bucket := tx.Bucket([]byte(confBucket)) + if bucket != nil { + flag := false + bucket.ForEach(func(k, v []byte) error { + cLog := &confLog{} + if de := decodeMsgPack(v, cLog); de == nil { + if cLog.BucketName == bucketName { + cLog.UpdateTime = t + cLog.Index = int(bytesToUint64(k)) + val,err := encodeMsgPack(cLog) + if err ==nil { + if bucket.Put(k, val.Bytes()) ==nil { + flag = true + } + } + } + } + return nil + }) + if !flag { + return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName) + } + } else { + return ErrBucketNotFound + } + tx.Commit() + return nil +} + +func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) { + type kConfLog struct { + k []byte + log *confLog + } + tx,err := b.conn.Begin(true) + if err != nil { + return nil, nil, err + } + defer tx.Rollback() + bucket := tx.Bucket([]byte(confBucket)) + var delBucketNames []string + var delErrBucketNames []string + if bucket != nil { + var allCLogs []*kConfLog + bucket.ForEach(func(k, v []byte) error { + cLog := &confLog{} + if e :=decodeMsgPack(v, cLog);e ==nil { + allCLogs = append(allCLogs, &kConfLog{ + k:k, + log:cLog, + }) + } + return nil + }) + var leftClogs []*kConfLog + if len(allCLogs) > conf.bucketLimit { + arr := allCLogs[:len(allCLogs)-conf.bucketLimit] + leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:] + for _,a := range arr { + if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + delErrBucketNames = append(delErrBucketNames, a.log.BucketName) + } else { + bucket.Delete(a.k) + delBucketNames = append(delBucketNames, a.log.BucketName) + } + } + } else { + leftClogs = allCLogs + } + if len(leftClogs) >1 { + leftClogs = leftClogs[:len(leftClogs)-1] + for _,a := range leftClogs { + if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 { + if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + delErrBucketNames = append(delErrBucketNames, a.log.BucketName) + } else { + bucket.Delete(a.k) + delBucketNames = append(delBucketNames, a.log.BucketName) + } + } + } + } + + } else { + return nil, nil, errors.New("bucket not found") + } + tx.Commit() + return delBucketNames, delErrBucketNames, nil +} + +// Set is used to set a key/value set outside of the raft log +func (b *BoltStore) Set(k, v []byte) error { + tx, err := b.conn.Begin(true) + if err != nil { + return err + } + defer tx.Rollback() + + bucket := tx.Bucket(dbConf) + if err := bucket.Put(k, v); err != nil { + return err + } + + return tx.Commit() +} + +func (b *BoltStore) Size(bucketName string) (int,error) { + tx, err := b.conn.Begin(false) + if err != nil { + return 0, err + } + defer tx.Rollback() + + bucket := tx.Bucket([]byte(bucketName)) + if bucket == nil { + return 0, ErrBucketNotFound + } + size := 0 + bucket.ForEach(func(k, v []byte) error { + size++ + return nil + }) + return size, nil +} + +// Get is used to retrieve a value from the k/v store by key +func (b *BoltStore) Get(k []byte) ([]byte, error) { + tx, err := b.conn.Begin(false) + if err != nil { + return nil, err + } + defer tx.Rollback() + + bucket := tx.Bucket(dbConf) + val := bucket.Get(k) + + if val == nil { + return nil, ErrKeyNotFound + } + return append([]byte(nil), val...), nil +} + +// SetUint64 is like Set, but handles uint64 values +func (b *BoltStore) SetUint64(key []byte, val uint64) error { + return b.Set(key, uint64ToBytes(val)) +} + +// GetUint64 is like Get, but handles uint64 values +func (b *BoltStore) GetUint64(key []byte) (uint64, error) { + val, err := b.Get(key) + if err != nil { + return 0, err + } + return bytesToUint64(val), nil +} + +// Sync performs an fsync on the database file handle. This is not necessary +// under normal operation unless NoSync is enabled, in which this forces the +// database file to sync against the disk. +func (b *BoltStore) Sync() error { + return b.conn.Sync() +} -- Gitblit v1.8.0