package boltcache import ( "strconv" "strings" "time" ) const ( bucketPre = "bucket-" confBucket = "conf" ) type Log struct { Index uint64 Data []byte } type confLog struct { Index int //index UpdateTime time.Time //创建时间 BucketName string //分桶名称 } type LogStore struct { storeState store *BoltStore conf *Config applyCh chan *Log curBucketName string //当前存储的bucket curBucketIndex uint64 //bucket在conf表中的index } func (ls *LogStore) printLog(v ...interface{}) { if ls.conf.logPrint != nil { ls.conf.logPrint(v...) } } func Init(conf *Config) (*LogStore, error) { store, err := NewBoltStore(conf.path) if err != nil { return nil, err } ls := &LogStore{ store: store, conf: conf, applyCh: make(chan *Log, 1024), } ls.setCurBucketAndIdx() go ls.dealLogs() go ls.clean() return ls, nil } //初始化当前表以及index func (ls *LogStore) setCurBucketAndIdx() { idx, _ := ls.store.LastIndex(confBucket) lg := &confLog{} err := ls.store.GetConfLog(idx, lg) if err == ErrLogNotFound { ls.curBucketName = bucketPre+"0" ls.setLastLog(0) cLog := &confLog{ UpdateTime: time.Now(), BucketName: ls.curBucketName, Index: 0, } val,err := encodeMsgPack(cLog) if err != nil { ls.printLog("encodeMsgPack err:", err) } else { ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes()) } } else { ls.curBucketName = lg.BucketName ls.curBucketIndex = idx ls.printLog("curBucketName:", lg.BucketName) lastK, lErr := ls.store.LastIndex(ls.curBucketName) if lErr != nil { ls.printLog("lErr:", lErr) ls.setLastLog(0) } else { lastLog := &Log{} e := ls.store.GetLog(ls.curBucketName, lastK, lastLog) if e == ErrLogNotFound || e == ErrBucketNotFound { ls.setLastLog(0) } else { ls.setLastLog(lastLog.Index) } } } } func (ls *LogStore) dealLogs() { for { select { case log :=<-ls.applyCh: lastLogIndex := ls.getLastLog() if lastLogIndex > uint64(ls.conf.capacity) { //当前bucket存储已达上限,需要重新创建一个bucket ls.printLog(ls.curBucketName+"容量达到上限,将新建一个bucket") suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1) num,_ := strconv.Atoi(suffix) newBucketIdx := num+1 ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx) ls.setLastLog(0) log.Index = 1 ls.curBucketIndex = uint64(newBucketIdx) cLog := &confLog{ UpdateTime: time.Now(), BucketName: ls.curBucketName, Index: newBucketIdx, } val,err := encodeMsgPack(cLog) if err != nil { ls.printLog("encodeMsgPack err:", err) } else { ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes()) } } err := ls.store.StoreLog(ls.curBucketName, log) if err == nil { ls.setLastLog(log.Index) ls.store.UpdateTime(ls.curBucketName, time.Now()) ls.printLog(ls.curBucketName + " stored a log,index:", log.Index) } else { ls.printLog("store log err:", err, "bucketName:", ls.curBucketName) } default: time.Sleep(10 * time.Millisecond) } } } func (ls *LogStore) clean() { tk := time.NewTicker(ls.conf.cleanDuration) defer tk.Stop() for { select { case <-tk.C: ls.printLog("start clean...") st := time.Now() delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf) ls.printLog("clean done!!! 耗时:", time.Since(st), "已清理:",delBuckets, "清理失败:", delErrBuckets , "err:", e) default: time.Sleep(100 * time.Millisecond) } } }