From 33cc2473a3d6ca941717c557f8e72b9904fbdc4f Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 12 十一月 2021 15:20:58 +0800 Subject: [PATCH] add foreach, size api --- bolt_store.go | 77 ++++++++++++++++++++++++++------------ 1 files changed, 52 insertions(+), 25 deletions(-) diff --git a/bolt_store.go b/bolt_store.go index 0977102..be490ba 100644 --- a/bolt_store.go +++ b/bolt_store.go @@ -3,6 +3,7 @@ import ( "errors" "github.com/boltdb/bolt" + "strconv" "time" ) @@ -17,8 +18,8 @@ dbConf = []byte(confBucket) // An error indicating a given key does not exist - ErrKeyNotFound = errors.New("not found") - ErrLogNotFound = errors.New("log not found") + ErrKeyNotFound = errors.New("not found") + ErrLogNotFound = errors.New("log not found") ErrBucketNotFound = errors.New("bucket not found") ) @@ -115,7 +116,7 @@ defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) - if bucket ==nil { + if bucket == nil { return 0, ErrBucketNotFound } @@ -135,7 +136,7 @@ } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) - if bucket ==nil { + if bucket == nil { return 0, ErrBucketNotFound } curs := bucket.Cursor() @@ -203,7 +204,7 @@ } bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { - bucket,err = tx.CreateBucket([]byte(bucketName)) + bucket, err = tx.CreateBucket([]byte(bucketName)) if err != nil { return err } @@ -242,8 +243,34 @@ return tx.Commit() } +func (b *BoltStore) ForEach(f func(v []byte) error) { + tx, err := b.conn.Begin(true) + if err != nil { + return + } + defer tx.Rollback() + + start, _ := b.FirstIndex(confBucket) + end, _ := b.LastIndex(confBucket) + + for ; start <= end; start++ { + bucketName := bucketPre + strconv.Itoa(int(start)) + bucket := tx.Bucket([]byte(bucketName)) + if bucket == nil { + return + } + + bucket.ForEach(func(k, v []byte) error { + log := &Log{} + err := decodeMsgPack(v, log) + f(log.Data) + return err + }) + } +} + func (b *BoltStore) Delete(lc *LogCon) error { - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return err } @@ -252,7 +279,7 @@ dc := false if bucket != nil { if lc.Log != nil { - if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil { + if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil { return dErr } } @@ -277,7 +304,7 @@ return nil }) if cbSize > 1 { - if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil { + if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil { return dErr } } @@ -288,7 +315,7 @@ } func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error { - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return err } @@ -302,9 +329,9 @@ if cLog.BucketName == bucketName { cLog.UpdateTime = t cLog.Index = int(bytesToUint64(k)) - val,err := encodeMsgPack(cLog) - if err ==nil { - if bucket.Put(k, val.Bytes()) ==nil { + val, err := encodeMsgPack(cLog) + if err == nil { + if bucket.Put(k, val.Bytes()) == nil { flag = true } } @@ -313,7 +340,7 @@ return nil }) if !flag { - return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName) + return errors.New("conf琛ㄤ腑鏈壘鍒�" + bucketName) } } else { return ErrBucketNotFound @@ -324,10 +351,10 @@ func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) { type kConfLog struct { - k []byte + k []byte log *confLog } - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return nil, nil, err } @@ -339,10 +366,10 @@ var allCLogs []*kConfLog bucket.ForEach(func(k, v []byte) error { cLog := &confLog{} - if e :=decodeMsgPack(v, cLog);e ==nil { + if e := decodeMsgPack(v, cLog); e == nil { allCLogs = append(allCLogs, &kConfLog{ - k:k, - log:cLog, + k: k, + log: cLog, }) } return nil @@ -351,8 +378,8 @@ if len(allCLogs) > conf.bucketLimit { arr := allCLogs[:len(allCLogs)-conf.bucketLimit] leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:] - for _,a := range arr { - if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + for _, a := range arr { + if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) @@ -362,11 +389,11 @@ } else { leftClogs = allCLogs } - if len(leftClogs) >1 { + if len(leftClogs) > 1 { leftClogs = leftClogs[:len(leftClogs)-1] - for _,a := range leftClogs { - if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 { - if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + for _, a := range leftClogs { + if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 { + if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) @@ -399,7 +426,7 @@ return tx.Commit() } -func (b *BoltStore) Size(bucketName string) (int,error) { +func (b *BoltStore) Size(bucketName string) (int, error) { tx, err := b.conn.Begin(false) if err != nil { return 0, err -- Gitblit v1.8.0