From 33cc2473a3d6ca941717c557f8e72b9904fbdc4f Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期五, 12 十一月 2021 15:20:58 +0800
Subject: [PATCH] add foreach, size api
---
store_test.go | 26 ++++++--
api.go | 46 +++++++++++---
bolt_store.go | 77 +++++++++++++++++--------
3 files changed, 105 insertions(+), 44 deletions(-)
diff --git a/api.go b/api.go
index dc73a4f..8870c6f 100644
--- a/api.go
+++ b/api.go
@@ -1,18 +1,20 @@
package boltcache
+import "strconv"
+
//娣诲姞涓�鏉℃棩蹇�
func (ls *LogStore) ApplyLog(logData []byte) {
lastLogIndex := ls.getLastLog()
- ls.applyCh <- &Log {
- Index: lastLogIndex+1,
- Data: logData,
+ ls.applyCh <- &Log{
+ Index: lastLogIndex + 1,
+ Data: logData,
}
ls.setLastLog(lastLogIndex + 1)
}
type LogCon struct {
conf *confLog
- Log *Log
+ Log *Log
}
//鑾峰彇缂撳瓨鐨勬暟鎹�
@@ -24,21 +26,21 @@
func (ls *LogStore) Get() *LogCon {
idx, _ := ls.store.FirstIndex(confBucket)
cLog := &confLog{}
- if err := ls.store.GetConfLog(idx, cLog);err ==nil {
+ if err := ls.store.GetConfLog(idx, cLog); err == nil {
u, _ := ls.store.FirstIndex(cLog.BucketName)
log := &Log{}
- if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
+ if err = ls.store.GetLog(cLog.BucketName, u, log); err == nil {
log.Index = u
cLog.Index = int(idx)
return &LogCon{
- conf:cLog,
- Log: log,
+ conf: cLog,
+ Log: log,
}
} else {
- if size,_ := ls.store.Size(confBucket);size >1 {
+ if size, _ := ls.store.Size(confBucket); size > 1 {
ls.Delete(&LogCon{
conf: cLog,
- Log: nil,
+ Log: nil,
})
}
ls.printLog("Get log err:", err)
@@ -49,6 +51,28 @@
return nil
}
+// 閬嶅巻鏁版嵁, 杈撳嚭鍘熷鏁版嵁
+func (ls *LogStore) ForEach(f func(v []byte) error) {
+ ls.store.ForEach(f)
+}
+
+func (ls *LogStore) Size() int {
+ start, _ := ls.store.FirstIndex(confBucket)
+ end, _ := ls.store.LastIndex(confBucket)
+
+ totalSize := 0
+
+ for ; start <= end; start++ {
+ bucketName := bucketPre + strconv.Itoa(int(start))
+ bucketSize, err := ls.store.Size(bucketName)
+ if err == nil {
+ totalSize = totalSize + bucketSize
+ }
+ }
+
+ return totalSize
+}
+
//鎻愪緵缁欏灞備娇鐢紝鍒犻櫎鏃ュ織
func (ls *LogStore) Delete(lc *LogCon) error {
return ls.store.Delete(lc)
@@ -56,4 +80,4 @@
func (ls *LogStore) Close() error {
return ls.store.Close()
-}
\ No newline at end of file
+}
diff --git a/bolt_store.go b/bolt_store.go
index 0977102..be490ba 100644
--- a/bolt_store.go
+++ b/bolt_store.go
@@ -3,6 +3,7 @@
import (
"errors"
"github.com/boltdb/bolt"
+ "strconv"
"time"
)
@@ -17,8 +18,8 @@
dbConf = []byte(confBucket)
// An error indicating a given key does not exist
- ErrKeyNotFound = errors.New("not found")
- ErrLogNotFound = errors.New("log not found")
+ ErrKeyNotFound = errors.New("not found")
+ ErrLogNotFound = errors.New("log not found")
ErrBucketNotFound = errors.New("bucket not found")
)
@@ -115,7 +116,7 @@
defer tx.Rollback()
bucket := tx.Bucket([]byte(bucketName))
- if bucket ==nil {
+ if bucket == nil {
return 0, ErrBucketNotFound
}
@@ -135,7 +136,7 @@
}
defer tx.Rollback()
bucket := tx.Bucket([]byte(bucketName))
- if bucket ==nil {
+ if bucket == nil {
return 0, ErrBucketNotFound
}
curs := bucket.Cursor()
@@ -203,7 +204,7 @@
}
bucket := tx.Bucket([]byte(bucketName))
if bucket == nil {
- bucket,err = tx.CreateBucket([]byte(bucketName))
+ bucket, err = tx.CreateBucket([]byte(bucketName))
if err != nil {
return err
}
@@ -242,8 +243,34 @@
return tx.Commit()
}
+func (b *BoltStore) ForEach(f func(v []byte) error) {
+ tx, err := b.conn.Begin(true)
+ if err != nil {
+ return
+ }
+ defer tx.Rollback()
+
+ start, _ := b.FirstIndex(confBucket)
+ end, _ := b.LastIndex(confBucket)
+
+ for ; start <= end; start++ {
+ bucketName := bucketPre + strconv.Itoa(int(start))
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket == nil {
+ return
+ }
+
+ bucket.ForEach(func(k, v []byte) error {
+ log := &Log{}
+ err := decodeMsgPack(v, log)
+ f(log.Data)
+ return err
+ })
+ }
+}
+
func (b *BoltStore) Delete(lc *LogCon) error {
- tx,err := b.conn.Begin(true)
+ tx, err := b.conn.Begin(true)
if err != nil {
return err
}
@@ -252,7 +279,7 @@
dc := false
if bucket != nil {
if lc.Log != nil {
- if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
+ if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil {
return dErr
}
}
@@ -277,7 +304,7 @@
return nil
})
if cbSize > 1 {
- if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
+ if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil {
return dErr
}
}
@@ -288,7 +315,7 @@
}
func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
- tx,err := b.conn.Begin(true)
+ tx, err := b.conn.Begin(true)
if err != nil {
return err
}
@@ -302,9 +329,9 @@
if cLog.BucketName == bucketName {
cLog.UpdateTime = t
cLog.Index = int(bytesToUint64(k))
- val,err := encodeMsgPack(cLog)
- if err ==nil {
- if bucket.Put(k, val.Bytes()) ==nil {
+ val, err := encodeMsgPack(cLog)
+ if err == nil {
+ if bucket.Put(k, val.Bytes()) == nil {
flag = true
}
}
@@ -313,7 +340,7 @@
return nil
})
if !flag {
- return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName)
+ return errors.New("conf琛ㄤ腑鏈壘鍒�" + bucketName)
}
} else {
return ErrBucketNotFound
@@ -324,10 +351,10 @@
func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
type kConfLog struct {
- k []byte
+ k []byte
log *confLog
}
- tx,err := b.conn.Begin(true)
+ tx, err := b.conn.Begin(true)
if err != nil {
return nil, nil, err
}
@@ -339,10 +366,10 @@
var allCLogs []*kConfLog
bucket.ForEach(func(k, v []byte) error {
cLog := &confLog{}
- if e :=decodeMsgPack(v, cLog);e ==nil {
+ if e := decodeMsgPack(v, cLog); e == nil {
allCLogs = append(allCLogs, &kConfLog{
- k:k,
- log:cLog,
+ k: k,
+ log: cLog,
})
}
return nil
@@ -351,8 +378,8 @@
if len(allCLogs) > conf.bucketLimit {
arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
- for _,a := range arr {
- if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+ for _, a := range arr {
+ if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
} else {
bucket.Delete(a.k)
@@ -362,11 +389,11 @@
} else {
leftClogs = allCLogs
}
- if len(leftClogs) >1 {
+ if len(leftClogs) > 1 {
leftClogs = leftClogs[:len(leftClogs)-1]
- for _,a := range leftClogs {
- if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
- if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+ for _, a := range leftClogs {
+ if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 {
+ if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
} else {
bucket.Delete(a.k)
@@ -399,7 +426,7 @@
return tx.Commit()
}
-func (b *BoltStore) Size(bucketName string) (int,error) {
+func (b *BoltStore) Size(bucketName string) (int, error) {
tx, err := b.conn.Begin(false)
if err != nil {
return 0, err
diff --git a/store_test.go b/store_test.go
index f1ba8f5..870c869 100644
--- a/store_test.go
+++ b/store_test.go
@@ -16,23 +16,33 @@
defer ls.Close()
- go consume(ls)
-
- for i:=0; i<10000; i++{
- ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
- time.Sleep(time.Second * 1)
+ //go consume(ls)
+ //
+ for i := 0; i < 100; i++ {
+ ls.ApplyLog([]byte("hello world " + strconv.Itoa(i)))
+ time.Sleep(time.Millisecond * 100)
}
+ ls.printLog("Size = ", ls.Size())
+
+ ls.ForEach(func(v []byte) error {
+ ls.printLog("val=:", string(v))
+ return nil
+ })
+
+ consume(ls)
}
func consume(ls *LogStore) {
for {
lc := ls.Get()
if lc != nil {
- ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
+ ls.printLog(lc.conf.BucketName, " send old log:", string(lc.Log.Data))
ls.Delete(lc)
+ } else {
+ return
}
- time.Sleep(10 * time.Second)
+ time.Sleep(10 * time.Millisecond)
}
-}
\ No newline at end of file
+}
--
Gitblit v1.8.0