From 33cc2473a3d6ca941717c557f8e72b9904fbdc4f Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期五, 12 十一月 2021 15:20:58 +0800 Subject: [PATCH] add foreach, size api --- store_test.go | 26 ++++++-- api.go | 46 +++++++++++--- bolt_store.go | 77 +++++++++++++++++-------- 3 files changed, 105 insertions(+), 44 deletions(-) diff --git a/api.go b/api.go index dc73a4f..8870c6f 100644 --- a/api.go +++ b/api.go @@ -1,18 +1,20 @@ package boltcache +import "strconv" + //娣诲姞涓�鏉℃棩蹇� func (ls *LogStore) ApplyLog(logData []byte) { lastLogIndex := ls.getLastLog() - ls.applyCh <- &Log { - Index: lastLogIndex+1, - Data: logData, + ls.applyCh <- &Log{ + Index: lastLogIndex + 1, + Data: logData, } ls.setLastLog(lastLogIndex + 1) } type LogCon struct { conf *confLog - Log *Log + Log *Log } //鑾峰彇缂撳瓨鐨勬暟鎹� @@ -24,21 +26,21 @@ func (ls *LogStore) Get() *LogCon { idx, _ := ls.store.FirstIndex(confBucket) cLog := &confLog{} - if err := ls.store.GetConfLog(idx, cLog);err ==nil { + if err := ls.store.GetConfLog(idx, cLog); err == nil { u, _ := ls.store.FirstIndex(cLog.BucketName) log := &Log{} - if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil { + if err = ls.store.GetLog(cLog.BucketName, u, log); err == nil { log.Index = u cLog.Index = int(idx) return &LogCon{ - conf:cLog, - Log: log, + conf: cLog, + Log: log, } } else { - if size,_ := ls.store.Size(confBucket);size >1 { + if size, _ := ls.store.Size(confBucket); size > 1 { ls.Delete(&LogCon{ conf: cLog, - Log: nil, + Log: nil, }) } ls.printLog("Get log err:", err) @@ -49,6 +51,28 @@ return nil } +// 閬嶅巻鏁版嵁, 杈撳嚭鍘熷鏁版嵁 +func (ls *LogStore) ForEach(f func(v []byte) error) { + ls.store.ForEach(f) +} + +func (ls *LogStore) Size() int { + start, _ := ls.store.FirstIndex(confBucket) + end, _ := ls.store.LastIndex(confBucket) + + totalSize := 0 + + for ; start <= end; start++ { + bucketName := bucketPre + strconv.Itoa(int(start)) + bucketSize, err := ls.store.Size(bucketName) + if err == nil { + totalSize = totalSize + bucketSize + } + } + + return totalSize +} + //鎻愪緵缁欏灞備娇鐢紝鍒犻櫎鏃ュ織 func (ls *LogStore) Delete(lc *LogCon) error { return ls.store.Delete(lc) @@ -56,4 +80,4 @@ func (ls *LogStore) Close() error { return ls.store.Close() -} \ No newline at end of file +} diff --git a/bolt_store.go b/bolt_store.go index 0977102..be490ba 100644 --- a/bolt_store.go +++ b/bolt_store.go @@ -3,6 +3,7 @@ import ( "errors" "github.com/boltdb/bolt" + "strconv" "time" ) @@ -17,8 +18,8 @@ dbConf = []byte(confBucket) // An error indicating a given key does not exist - ErrKeyNotFound = errors.New("not found") - ErrLogNotFound = errors.New("log not found") + ErrKeyNotFound = errors.New("not found") + ErrLogNotFound = errors.New("log not found") ErrBucketNotFound = errors.New("bucket not found") ) @@ -115,7 +116,7 @@ defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) - if bucket ==nil { + if bucket == nil { return 0, ErrBucketNotFound } @@ -135,7 +136,7 @@ } defer tx.Rollback() bucket := tx.Bucket([]byte(bucketName)) - if bucket ==nil { + if bucket == nil { return 0, ErrBucketNotFound } curs := bucket.Cursor() @@ -203,7 +204,7 @@ } bucket := tx.Bucket([]byte(bucketName)) if bucket == nil { - bucket,err = tx.CreateBucket([]byte(bucketName)) + bucket, err = tx.CreateBucket([]byte(bucketName)) if err != nil { return err } @@ -242,8 +243,34 @@ return tx.Commit() } +func (b *BoltStore) ForEach(f func(v []byte) error) { + tx, err := b.conn.Begin(true) + if err != nil { + return + } + defer tx.Rollback() + + start, _ := b.FirstIndex(confBucket) + end, _ := b.LastIndex(confBucket) + + for ; start <= end; start++ { + bucketName := bucketPre + strconv.Itoa(int(start)) + bucket := tx.Bucket([]byte(bucketName)) + if bucket == nil { + return + } + + bucket.ForEach(func(k, v []byte) error { + log := &Log{} + err := decodeMsgPack(v, log) + f(log.Data) + return err + }) + } +} + func (b *BoltStore) Delete(lc *LogCon) error { - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return err } @@ -252,7 +279,7 @@ dc := false if bucket != nil { if lc.Log != nil { - if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil { + if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil { return dErr } } @@ -277,7 +304,7 @@ return nil }) if cbSize > 1 { - if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil { + if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil { return dErr } } @@ -288,7 +315,7 @@ } func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error { - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return err } @@ -302,9 +329,9 @@ if cLog.BucketName == bucketName { cLog.UpdateTime = t cLog.Index = int(bytesToUint64(k)) - val,err := encodeMsgPack(cLog) - if err ==nil { - if bucket.Put(k, val.Bytes()) ==nil { + val, err := encodeMsgPack(cLog) + if err == nil { + if bucket.Put(k, val.Bytes()) == nil { flag = true } } @@ -313,7 +340,7 @@ return nil }) if !flag { - return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName) + return errors.New("conf琛ㄤ腑鏈壘鍒�" + bucketName) } } else { return ErrBucketNotFound @@ -324,10 +351,10 @@ func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) { type kConfLog struct { - k []byte + k []byte log *confLog } - tx,err := b.conn.Begin(true) + tx, err := b.conn.Begin(true) if err != nil { return nil, nil, err } @@ -339,10 +366,10 @@ var allCLogs []*kConfLog bucket.ForEach(func(k, v []byte) error { cLog := &confLog{} - if e :=decodeMsgPack(v, cLog);e ==nil { + if e := decodeMsgPack(v, cLog); e == nil { allCLogs = append(allCLogs, &kConfLog{ - k:k, - log:cLog, + k: k, + log: cLog, }) } return nil @@ -351,8 +378,8 @@ if len(allCLogs) > conf.bucketLimit { arr := allCLogs[:len(allCLogs)-conf.bucketLimit] leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:] - for _,a := range arr { - if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + for _, a := range arr { + if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) @@ -362,11 +389,11 @@ } else { leftClogs = allCLogs } - if len(leftClogs) >1 { + if len(leftClogs) > 1 { leftClogs = leftClogs[:len(leftClogs)-1] - for _,a := range leftClogs { - if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 { - if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil { + for _, a := range leftClogs { + if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 { + if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil { delErrBucketNames = append(delErrBucketNames, a.log.BucketName) } else { bucket.Delete(a.k) @@ -399,7 +426,7 @@ return tx.Commit() } -func (b *BoltStore) Size(bucketName string) (int,error) { +func (b *BoltStore) Size(bucketName string) (int, error) { tx, err := b.conn.Begin(false) if err != nil { return 0, err diff --git a/store_test.go b/store_test.go index f1ba8f5..870c869 100644 --- a/store_test.go +++ b/store_test.go @@ -16,23 +16,33 @@ defer ls.Close() - go consume(ls) - - for i:=0; i<10000; i++{ - ls.ApplyLog([]byte("hello world "+strconv.Itoa(i))) - time.Sleep(time.Second * 1) + //go consume(ls) + // + for i := 0; i < 100; i++ { + ls.ApplyLog([]byte("hello world " + strconv.Itoa(i))) + time.Sleep(time.Millisecond * 100) } + ls.printLog("Size = ", ls.Size()) + + ls.ForEach(func(v []byte) error { + ls.printLog("val=:", string(v)) + return nil + }) + + consume(ls) } func consume(ls *LogStore) { for { lc := ls.Get() if lc != nil { - ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data)) + ls.printLog(lc.conf.BucketName, " send old log:", string(lc.Log.Data)) ls.Delete(lc) + } else { + return } - time.Sleep(10 * time.Second) + time.Sleep(10 * time.Millisecond) } -} \ No newline at end of file +} -- Gitblit v1.8.0