package boltcache
|
|
import (
|
"strconv"
|
"strings"
|
"time"
|
)
|
|
// Bucket names used by the log store.
const (
	// confBucket is the bucket name the store is queried with when looking up
	// the most recent bucket configuration record.
	confBucket = "conf"

	// bucketPre is the prefix shared by every data bucket name; the bucket's
	// sequence number is appended to it (for example "bucket-0").
	bucketPre = "bucket-"
)
|
// Log is one entry queued on LogStore.applyCh and written to the current
// bucket by the background writer.
type Log struct {
	// Index is the entry's sequence number. The writer overwrites it with 1
	// for the first entry stored in a freshly created bucket.
	Index uint64

	// Data is the entry payload; nothing in this file inspects it.
	Data []byte
}
|
|
// confLog is the configuration record describing one data bucket. One record
// is encoded and handed to the store whenever a bucket is created.
type confLog struct {
	// Index is the bucket's sequence number; it is also used as the record key.
	Index int

	// UpdateTime is set to the current time when the record is built
	// (the original comment describes it as the creation time).
	UpdateTime time.Time

	// BucketName is the name of the data bucket this record describes.
	BucketName string
}
|
|
type LogStore struct {
|
storeState
|
store *BoltStore
|
conf *Config
|
applyCh chan *Log
|
curBucketName string //当前存储的bucket
|
curBucketIndex uint64 //bucket在conf表中的index
|
}
|
|
func (ls *LogStore) printLog(v ...interface{}) {
|
if ls.conf.logPrint != nil {
|
ls.conf.logPrint(v...)
|
}
|
}
|
|
func Init(conf *Config) (*LogStore, error) {
|
store, err := NewBoltStore(conf.path)
|
if err != nil {
|
return nil, err
|
}
|
ls := &LogStore{
|
store: store,
|
conf: conf,
|
applyCh: make(chan *Log, 1024),
|
}
|
ls.setCurBucketAndIdx()
|
|
go ls.dealLogs()
|
go ls.clean()
|
return ls, nil
|
}
|
|
//初始化当前表以及index
|
func (ls *LogStore) setCurBucketAndIdx() {
|
idx, _ := ls.store.LastIndex(confBucket)
|
lg := &confLog{}
|
err := ls.store.GetConfLog(idx, lg)
|
if err == ErrLogNotFound {
|
ls.curBucketName = bucketPre+"0"
|
ls.setLastLog(0)
|
cLog := &confLog{
|
UpdateTime: time.Now(),
|
BucketName: ls.curBucketName,
|
Index: 0,
|
}
|
val,err := encodeMsgPack(cLog)
|
if err != nil {
|
ls.printLog("encodeMsgPack err:", err)
|
} else {
|
ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
|
}
|
} else {
|
ls.curBucketName = lg.BucketName
|
ls.curBucketIndex = idx
|
ls.printLog("curBucketName:", lg.BucketName)
|
lastK, lErr := ls.store.LastIndex(ls.curBucketName)
|
if lErr != nil {
|
ls.printLog("lErr:", lErr)
|
ls.setLastLog(0)
|
} else {
|
lastLog := &Log{}
|
e := ls.store.GetLog(ls.curBucketName, lastK, lastLog)
|
if e == ErrLogNotFound || e == ErrBucketNotFound {
|
ls.setLastLog(0)
|
} else {
|
ls.setLastLog(lastLog.Index)
|
}
|
}
|
}
|
}
|
|
func (ls *LogStore) dealLogs() {
|
for {
|
select {
|
case log :=<-ls.applyCh:
|
lastLogIndex := ls.getLastLog()
|
if lastLogIndex > uint64(ls.conf.capacity) { //当前bucket存储已达上限,需要重新创建一个bucket
|
ls.printLog(ls.curBucketName+"容量达到上限,将新建一个bucket")
|
suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1)
|
num,_ := strconv.Atoi(suffix)
|
newBucketIdx := num+1
|
ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx)
|
ls.setLastLog(0)
|
log.Index = 1
|
ls.curBucketIndex = uint64(newBucketIdx)
|
cLog := &confLog{
|
UpdateTime: time.Now(),
|
BucketName: ls.curBucketName,
|
Index: newBucketIdx,
|
}
|
val,err := encodeMsgPack(cLog)
|
if err != nil {
|
ls.printLog("encodeMsgPack err:", err)
|
} else {
|
ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
|
}
|
}
|
err := ls.store.StoreLog(ls.curBucketName, log)
|
if err == nil {
|
ls.setLastLog(log.Index)
|
ls.store.UpdateTime(ls.curBucketName, time.Now())
|
ls.printLog(ls.curBucketName + " stored a log,index:", log.Index)
|
} else {
|
ls.printLog("store log err:", err, "bucketName:", ls.curBucketName)
|
}
|
default:
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
}
|
|
func (ls *LogStore) clean() {
|
tk := time.NewTicker(ls.conf.cleanDuration)
|
defer tk.Stop()
|
for {
|
select {
|
case <-tk.C:
|
ls.printLog("start clean...")
|
st := time.Now()
|
delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf)
|
ls.printLog("clean done!!! 耗时:", time.Since(st), "已清理:",delBuckets, "清理失败:", delErrBuckets , "err:", e)
|
default:
|
time.Sleep(100 * time.Millisecond)
|
}
|
}
|
}
|