From 33cc2473a3d6ca941717c557f8e72b9904fbdc4f Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 12 十一月 2021 15:20:58 +0800
Subject: [PATCH] add foreach, size api

---
 store_test.go |   26 ++++++--
 api.go        |   46 +++++++++++---
 bolt_store.go |   77 +++++++++++++++++--------
 3 files changed, 105 insertions(+), 44 deletions(-)

diff --git a/api.go b/api.go
index dc73a4f..8870c6f 100644
--- a/api.go
+++ b/api.go
@@ -1,18 +1,20 @@
 package boltcache
 
+import "strconv"
+
 //娣诲姞涓�鏉℃棩蹇�
 func (ls *LogStore) ApplyLog(logData []byte) {
 	lastLogIndex := ls.getLastLog()
-	ls.applyCh <- &Log {
-		Index: lastLogIndex+1,
-		Data: logData,
+	ls.applyCh <- &Log{
+		Index: lastLogIndex + 1,
+		Data:  logData,
 	}
 	ls.setLastLog(lastLogIndex + 1)
 }
 
 type LogCon struct {
 	conf *confLog
-	Log *Log
+	Log  *Log
 }
 
 //鑾峰彇缂撳瓨鐨勬暟鎹�
@@ -24,21 +26,21 @@
 func (ls *LogStore) Get() *LogCon {
 	idx, _ := ls.store.FirstIndex(confBucket)
 	cLog := &confLog{}
-	if err := ls.store.GetConfLog(idx, cLog);err ==nil {
+	if err := ls.store.GetConfLog(idx, cLog); err == nil {
 		u, _ := ls.store.FirstIndex(cLog.BucketName)
 		log := &Log{}
-		if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
+		if err = ls.store.GetLog(cLog.BucketName, u, log); err == nil {
 			log.Index = u
 			cLog.Index = int(idx)
 			return &LogCon{
-				conf:cLog,
-				Log: log,
+				conf: cLog,
+				Log:  log,
 			}
 		} else {
-			if size,_ := ls.store.Size(confBucket);size >1 {
+			if size, _ := ls.store.Size(confBucket); size > 1 {
 				ls.Delete(&LogCon{
 					conf: cLog,
-					Log: nil,
+					Log:  nil,
 				})
 			}
 			ls.printLog("Get log err:", err)
@@ -49,6 +51,28 @@
 	return nil
 }
 
+// 閬嶅巻鏁版嵁, 杈撳嚭鍘熷鏁版嵁
+func (ls *LogStore) ForEach(f func(v []byte) error) {
+	ls.store.ForEach(f)
+}
+
+func (ls *LogStore) Size() int {
+	start, _ := ls.store.FirstIndex(confBucket)
+	end, _ := ls.store.LastIndex(confBucket)
+
+	totalSize := 0
+
+	for ; start <= end; start++ {
+		bucketName := bucketPre + strconv.Itoa(int(start))
+		bucketSize, err := ls.store.Size(bucketName)
+		if err == nil {
+			totalSize = totalSize + bucketSize
+		}
+	}
+
+	return totalSize
+}
+
 //鎻愪緵缁欏灞備娇鐢紝鍒犻櫎鏃ュ織
 func (ls *LogStore) Delete(lc *LogCon) error {
 	return ls.store.Delete(lc)
@@ -56,4 +80,4 @@
 
 func (ls *LogStore) Close() error {
 	return ls.store.Close()
-}
\ No newline at end of file
+}
diff --git a/bolt_store.go b/bolt_store.go
index 0977102..be490ba 100644
--- a/bolt_store.go
+++ b/bolt_store.go
@@ -3,6 +3,7 @@
 import (
 	"errors"
 	"github.com/boltdb/bolt"
+	"strconv"
 	"time"
 )
 
@@ -17,8 +18,8 @@
 	dbConf = []byte(confBucket)
 
 	// An error indicating a given key does not exist
-	ErrKeyNotFound = errors.New("not found")
-	ErrLogNotFound = errors.New("log not found")
+	ErrKeyNotFound    = errors.New("not found")
+	ErrLogNotFound    = errors.New("log not found")
 	ErrBucketNotFound = errors.New("bucket not found")
 )
 
@@ -115,7 +116,7 @@
 	defer tx.Rollback()
 
 	bucket := tx.Bucket([]byte(bucketName))
-	if bucket ==nil {
+	if bucket == nil {
 		return 0, ErrBucketNotFound
 	}
 
@@ -135,7 +136,7 @@
 	}
 	defer tx.Rollback()
 	bucket := tx.Bucket([]byte(bucketName))
-	if bucket ==nil {
+	if bucket == nil {
 		return 0, ErrBucketNotFound
 	}
 	curs := bucket.Cursor()
@@ -203,7 +204,7 @@
 		}
 		bucket := tx.Bucket([]byte(bucketName))
 		if bucket == nil {
-			bucket,err = tx.CreateBucket([]byte(bucketName))
+			bucket, err = tx.CreateBucket([]byte(bucketName))
 			if err != nil {
 				return err
 			}
@@ -242,8 +243,34 @@
 	return tx.Commit()
 }
 
+func (b *BoltStore) ForEach(f func(v []byte) error) {
+	tx, err := b.conn.Begin(true)
+	if err != nil {
+		return
+	}
+	defer tx.Rollback()
+
+	start, _ := b.FirstIndex(confBucket)
+	end, _ := b.LastIndex(confBucket)
+
+	for ; start <= end; start++ {
+		bucketName := bucketPre + strconv.Itoa(int(start))
+		bucket := tx.Bucket([]byte(bucketName))
+		if bucket == nil {
+			return
+		}
+
+		bucket.ForEach(func(k, v []byte) error {
+			log := &Log{}
+			err := decodeMsgPack(v, log)
+			f(log.Data)
+			return err
+		})
+	}
+}
+
 func (b *BoltStore) Delete(lc *LogCon) error {
-	tx,err := b.conn.Begin(true)
+	tx, err := b.conn.Begin(true)
 	if err != nil {
 		return err
 	}
@@ -252,7 +279,7 @@
 	dc := false
 	if bucket != nil {
 		if lc.Log != nil {
-			if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
+			if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil {
 				return dErr
 			}
 		}
@@ -277,7 +304,7 @@
 				return nil
 			})
 			if cbSize > 1 {
-				if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
+				if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil {
 					return dErr
 				}
 			}
@@ -288,7 +315,7 @@
 }
 
 func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
-	tx,err := b.conn.Begin(true)
+	tx, err := b.conn.Begin(true)
 	if err != nil {
 		return err
 	}
@@ -302,9 +329,9 @@
 				if cLog.BucketName == bucketName {
 					cLog.UpdateTime = t
 					cLog.Index = int(bytesToUint64(k))
-					val,err := encodeMsgPack(cLog)
-					if err ==nil {
-						if bucket.Put(k, val.Bytes()) ==nil {
+					val, err := encodeMsgPack(cLog)
+					if err == nil {
+						if bucket.Put(k, val.Bytes()) == nil {
 							flag = true
 						}
 					}
@@ -313,7 +340,7 @@
 			return nil
 		})
 		if !flag {
-			return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName)
+			return errors.New("conf琛ㄤ腑鏈壘鍒�" + bucketName)
 		}
 	} else {
 		return ErrBucketNotFound
@@ -324,10 +351,10 @@
 
 func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
 	type kConfLog struct {
-		k []byte
+		k   []byte
 		log *confLog
 	}
-	tx,err := b.conn.Begin(true)
+	tx, err := b.conn.Begin(true)
 	if err != nil {
 		return nil, nil, err
 	}
@@ -339,10 +366,10 @@
 		var allCLogs []*kConfLog
 		bucket.ForEach(func(k, v []byte) error {
 			cLog := &confLog{}
-			if e :=decodeMsgPack(v, cLog);e ==nil {
+			if e := decodeMsgPack(v, cLog); e == nil {
 				allCLogs = append(allCLogs, &kConfLog{
-					k:k,
-					log:cLog,
+					k:   k,
+					log: cLog,
 				})
 			}
 			return nil
@@ -351,8 +378,8 @@
 		if len(allCLogs) > conf.bucketLimit {
 			arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
 			leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
-			for _,a := range arr {
-				if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+			for _, a := range arr {
+				if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
 					delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
 				} else {
 					bucket.Delete(a.k)
@@ -362,11 +389,11 @@
 		} else {
 			leftClogs = allCLogs
 		}
-		if len(leftClogs) >1 {
+		if len(leftClogs) > 1 {
 			leftClogs = leftClogs[:len(leftClogs)-1]
-			for _,a := range leftClogs {
-				if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
-					if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+			for _, a := range leftClogs {
+				if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 {
+					if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
 						delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
 					} else {
 						bucket.Delete(a.k)
@@ -399,7 +426,7 @@
 	return tx.Commit()
 }
 
-func (b *BoltStore) Size(bucketName string) (int,error) {
+func (b *BoltStore) Size(bucketName string) (int, error) {
 	tx, err := b.conn.Begin(false)
 	if err != nil {
 		return 0, err
diff --git a/store_test.go b/store_test.go
index f1ba8f5..870c869 100644
--- a/store_test.go
+++ b/store_test.go
@@ -16,23 +16,33 @@
 
 	defer ls.Close()
 
-	go consume(ls)
-
-	for i:=0; i<10000; i++{
-		ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
-		time.Sleep(time.Second * 1)
+	//go consume(ls)
+	//
+	for i := 0; i < 100; i++ {
+		ls.ApplyLog([]byte("hello world " + strconv.Itoa(i)))
+		time.Sleep(time.Millisecond * 100)
 	}
 
+	ls.printLog("Size = ", ls.Size())
+
+	ls.ForEach(func(v []byte) error {
+		ls.printLog("val=:", string(v))
+		return nil
+	})
+
+	consume(ls)
 }
 
 func consume(ls *LogStore) {
 	for {
 		lc := ls.Get()
 		if lc != nil {
-			ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
+			ls.printLog(lc.conf.BucketName, " send old log:", string(lc.Log.Data))
 			ls.Delete(lc)
+		} else {
+			return
 		}
 
-		time.Sleep(10 * time.Second)
+		time.Sleep(10 * time.Millisecond)
 	}
-}
\ No newline at end of file
+}

--
Gitblit v1.8.0