From 15c091bccbe9ee6e5e72dc93d413a2b67c13579c Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期三, 18 十一月 2020 14:32:20 +0800
Subject: [PATCH] first push

---
 util.go       |   37 ++
 go.sum        |   10 
 state.go      |   21 +
 config.go     |   41 ++
 store_test.go |   38 ++
 go.mod        |    8 
 api.go        |   59 ++++
 store.go      |  149 ++++++++++
 bolt_store.go |  457 ++++++++++++++++++++++++++++++++
 9 files changed, 820 insertions(+), 0 deletions(-)

diff --git a/api.go b/api.go
new file mode 100644
index 0000000..dc73a4f
--- /dev/null
+++ b/api.go
@@ -0,0 +1,59 @@
+package boltcache
+
+//娣诲姞涓�鏉℃棩蹇�
+func (ls *LogStore) ApplyLog(logData []byte) {
+	lastLogIndex := ls.getLastLog()
+	ls.applyCh <- &Log {
+		Index: lastLogIndex+1,
+		Data: logData,
+	}
+	ls.setLastLog(lastLogIndex + 1)
+}
+
+type LogCon struct {
+	conf *confLog
+	Log *Log
+}
+
+//鑾峰彇缂撳瓨鐨勬暟鎹�
+func (lc *LogCon) GetData() []byte {
+	return lc.Log.Data
+}
+
+//鎻愪緵缁欏灞備娇鐢紝鎸夐『搴忚幏鍙栫紦瀛樼殑鏃ュ織
+func (ls *LogStore) Get() *LogCon {
+	idx, _ := ls.store.FirstIndex(confBucket)
+	cLog := &confLog{}
+	if err := ls.store.GetConfLog(idx, cLog);err ==nil {
+		u, _ := ls.store.FirstIndex(cLog.BucketName)
+		log := &Log{}
+		if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
+			log.Index = u
+			cLog.Index = int(idx)
+			return &LogCon{
+				conf:cLog,
+				Log: log,
+			}
+		} else {
+			if size,_ := ls.store.Size(confBucket);size >1 {
+				ls.Delete(&LogCon{
+					conf: cLog,
+					Log: nil,
+				})
+			}
+			ls.printLog("Get log err:", err)
+		}
+	} else {
+		ls.printLog("Get conf idx:", idx, "err:", err)
+	}
+	return nil
+}
+
+//鎻愪緵缁欏灞備娇鐢紝鍒犻櫎鏃ュ織
+func (ls *LogStore) Delete(lc *LogCon) error {
+	return ls.store.Delete(lc)
+}
+
+func (ls *LogStore) Close() error {
+	return ls.store.Close()
+}
\ No newline at end of file
diff --git a/bolt_store.go b/bolt_store.go
new file mode 100644
index 0000000..0977102
--- /dev/null
+++ b/bolt_store.go
@@ -0,0 +1,457 @@
+package boltcache
+
+import (
+	"errors"
+	"github.com/boltdb/bolt"
+	"time"
+)
+
+const (
+	// Permissions to use on the db file. This is only used if the
+	// database file does not exist and needs to be created.
+	dbFileMode = 0600
+)
+
+var (
+	// Bucket names we perform transactions in
+	dbConf = []byte(confBucket)
+
+	// An error indicating a given key does not exist
+	ErrKeyNotFound = errors.New("not found")
+	ErrLogNotFound = errors.New("log not found")
+	ErrBucketNotFound = errors.New("bucket not found")
+)
+
+// BoltStore provides access to BoltDB for Raft to store and retrieve
+// log entries. It also provides key/value storage, and can be used as
+// a LogStore and StableStore.
+type BoltStore struct {
+	// conn is the underlying handle to the db.
+	conn *bolt.DB
+
+	// The path to the Bolt database file
+	path string
+}
+
+// Options contains all the configuration used to open the BoltDB
+type Options struct {
+	// Path is the file path to the BoltDB to use
+	Path string
+
+	// BoltOptions contains any specific BoltDB options you might
+	// want to specify [e.g. open timeout]
+	BoltOptions *bolt.Options
+
+	// NoSync causes the database to skip fsync calls after each
+	// write to the log. This is unsafe, so it should be used
+	// with caution.
+	NoSync bool
+}
+
+// readOnly returns true if the contained bolt options say to open
+// the DB in readOnly mode [this can be useful to tools that want
+// to examine the log]
+func (o *Options) readOnly() bool {
+	return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly
+}
+
+// NewBoltStore takes a file path and returns a connected Raft backend.
+func NewBoltStore(path string) (*BoltStore, error) {
+	return New(Options{Path: path})
+}
+
+// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend.
+func New(options Options) (*BoltStore, error) {
+	// Try to connect
+	handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions)
+	if err != nil {
+		return nil, err
+	}
+	handle.NoSync = options.NoSync
+
+	// Create the new store
+	store := &BoltStore{
+		conn: handle,
+		path: options.Path,
+	}
+
+	// If the store was opened read-only, don't try and create buckets
+	if !options.readOnly() {
+		// Set up our buckets
+		if err := store.initialize(); err != nil {
+			store.Close()
+			return nil, err
+		}
+	}
+	return store, nil
+}
+
+// initialize is used to set up all of the buckets.
+func (b *BoltStore) initialize() error {
+	tx, err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil {
+		return err
+	}
+
+	return tx.Commit()
+}
+
+// Close is used to gracefully close the DB connection.
+func (b *BoltStore) Close() error {
+	return b.conn.Close()
+}
+
+// FirstIndex returns the first known index from the Raft log.
+func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return 0, err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket([]byte(bucketName))
+	if bucket ==nil {
+		return 0, ErrBucketNotFound
+	}
+
+	curs := bucket.Cursor()
+	if first, _ := curs.First(); first == nil {
+		return 0, nil
+	} else {
+		return bytesToUint64(first), nil
+	}
+}
+
+// LastIndex returns the last known index from the Raft log.
+func (b *BoltStore) LastIndex(bucketName string) (uint64, error) {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return 0, err
+	}
+	defer tx.Rollback()
+	bucket := tx.Bucket([]byte(bucketName))
+	if bucket ==nil {
+		return 0, ErrBucketNotFound
+	}
+	curs := bucket.Cursor()
+	if last, _ := curs.Last(); last == nil {
+		return 0, nil
+	} else {
+		return bytesToUint64(last), nil
+	}
+}
+
+// GetLog is used to retrieve a log from BoltDB at a given index.
+func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket([]byte(bucketName))
+	if bucket == nil {
+		return ErrBucketNotFound
+	}
+	val := bucket.Get(uint64ToBytes(idx))
+
+	if val == nil {
+		return ErrLogNotFound
+	}
+	return decodeMsgPack(val, log)
+}
+
+func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket([]byte(confBucket))
+	val := bucket.Get(uint64ToBytes(idx))
+
+	if val == nil {
+		return ErrLogNotFound
+	}
+	return decodeMsgPack(val, clog)
+}
+
+// StoreLog is used to store a single raft log
+func (b *BoltStore) StoreLog(bucketName string, log *Log) error {
+	return b.StoreLogs(bucketName, []*Log{log})
+}
+
+// StoreLogs is used to store a set of raft logs
+func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error {
+	tx, err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	for _, log := range logs {
+		key := uint64ToBytes(log.Index)
+		val, err := encodeMsgPack(log)
+		if err != nil {
+			return err
+		}
+		bucket := tx.Bucket([]byte(bucketName))
+		if bucket == nil {
+			bucket,err = tx.CreateBucket([]byte(bucketName))
+			if err != nil {
+				return err
+			}
+		}
+		if err := bucket.Put(key, val.Bytes()); err != nil {
+			return err
+		}
+	}
+
+	return tx.Commit()
+}
+
+// DeleteRange is used to delete logs within a given range inclusively.
+func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error {
+	minKey := uint64ToBytes(min)
+
+	tx, err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	curs := tx.Bucket([]byte(bucketName)).Cursor()
+	for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() {
+		// Handle out-of-range log index
+		if bytesToUint64(k) > max {
+			break
+		}
+
+		// Delete in-range log index
+		if err := curs.Delete(); err != nil {
+			return err
+		}
+	}
+
+	return tx.Commit()
+}
+
+func (b *BoltStore) Delete(lc *LogCon) error {
+	tx,err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	bucket := tx.Bucket([]byte(lc.conf.BucketName))
+	dc := false
+	if bucket != nil {
+		if lc.Log != nil {
+			if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
+				return dErr
+			}
+		}
+
+		size := 0
+		bucket.ForEach(func(k, v []byte) error {
+			size++
+			return nil
+		})
+		if size == 0 {
+			dc = true
+		}
+	} else {
+		dc = true
+	}
+	if dc {
+		cb := tx.Bucket([]byte(confBucket))
+		if cb != nil {
+			cbSize := 0
+			cb.ForEach(func(k, v []byte) error {
+				cbSize++
+				return nil
+			})
+			if cbSize > 1 {
+				if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
+					return dErr
+				}
+			}
+		}
+	}
+	tx.Commit()
+	return nil
+}
+
+func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
+	tx,err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+	bucket := tx.Bucket([]byte(confBucket))
+	if bucket != nil {
+		flag := false
+		bucket.ForEach(func(k, v []byte) error {
+			cLog := &confLog{}
+			if de := decodeMsgPack(v, cLog); de == nil {
+				if cLog.BucketName == bucketName {
+					cLog.UpdateTime = t
+					cLog.Index = int(bytesToUint64(k))
+					val,err := encodeMsgPack(cLog)
+					if err ==nil {
+						if bucket.Put(k, val.Bytes()) ==nil {
+							flag = true
+						}
+					}
+				}
+			}
+			return nil
+		})
+		if !flag {
+			return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName)
+		}
+	} else {
+		return ErrBucketNotFound
+	}
+	tx.Commit()
+	return nil
+}
+
+func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
+	type kConfLog struct {
+		k []byte
+		log *confLog
+	}
+	tx,err := b.conn.Begin(true)
+	if err != nil {
+		return nil, nil, err
+	}
+	defer tx.Rollback()
+	bucket := tx.Bucket([]byte(confBucket))
+	var delBucketNames []string
+	var delErrBucketNames []string
+	if bucket != nil {
+		var allCLogs []*kConfLog
+		bucket.ForEach(func(k, v []byte) error {
+			cLog := &confLog{}
+			if e :=decodeMsgPack(v, cLog);e ==nil {
+				allCLogs = append(allCLogs, &kConfLog{
+					k:k,
+					log:cLog,
+				})
+			}
+			return nil
+		})
+		var leftClogs []*kConfLog
+		if len(allCLogs) > conf.bucketLimit {
+			arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
+			leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
+			for _,a := range arr {
+				if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+					delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
+				} else {
+					bucket.Delete(a.k)
+					delBucketNames = append(delBucketNames, a.log.BucketName)
+				}
+			}
+		} else {
+			leftClogs = allCLogs
+		}
+		if len(leftClogs) >1 {
+			leftClogs = leftClogs[:len(leftClogs)-1]
+			for _,a := range leftClogs {
+				if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
+					if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+						delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
+					} else {
+						bucket.Delete(a.k)
+						delBucketNames = append(delBucketNames, a.log.BucketName)
+					}
+				}
+			}
+		}
+
+	} else {
+		return nil, nil, errors.New("bucket not found")
+	}
+	tx.Commit()
+	return delBucketNames, delErrBucketNames, nil
+}
+
+// Set is used to set a key/value set outside of the raft log
+func (b *BoltStore) Set(k, v []byte) error {
+	tx, err := b.conn.Begin(true)
+	if err != nil {
+		return err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket(dbConf)
+	if err := bucket.Put(k, v); err != nil {
+		return err
+	}
+
+	return tx.Commit()
+}
+
+func (b *BoltStore) Size(bucketName string) (int,error) {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return 0, err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket([]byte(bucketName))
+	if bucket == nil {
+		return 0, ErrBucketNotFound
+	}
+	size := 0
+	bucket.ForEach(func(k, v []byte) error {
+		size++
+		return nil
+	})
+	return size, nil
+}
+
+// Get is used to retrieve a value from the k/v store by key
+func (b *BoltStore) Get(k []byte) ([]byte, error) {
+	tx, err := b.conn.Begin(false)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	bucket := tx.Bucket(dbConf)
+	val := bucket.Get(k)
+
+	if val == nil {
+		return nil, ErrKeyNotFound
+	}
+	return append([]byte(nil), val...), nil
+}
+
+// SetUint64 is like Set, but handles uint64 values
+func (b *BoltStore) SetUint64(key []byte, val uint64) error {
+	return b.Set(key, uint64ToBytes(val))
+}
+
+// GetUint64 is like Get, but handles uint64 values
+func (b *BoltStore) GetUint64(key []byte) (uint64, error) {
+	val, err := b.Get(key)
+	if err != nil {
+		return 0, err
+	}
+	return bytesToUint64(val), nil
+}
+
+// Sync performs an fsync on the database file handle. This is not necessary
+// under normal operation unless NoSync is enabled, in which this forces the
+// database file to sync against the disk.
+func (b *BoltStore) Sync() error {
+	return b.conn.Sync()
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..5315fe8
--- /dev/null
+++ b/config.go
@@ -0,0 +1,41 @@
+package boltcache
+
+import (
+	"fmt"
+	"time"
+)
+
+type Config struct {
+	path        		string
+	capacity    		int 			//鍗曚釜妗舵渶澶氭斁澶氬皯鏉℃暟鎹�
+	bucketLimit 		int 			//淇濈暀鐨刡ucket鐨勬渶澶т釜鏁�
+	keepDays    		int 			//鏈�澶氫繚鐣欏灏戝ぉ鐨勬暟鎹�
+
+	cleanDuration 		time.Duration   //娓呯悊鍛ㄦ湡
+
+	logPrint func(v ...interface{})
+}
+
+func NewDefaultConfig() *Config {
+	return &Config{
+		path:          "./push.db",
+		capacity:      50,
+		bucketLimit:   10,
+		keepDays:      1,
+		cleanDuration: 10 * time.Second,
+		logPrint: func(v ...interface{}) {
+			fmt.Println(v...)
+		},
+	}
+}
+
+func NewConfig(path string, capacity int, bucketLimit int, keepDays int,logPrint func(v ...interface{})) *Config {
+	return &Config{
+		path:          path,
+		capacity:      capacity,
+		bucketLimit:   bucketLimit,
+		keepDays:      keepDays,
+		cleanDuration: time.Second * 10,
+		logPrint:      logPrint,
+	}
+}
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..44d8645
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module boltcache
+
+go 1.14
+
+require (
+	github.com/boltdb/bolt v1.3.1
+	github.com/hashicorp/go-msgpack v1.1.5
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..01fb5c7
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,10 @@
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
+github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
+github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190424220101-1e8e1cfdf96b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/state.go b/state.go
new file mode 100644
index 0000000..ed4bb8f
--- /dev/null
+++ b/state.go
@@ -0,0 +1,21 @@
+package boltcache
+
+import "sync"
+
+type storeState struct {
+	lastLock sync.Mutex
+	lastLogIndex   uint64
+}
+
+func (s *storeState) getLastLog() (index uint64) {
+	s.lastLock.Lock()
+	index = s.lastLogIndex
+	s.lastLock.Unlock()
+	return
+}
+
+func (s *storeState) setLastLog(index uint64) {
+	s.lastLock.Lock()
+	s.lastLogIndex = index
+	s.lastLock.Unlock()
+}
diff --git a/store.go b/store.go
new file mode 100644
index 0000000..e0545f3
--- /dev/null
+++ b/store.go
@@ -0,0 +1,149 @@
+package boltcache
+
+import (
+	"strconv"
+	"strings"
+	"time"
+)
+
+const (
+	bucketPre = "bucket-"
+	confBucket = "conf"
+)
+type Log struct {
+	Index 		uint64
+	Data 		[]byte
+}
+
+type confLog struct {
+	Index      		int       	//index
+	UpdateTime 		time.Time 	//鍒涘缓鏃堕棿
+	BucketName 		string    	//鍒嗘《鍚嶇О
+}
+
+type LogStore struct {
+	storeState
+	store          *BoltStore
+	conf           *Config
+	applyCh        chan *Log
+	curBucketName  string //褰撳墠瀛樺偍鐨刡ucket
+	curBucketIndex uint64 //bucket鍦╟onf琛ㄤ腑鐨刬ndex
+}
+
+func (ls *LogStore) printLog(v ...interface{}) {
+	if ls.conf.logPrint != nil {
+		ls.conf.logPrint(v...)
+	}
+}
+
+func Init(conf *Config) (*LogStore, error) {
+	store, err := NewBoltStore(conf.path)
+	if err != nil {
+		return nil, err
+	}
+	ls := &LogStore{
+		store: store,
+		conf: conf,
+		applyCh: make(chan *Log, 1024),
+	}
+	ls.setCurBucketAndIdx()
+
+	go ls.dealLogs()
+	go ls.clean()
+	return ls, nil
+}
+
+//鍒濆鍖栧綋鍓嶈〃浠ュ強index
+func (ls *LogStore) setCurBucketAndIdx() {
+	idx, _ := ls.store.LastIndex(confBucket)
+	lg := &confLog{}
+	err := ls.store.GetConfLog(idx, lg)
+	if err == ErrLogNotFound {
+		ls.curBucketName = bucketPre+"0"
+		ls.setLastLog(0)
+		cLog := &confLog{
+			UpdateTime: time.Now(),
+			BucketName: ls.curBucketName,
+			Index:      0,
+		}
+		val,err := encodeMsgPack(cLog)
+		if err != nil {
+			ls.printLog("encodeMsgPack err:", err)
+		} else {
+			ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
+		}
+	} else {
+		ls.curBucketName = lg.BucketName
+		ls.curBucketIndex = idx
+		ls.printLog("curBucketName:", lg.BucketName)
+		lastK, lErr := ls.store.LastIndex(ls.curBucketName)
+		if lErr != nil {
+			ls.printLog("lErr:", lErr)
+			ls.setLastLog(0)
+		} else {
+			lastLog := &Log{}
+			e := ls.store.GetLog(ls.curBucketName, lastK, lastLog)
+			if e == ErrLogNotFound || e == ErrBucketNotFound {
+				ls.setLastLog(0)
+			} else {
+				ls.setLastLog(lastLog.Index)
+			}
+		}
+	}
+}
+
+func (ls *LogStore) dealLogs() {
+	for {
+		select {
+			case log :=<-ls.applyCh:
+				lastLogIndex := ls.getLastLog()
+				if lastLogIndex > uint64(ls.conf.capacity) { //褰撳墠bucket瀛樺偍宸茶揪涓婇檺锛岄渶瑕侀噸鏂板垱寤轰竴涓猙ucket
+					ls.printLog(ls.curBucketName+"瀹归噺杈惧埌涓婇檺,灏嗘柊寤轰竴涓猙ucket")
+					suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1)
+					num,_ := strconv.Atoi(suffix)
+					newBucketIdx := num+1
+					ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx)
+					ls.setLastLog(0)
+					log.Index = 1
+					ls.curBucketIndex = uint64(newBucketIdx)
+					cLog := &confLog{
+						UpdateTime: time.Now(),
+						BucketName: ls.curBucketName,
+						Index:      newBucketIdx,
+					}
+					val,err := encodeMsgPack(cLog)
+					if err != nil {
+						ls.printLog("encodeMsgPack err:", err)
+					} else {
+						ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
+					}
+				}
+				err := ls.store.StoreLog(ls.curBucketName, log)
+				if err == nil {
+					ls.setLastLog(log.Index)
+					ls.store.UpdateTime(ls.curBucketName, time.Now())
+					ls.printLog(ls.curBucketName + " stored a log,index:", log.Index)
+				} else {
+					ls.printLog("store log err:", err, "bucketName:", ls.curBucketName)
+				}
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+}
+
+func (ls *LogStore) clean() {
+	tk := time.NewTicker(ls.conf.cleanDuration)
+	defer tk.Stop()
+	for {
+		select {
+		case <-tk.C:
+			ls.printLog("start clean...")
+			st := time.Now()
+			delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf)
+			ls.printLog("clean done!!! 鑰楁椂:", time.Since(st), "宸叉竻鐞嗭細",delBuckets, "娓呯悊澶辫触锛�", delErrBuckets , "err:", e)
+		default:
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+}
\ No newline at end of file
diff --git a/store_test.go b/store_test.go
new file mode 100644
index 0000000..f1ba8f5
--- /dev/null
+++ b/store_test.go
@@ -0,0 +1,38 @@
+package boltcache
+
+import (
+	"strconv"
+	"testing"
+	"time"
+)
+
+func TestInit(t *testing.T) {
+	conf := NewDefaultConfig()
+	ls, err := Init(conf)
+	if err != nil {
+		ls.printLog("Init err:", err)
+		return
+	}
+
+	defer ls.Close()
+
+	go consume(ls)
+
+	for i:=0; i<10000; i++{
+		ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
+		time.Sleep(time.Second * 1)
+	}
+
+}
+
+func consume(ls *LogStore) {
+	for {
+		lc := ls.Get()
+		if lc != nil {
+			ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
+			ls.Delete(lc)
+		}
+
+		time.Sleep(10 * time.Second)
+	}
+}
\ No newline at end of file
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..ad75bda
--- /dev/null
+++ b/util.go
@@ -0,0 +1,37 @@
+package boltcache
+
+import (
+	"bytes"
+	"encoding/binary"
+
+	"github.com/hashicorp/go-msgpack/codec"
+)
+
+// Decode reverses the encode operation on a byte slice input
+func decodeMsgPack(buf []byte, out interface{}) error {
+	r := bytes.NewBuffer(buf)
+	hd := codec.MsgpackHandle{}
+	dec := codec.NewDecoder(r, &hd)
+	return dec.Decode(out)
+}
+
+// Encode writes an encoded object to a new bytes buffer
+func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
+	buf := bytes.NewBuffer(nil)
+	hd := codec.MsgpackHandle{}
+	enc := codec.NewEncoder(buf, &hd)
+	err := enc.Encode(in)
+	return buf, err
+}
+
+// Converts bytes to an integer
+func bytesToUint64(b []byte) uint64 {
+	return binary.BigEndian.Uint64(b)
+}
+
+// Converts a uint to a byte slice
+func uint64ToBytes(u uint64) []byte {
+	buf := make([]byte, 8)
+	binary.BigEndian.PutUint64(buf, u)
+	return buf
+}

--
Gitblit v1.8.0