liuxiaolong
2020-11-18 911e6eb6a1a1ab5dd979a1917b79a5465da88181
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package boltcache
 
import (
    "strconv"
    "strings"
    "time"
)
 
const (
    bucketPre = "bucket-"
    confBucket = "conf"
)
type Log struct {
    Index         uint64
    Data         []byte
}
 
type confLog struct {
    Index              int           //index
    UpdateTime         time.Time     //创建时间
    BucketName         string        //分桶名称
}
 
type LogStore struct {
    storeState
    store          *BoltStore
    conf           *Config
    applyCh        chan *Log
    curBucketName  string //当前存储的bucket
    curBucketIndex uint64 //bucket在conf表中的index
}
 
func (ls *LogStore) printLog(v ...interface{}) {
    if ls.conf.logPrint != nil {
        ls.conf.logPrint(v...)
    }
}
 
func Init(conf *Config) (*LogStore, error) {
    store, err := NewBoltStore(conf.path)
    if err != nil {
        return nil, err
    }
    ls := &LogStore{
        store: store,
        conf: conf,
        applyCh: make(chan *Log, 1024),
    }
    ls.setCurBucketAndIdx()
 
    go ls.dealLogs()
    go ls.clean()
    return ls, nil
}
 
//初始化当前表以及index
func (ls *LogStore) setCurBucketAndIdx() {
    idx, _ := ls.store.LastIndex(confBucket)
    lg := &confLog{}
    err := ls.store.GetConfLog(idx, lg)
    if err == ErrLogNotFound {
        ls.curBucketName = bucketPre+"0"
        ls.setLastLog(0)
        cLog := &confLog{
            UpdateTime: time.Now(),
            BucketName: ls.curBucketName,
            Index:      0,
        }
        val,err := encodeMsgPack(cLog)
        if err != nil {
            ls.printLog("encodeMsgPack err:", err)
        } else {
            ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
        }
    } else {
        ls.curBucketName = lg.BucketName
        ls.curBucketIndex = idx
        ls.printLog("curBucketName:", lg.BucketName)
        lastK, lErr := ls.store.LastIndex(ls.curBucketName)
        if lErr != nil {
            ls.printLog("lErr:", lErr)
            ls.setLastLog(0)
        } else {
            lastLog := &Log{}
            e := ls.store.GetLog(ls.curBucketName, lastK, lastLog)
            if e == ErrLogNotFound || e == ErrBucketNotFound {
                ls.setLastLog(0)
            } else {
                ls.setLastLog(lastLog.Index)
            }
        }
    }
}
 
func (ls *LogStore) dealLogs() {
    for {
        select {
            case log :=<-ls.applyCh:
                lastLogIndex := ls.getLastLog()
                if lastLogIndex > uint64(ls.conf.capacity) { //当前bucket存储已达上限,需要重新创建一个bucket
                    ls.printLog(ls.curBucketName+"容量达到上限,将新建一个bucket")
                    suffix := strings.Replace(ls.curBucketName, bucketPre, "", -1)
                    num,_ := strconv.Atoi(suffix)
                    newBucketIdx := num+1
                    ls.curBucketName = bucketPre+strconv.Itoa(newBucketIdx)
                    ls.setLastLog(0)
                    log.Index = 1
                    ls.curBucketIndex = uint64(newBucketIdx)
                    cLog := &confLog{
                        UpdateTime: time.Now(),
                        BucketName: ls.curBucketName,
                        Index:      newBucketIdx,
                    }
                    val,err := encodeMsgPack(cLog)
                    if err != nil {
                        ls.printLog("encodeMsgPack err:", err)
                    } else {
                        ls.store.Set(uint64ToBytes(uint64(cLog.Index)), val.Bytes())
                    }
                }
                err := ls.store.StoreLog(ls.curBucketName, log)
                if err == nil {
                    ls.setLastLog(log.Index)
                    ls.store.UpdateTime(ls.curBucketName, time.Now())
                    ls.printLog(ls.curBucketName + " stored a log,index:", log.Index)
                } else {
                    ls.printLog("store log err:", err, "bucketName:", ls.curBucketName)
                }
        default:
            time.Sleep(10 * time.Millisecond)
        }
    }
}
 
func (ls *LogStore) clean() {
    tk := time.NewTicker(ls.conf.cleanDuration)
    defer tk.Stop()
    for {
        select {
        case <-tk.C:
            ls.printLog("start clean...")
            st := time.Now()
            delBuckets,delErrBuckets, e := ls.store.Clean(ls.conf)
            ls.printLog("clean done!!! 耗时:", time.Since(st), "已清理:",delBuckets, "清理失败:", delErrBuckets , "err:", e)
        default:
            time.Sleep(100 * time.Millisecond)
        }
    }
}