From 15c091bccbe9ee6e5e72dc93d413a2b67c13579c Mon Sep 17 00:00:00 2001
From: liuxiaolong <liuxiaolong@aiotlink.com>
Date: 星期三, 18 十一月 2020 14:32:20 +0800
Subject: [PATCH] first push
---
util.go | 37 ++
go.sum | 10
state.go | 21 +
config.go | 41 ++
store_test.go | 38 ++
go.mod | 8
api.go | 59 ++++
store.go | 149 ++++++++++
bolt_store.go | 457 ++++++++++++++++++++++++++++++++
9 files changed, 820 insertions(+), 0 deletions(-)
diff --git a/api.go b/api.go
new file mode 100644
index 0000000..dc73a4f
--- /dev/null
+++ b/api.go
@@ -0,0 +1,59 @@
+package boltcache
+
+//娣诲姞涓�鏉℃棩蹇�
+func (ls *LogStore) ApplyLog(logData []byte) {
+ lastLogIndex := ls.getLastLog()
+ ls.applyCh <- &Log {
+ Index: lastLogIndex+1,
+ Data: logData,
+ }
+ ls.setLastLog(lastLogIndex + 1)
+}
+
+type LogCon struct {
+ conf *confLog
+ Log *Log
+}
+
+//鑾峰彇缂撳瓨鐨勬暟鎹�
+func (lc *LogCon) GetData() []byte {
+ return lc.Log.Data
+}
+
+//鎻愪緵缁欏灞備娇鐢紝鎸夐『搴忚幏鍙栫紦瀛樼殑鏃ュ織
+func (ls *LogStore) Get() *LogCon {
+ idx, _ := ls.store.FirstIndex(confBucket)
+ cLog := &confLog{}
+ if err := ls.store.GetConfLog(idx, cLog);err ==nil {
+ u, _ := ls.store.FirstIndex(cLog.BucketName)
+ log := &Log{}
+ if err = ls.store.GetLog(cLog.BucketName, u, log);err ==nil {
+ log.Index = u
+ cLog.Index = int(idx)
+ return &LogCon{
+ conf:cLog,
+ Log: log,
+ }
+ } else {
+ if size,_ := ls.store.Size(confBucket);size >1 {
+ ls.Delete(&LogCon{
+ conf: cLog,
+ Log: nil,
+ })
+ }
+ ls.printLog("Get log err:", err)
+ }
+ } else {
+ ls.printLog("Get conf idx:", idx, "err:", err)
+ }
+ return nil
+}
+
+//鎻愪緵缁欏灞備娇鐢紝鍒犻櫎鏃ュ織
+func (ls *LogStore) Delete(lc *LogCon) error {
+ return ls.store.Delete(lc)
+}
+
+func (ls *LogStore) Close() error {
+ return ls.store.Close()
+}
\ No newline at end of file
diff --git a/bolt_store.go b/bolt_store.go
new file mode 100644
index 0000000..0977102
--- /dev/null
+++ b/bolt_store.go
@@ -0,0 +1,457 @@
+package boltcache
+
+import (
+ "errors"
+ "github.com/boltdb/bolt"
+ "time"
+)
+
+const (
+ // Permissions to use on the db file. This is only used if the
+ // database file does not exist and needs to be created.
+ dbFileMode = 0600
+)
+
+var (
+ // Bucket names we perform transactions in
+ dbConf = []byte(confBucket)
+
+ // An error indicating a given key does not exist
+ ErrKeyNotFound = errors.New("not found")
+ ErrLogNotFound = errors.New("log not found")
+ ErrBucketNotFound = errors.New("bucket not found")
+)
+
+// BoltStore provides access to BoltDB for Raft to store and retrieve
+// log entries. It also provides key/value storage, and can be used as
+// a LogStore and StableStore.
+type BoltStore struct {
+ // conn is the underlying handle to the db.
+ conn *bolt.DB
+
+ // The path to the Bolt database file
+ path string
+}
+
+// Options contains all the configuration used to open the BoltDB
+type Options struct {
+ // Path is the file path to the BoltDB to use
+ Path string
+
+ // BoltOptions contains any specific BoltDB options you might
+ // want to specify [e.g. open timeout]
+ BoltOptions *bolt.Options
+
+ // NoSync causes the database to skip fsync calls after each
+ // write to the log. This is unsafe, so it should be used
+ // with caution.
+ NoSync bool
+}
+
+// readOnly returns true if the contained bolt options say to open
+// the DB in readOnly mode [this can be useful to tools that want
+// to examine the log]
+func (o *Options) readOnly() bool {
+ return o != nil && o.BoltOptions != nil && o.BoltOptions.ReadOnly
+}
+
+// NewBoltStore takes a file path and returns a connected Raft backend.
+func NewBoltStore(path string) (*BoltStore, error) {
+ return New(Options{Path: path})
+}
+
+// New uses the supplied options to open the BoltDB and prepare it for use as a raft backend.
+func New(options Options) (*BoltStore, error) {
+ // Try to connect
+ handle, err := bolt.Open(options.Path, dbFileMode, options.BoltOptions)
+ if err != nil {
+ return nil, err
+ }
+ handle.NoSync = options.NoSync
+
+ // Create the new store
+ store := &BoltStore{
+ conn: handle,
+ path: options.Path,
+ }
+
+ // If the store was opened read-only, don't try and create buckets
+ if !options.readOnly() {
+ // Set up our buckets
+ if err := store.initialize(); err != nil {
+ store.Close()
+ return nil, err
+ }
+ }
+ return store, nil
+}
+
+// initialize is used to set up all of the buckets.
+func (b *BoltStore) initialize() error {
+ tx, err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ if _, err := tx.CreateBucketIfNotExists(dbConf); err != nil {
+ return err
+ }
+
+ return tx.Commit()
+}
+
+// Close is used to gracefully close the DB connection.
+func (b *BoltStore) Close() error {
+ return b.conn.Close()
+}
+
+// FirstIndex returns the first known index from the Raft log.
+func (b *BoltStore) FirstIndex(bucketName string) (uint64, error) {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return 0, err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket ==nil {
+ return 0, ErrBucketNotFound
+ }
+
+ curs := bucket.Cursor()
+ if first, _ := curs.First(); first == nil {
+ return 0, nil
+ } else {
+ return bytesToUint64(first), nil
+ }
+}
+
+// LastIndex returns the last known index from the Raft log.
+func (b *BoltStore) LastIndex(bucketName string) (uint64, error) {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return 0, err
+ }
+ defer tx.Rollback()
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket ==nil {
+ return 0, ErrBucketNotFound
+ }
+ curs := bucket.Cursor()
+ if last, _ := curs.Last(); last == nil {
+ return 0, nil
+ } else {
+ return bytesToUint64(last), nil
+ }
+}
+
+// GetLog is used to retrieve a log from BoltDB at a given index.
+func (b *BoltStore) GetLog(bucketName string, idx uint64, log *Log) error {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket == nil {
+ return ErrBucketNotFound
+ }
+ val := bucket.Get(uint64ToBytes(idx))
+
+ if val == nil {
+ return ErrLogNotFound
+ }
+ return decodeMsgPack(val, log)
+}
+
+func (b *BoltStore) GetConfLog(idx uint64, clog *confLog) error {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket([]byte(confBucket))
+ val := bucket.Get(uint64ToBytes(idx))
+
+ if val == nil {
+ return ErrLogNotFound
+ }
+ return decodeMsgPack(val, clog)
+}
+
+// StoreLog is used to store a single raft log
+func (b *BoltStore) StoreLog(bucketName string, log *Log) error {
+ return b.StoreLogs(bucketName, []*Log{log})
+}
+
+// StoreLogs is used to store a set of raft logs
+func (b *BoltStore) StoreLogs(bucketName string, logs []*Log) error {
+ tx, err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ for _, log := range logs {
+ key := uint64ToBytes(log.Index)
+ val, err := encodeMsgPack(log)
+ if err != nil {
+ return err
+ }
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket == nil {
+ bucket,err = tx.CreateBucket([]byte(bucketName))
+ if err != nil {
+ return err
+ }
+ }
+ if err := bucket.Put(key, val.Bytes()); err != nil {
+ return err
+ }
+ }
+
+ return tx.Commit()
+}
+
+// DeleteRange is used to delete logs within a given range inclusively.
+func (b *BoltStore) DeleteRange(bucketName string, min, max uint64) error {
+ minKey := uint64ToBytes(min)
+
+ tx, err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ curs := tx.Bucket([]byte(bucketName)).Cursor()
+ for k, _ := curs.Seek(minKey); k != nil; k, _ = curs.Next() {
+ // Handle out-of-range log index
+ if bytesToUint64(k) > max {
+ break
+ }
+
+ // Delete in-range log index
+ if err := curs.Delete(); err != nil {
+ return err
+ }
+ }
+
+ return tx.Commit()
+}
+
+func (b *BoltStore) Delete(lc *LogCon) error {
+ tx,err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ bucket := tx.Bucket([]byte(lc.conf.BucketName))
+ dc := false
+ if bucket != nil {
+ if lc.Log != nil {
+ if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
+ return dErr
+ }
+ }
+
+ size := 0
+ bucket.ForEach(func(k, v []byte) error {
+ size++
+ return nil
+ })
+ if size == 0 {
+ dc = true
+ }
+ } else {
+ dc = true
+ }
+ if dc {
+ cb := tx.Bucket([]byte(confBucket))
+ if cb != nil {
+ cbSize := 0
+ cb.ForEach(func(k, v []byte) error {
+ cbSize++
+ return nil
+ })
+ if cbSize > 1 {
+ if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
+ return dErr
+ }
+ }
+ }
+ }
+ tx.Commit()
+ return nil
+}
+
+func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
+ tx,err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+ bucket := tx.Bucket([]byte(confBucket))
+ if bucket != nil {
+ flag := false
+ bucket.ForEach(func(k, v []byte) error {
+ cLog := &confLog{}
+ if de := decodeMsgPack(v, cLog); de == nil {
+ if cLog.BucketName == bucketName {
+ cLog.UpdateTime = t
+ cLog.Index = int(bytesToUint64(k))
+ val,err := encodeMsgPack(cLog)
+ if err ==nil {
+ if bucket.Put(k, val.Bytes()) ==nil {
+ flag = true
+ }
+ }
+ }
+ }
+ return nil
+ })
+ if !flag {
+ return errors.New("conf琛ㄤ腑鏈壘鍒�"+bucketName)
+ }
+ } else {
+ return ErrBucketNotFound
+ }
+ tx.Commit()
+ return nil
+}
+
+func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
+ type kConfLog struct {
+ k []byte
+ log *confLog
+ }
+ tx,err := b.conn.Begin(true)
+ if err != nil {
+ return nil, nil, err
+ }
+ defer tx.Rollback()
+ bucket := tx.Bucket([]byte(confBucket))
+ var delBucketNames []string
+ var delErrBucketNames []string
+ if bucket != nil {
+ var allCLogs []*kConfLog
+ bucket.ForEach(func(k, v []byte) error {
+ cLog := &confLog{}
+ if e :=decodeMsgPack(v, cLog);e ==nil {
+ allCLogs = append(allCLogs, &kConfLog{
+ k:k,
+ log:cLog,
+ })
+ }
+ return nil
+ })
+ var leftClogs []*kConfLog
+ if len(allCLogs) > conf.bucketLimit {
+ arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
+ leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
+ for _,a := range arr {
+ if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+ delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
+ } else {
+ bucket.Delete(a.k)
+ delBucketNames = append(delBucketNames, a.log.BucketName)
+ }
+ }
+ } else {
+ leftClogs = allCLogs
+ }
+ if len(leftClogs) >1 {
+ leftClogs = leftClogs[:len(leftClogs)-1]
+ for _,a := range leftClogs {
+ if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
+ if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
+ delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
+ } else {
+ bucket.Delete(a.k)
+ delBucketNames = append(delBucketNames, a.log.BucketName)
+ }
+ }
+ }
+ }
+
+ } else {
+ return nil, nil, errors.New("bucket not found")
+ }
+ tx.Commit()
+ return delBucketNames, delErrBucketNames, nil
+}
+
+// Set is used to set a key/value set outside of the raft log
+func (b *BoltStore) Set(k, v []byte) error {
+ tx, err := b.conn.Begin(true)
+ if err != nil {
+ return err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket(dbConf)
+ if err := bucket.Put(k, v); err != nil {
+ return err
+ }
+
+ return tx.Commit()
+}
+
+func (b *BoltStore) Size(bucketName string) (int,error) {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return 0, err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket([]byte(bucketName))
+ if bucket == nil {
+ return 0, ErrBucketNotFound
+ }
+ size := 0
+ bucket.ForEach(func(k, v []byte) error {
+ size++
+ return nil
+ })
+ return size, nil
+}
+
+// Get is used to retrieve a value from the k/v store by key
+func (b *BoltStore) Get(k []byte) ([]byte, error) {
+ tx, err := b.conn.Begin(false)
+ if err != nil {
+ return nil, err
+ }
+ defer tx.Rollback()
+
+ bucket := tx.Bucket(dbConf)
+ val := bucket.Get(k)
+
+ if val == nil {
+ return nil, ErrKeyNotFound
+ }
+ return append([]byte(nil), val...), nil
+}
+
+// SetUint64 is like Set, but handles uint64 values
+func (b *BoltStore) SetUint64(key []byte, val uint64) error {
+ return b.Set(key, uint64ToBytes(val))
+}
+
+// GetUint64 is like Get, but handles uint64 values
+func (b *BoltStore) GetUint64(key []byte) (uint64, error) {
+ val, err := b.Get(key)
+ if err != nil {
+ return 0, err
+ }
+ return bytesToUint64(val), nil
+}
+
+// Sync performs an fsync on the database file handle. This is not necessary
+// under normal operation unless NoSync is enabled, in which this forces the
+// database file to sync against the disk.
+func (b *BoltStore) Sync() error {
+ return b.conn.Sync()
+}
diff --git a/config.go b/config.go
new file mode 100644
index 0000000..5315fe8
--- /dev/null
+++ b/config.go
@@ -0,0 +1,41 @@
+package boltcache
+
+import (
+ "fmt"
+ "time"
+)
+
+type Config struct {
+ path string
+ capacity int //鍗曚釜妗舵渶澶氭斁澶氬皯鏉℃暟鎹�
+ bucketLimit int //淇濈暀鐨刡ucket鐨勬渶澶т釜鏁�
+ keepDays int //鏈�澶氫繚鐣欏灏戝ぉ鐨勬暟鎹�
+
+ cleanDuration time.Duration //娓呯悊鍛ㄦ湡
+
+ logPrint func(v ...interface{})
+}
+
+func NewDefaultConfig() *Config {
+ return &Config{
+ path: "./push.db",
+ capacity: 50,
+ bucketLimit: 10,
+ keepDays: 1,
+ cleanDuration: 10 * time.Second,
+ logPrint: func(v ...interface{}) {
+ fmt.Println(v...)
+ },
+ }
+}
+
+func NewConfig(path string, capacity int, bucketLimit int, keepDays int,logPrint func(v ...interface{})) *Config {
+ return &Config{
+ path: path,
+ capacity: capacity,
+ bucketLimit: bucketLimit,
+ keepDays: keepDays,
+ cleanDuration: time.Second * 10,
+ logPrint: logPrint,
+ }
+}
\ No newline at end of file
diff --git a/go.mod b/go.mod
new file mode 100644
index 0000000..44d8645
--- /dev/null
+++ b/go.mod
@@ -0,0 +1,8 @@
+module boltcache
+
+go 1.14
+
+require (
+ github.com/boltdb/bolt v1.3.1
+ github.com/hashicorp/go-msgpack v1.1.5
+)
diff --git a/go.sum b/go.sum
new file mode 100644
index 0000000..01fb5c7
--- /dev/null
+++ b/go.sum
@@ -0,0 +1,10 @@
+github.com/boltdb/bolt v1.3.1 h1:JQmyP4ZBrce+ZQu0dY660FMfatumYDLun9hBCUVIkF4=
+github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx27Ps=
+github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
+github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190424220101-1e8e1cfdf96b/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/state.go b/state.go
new file mode 100644
index 0000000..ed4bb8f
--- /dev/null
+++ b/state.go
@@ -0,0 +1,21 @@
+package boltcache
+
+import "sync"
+
+type storeState struct {
+ lastLock sync.Mutex
+ lastLogIndex uint64
+}
+
+func (s *storeState) getLastLog() (index uint64) {
+ s.lastLock.Lock()
+ index = s.lastLogIndex
+ s.lastLock.Unlock()
+ return
+}
+
+func (s *storeState) setLastLog(index uint64) {
+ s.lastLock.Lock()
+ s.lastLogIndex = index
+ s.lastLock.Unlock()
+}
diff --git a/store.go b/store.go
new file mode 100644
index 0000000..e0545f3
--- /dev/null
+++ b/store.go
@@ -0,0 +1,149 @@
+package boltcache
+
+import (
+ "strconv"
+ "strings"
+ "time"
+)
+
+const (
+ bucketPre = "bucket-"
+ confBucket = "conf"
+)
+type Log struct {
+ Index uint64
+ Data []byte
+}
+
+type confLog struct {
+ Index int //index
+ UpdateTime time.Time //鍒涘缓鏃堕棿
+ BucketName string //鍒嗘《鍚嶇О
+}
+
+type LogStore struct {
+ storeState
+ store *BoltStore
+ conf *Config
+ applyCh chan *Log
+ curBucketName string //褰撳墠瀛樺偍鐨刡ucket
+ curBucketIndex uint64 //bucket鍦╟onf琛ㄤ腑鐨刬ndex
+}
+
+func (ls *LogStore) printLog(v ...interface{}) {
+ if ls.conf.logPrint != nil {
+ ls.conf.logPrint(v...)
+ }
+}
+
+func Init(conf *Config) (*LogStore, error) {
+ store, err := NewBoltStore(conf.path)
+ if err != nil {
+ return nil, err
+ }
+ ls := &LogStore{
+ store: store,
+ conf: conf,
+ applyCh: make(chan *Log, 1024),
+ }
+ ls.setCurBucketAndIdx()
+
+ go ls.dealLogs()
+ go ls.clean()
+ return ls, nil
+}
+
+//鍒濆鍖栧綋鍓嶈〃浠ュ強index
+func (ls *LogStore) setCurBucketAndIdx() {
+ idx, _ := ls.store.LastIndex(confBucket)
+ lg := &confLog{}
+ err := ls.store.GetConfLog(idx, lg)
+ if err == ErrLogNotFound {
+ ls.curBucketName = bucketPre+"0"
+ ls.setLastLog(0)
+ cLog := &confLog{
+ UpdateTime: time.Now(),
+ BucketName: ls.curBucketName,
+ Index: 0,
+ }
+ val,err := encodeMsgPack(cLog)
+ if err != nil {
+ ls.printLog("encodeMsgPack err:", err)
+ } else {
+ ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
+ }
+ } else {
+ ls.curBucketName = lg.BucketName
+ ls.curBucketIndex = idx
+ ls.printLog("curBucketName:", lg.BucketName)
+ lastK, lErr := ls.store.LastIndex(ls.curBucketName)
+ if lErr != nil {
+ ls.printLog("lErr:", lErr)
+ ls.setLastLog(0)
+ } else {
+ lastLog := &Log{}
+ e := ls.store.GetLog(ls.curBucketName, lastK, lastLog)
+ if e == ErrLogNotFound || e == ErrBucketNotFound {
+ ls.setLastLog(0)
+ } else {
+ ls.setLastLog(lastLog.Index)
+ }
+ }
+ }
+}
+
+func (ls *LogStore) dealLogs() {
+ for {
+ select {
+ case log :=<-ls.applyCh:
+ lastLogIndex := ls.getLastLog()
+ if lastLogIndex > uint64(ls.conf.capacity) { //褰撳墠bucket瀛樺偍宸茶揪涓婇檺锛岄渶瑕侀噸鏂板垱寤轰竴涓猙ucket
+ ls.printLog(ls.curBucketName+"瀹归噺杈惧埌涓婇檺,灏嗘柊寤轰竴涓猙ucket")
+ suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1)
+ num,_ := strconv.Atoi(suffix)
+ newBucketIdx := num+1
+ ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx)
+ ls.setLastLog(0)
+ log.Index = 1
+ ls.curBucketIndex = uint64(newBucketIdx)
+ cLog := &confLog{
+ UpdateTime: time.Now(),
+ BucketName: ls.curBucketName,
+ Index: newBucketIdx,
+ }
+ val,err := encodeMsgPack(cLog)
+ if err != nil {
+ ls.printLog("encodeMsgPack err:", err)
+ } else {
+ ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
+ }
+ }
+ err := ls.store.StoreLog(ls.curBucketName, log)
+ if err == nil {
+ ls.setLastLog(log.Index)
+ ls.store.UpdateTime(ls.curBucketName, time.Now())
+ ls.printLog(ls.curBucketName + " stored a log,index:", log.Index)
+ } else {
+ ls.printLog("store log err:", err, "bucketName:", ls.curBucketName)
+ }
+ default:
+ time.Sleep(10 * time.Millisecond)
+ }
+ }
+}
+
+func (ls *LogStore) clean() {
+ tk := time.NewTicker(ls.conf.cleanDuration)
+ defer tk.Stop()
+ for {
+ select {
+ case <-tk.C:
+ ls.printLog("start clean...")
+ st := time.Now()
+ delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf)
+ ls.printLog("clean done!!! 鑰楁椂:", time.Since(st), "宸叉竻鐞嗭細",delBuckets, "娓呯悊澶辫触锛�", delErrBuckets , "err:", e)
+ default:
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+}
\ No newline at end of file
diff --git a/store_test.go b/store_test.go
new file mode 100644
index 0000000..f1ba8f5
--- /dev/null
+++ b/store_test.go
@@ -0,0 +1,38 @@
+package boltcache
+
+import (
+ "strconv"
+ "testing"
+ "time"
+)
+
+func TestInit(t *testing.T) {
+ conf := NewDefaultConfig()
+ ls, err := Init(conf)
+ if err != nil {
+ ls.printLog("Init err:", err)
+ return
+ }
+
+ defer ls.Close()
+
+ go consume(ls)
+
+ for i:=0; i<10000; i++{
+ ls.ApplyLog([]byte("hello world "+strconv.Itoa(i)))
+ time.Sleep(time.Second * 1)
+ }
+
+}
+
+func consume(ls *LogStore) {
+ for {
+ lc := ls.Get()
+ if lc != nil {
+ ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data))
+ ls.Delete(lc)
+ }
+
+ time.Sleep(10 * time.Second)
+ }
+}
\ No newline at end of file
diff --git a/util.go b/util.go
new file mode 100644
index 0000000..ad75bda
--- /dev/null
+++ b/util.go
@@ -0,0 +1,37 @@
+package boltcache
+
+import (
+ "bytes"
+ "encoding/binary"
+
+ "github.com/hashicorp/go-msgpack/codec"
+)
+
+// Decode reverses the encode operation on a byte slice input
+func decodeMsgPack(buf []byte, out interface{}) error {
+ r := bytes.NewBuffer(buf)
+ hd := codec.MsgpackHandle{}
+ dec := codec.NewDecoder(r, &hd)
+ return dec.Decode(out)
+}
+
+// Encode writes an encoded object to a new bytes buffer
+func encodeMsgPack(in interface{}) (*bytes.Buffer, error) {
+ buf := bytes.NewBuffer(nil)
+ hd := codec.MsgpackHandle{}
+ enc := codec.NewEncoder(buf, &hd)
+ err := enc.Encode(in)
+ return buf, err
+}
+
+// Converts bytes to an integer
+func bytesToUint64(b []byte) uint64 {
+ return binary.BigEndian.Uint64(b)
+}
+
+// Converts a uint to a byte slice
+func uint64ToBytes(u uint64) []byte {
+ buf := make([]byte, 8)
+ binary.BigEndian.PutUint64(buf, u)
+ return buf
+}
--
Gitblit v1.8.0