| | |
| | | package boltcache |
| | | |
| | | import "strconv" |
| | | |
| | | //添加一条日志 |
| | | func (ls *LogStore) ApplyLog(logData []byte) { |
| | | lastLogIndex := ls.getLastLog() |
| | |
| | | return nil |
| | | } |
| | | |
| | | // 遍历数据, 输出原始数据 |
| | | func (ls *LogStore) ForEach(f func(v []byte) error) { |
| | | ls.store.ForEach(f) |
| | | } |
| | | |
| | | func (ls *LogStore) Size() int { |
| | | start, _ := ls.store.FirstIndex(confBucket) |
| | | end, _ := ls.store.LastIndex(confBucket) |
| | | |
| | | totalSize := 0 |
| | | |
| | | for ; start <= end; start++ { |
| | | bucketName := bucketPre + strconv.Itoa(int(start)) |
| | | bucketSize, err := ls.store.Size(bucketName) |
| | | if err == nil { |
| | | totalSize = totalSize + bucketSize |
| | | } |
| | | } |
| | | |
| | | return totalSize |
| | | } |
| | | |
| | | //提供给外层使用,删除日志 |
| | | func (ls *LogStore) Delete(lc *LogCon) error { |
| | | return ls.store.Delete(lc) |
| | |
| | | import ( |
| | | "errors" |
| | | "github.com/boltdb/bolt" |
| | | "strconv" |
| | | "time" |
| | | ) |
| | | |
| | |
| | | return tx.Commit() |
| | | } |
| | | |
| | | func (b *BoltStore) ForEach(f func(v []byte) error) { |
| | | tx, err := b.conn.Begin(true) |
| | | if err != nil { |
| | | return |
| | | } |
| | | defer tx.Rollback() |
| | | |
| | | start, _ := b.FirstIndex(confBucket) |
| | | end, _ := b.LastIndex(confBucket) |
| | | |
| | | for ; start <= end; start++ { |
| | | bucketName := bucketPre + strconv.Itoa(int(start)) |
| | | bucket := tx.Bucket([]byte(bucketName)) |
| | | if bucket == nil { |
| | | return |
| | | } |
| | | |
| | | bucket.ForEach(func(k, v []byte) error { |
| | | log := &Log{} |
| | | err := decodeMsgPack(v, log) |
| | | f(log.Data) |
| | | return err |
| | | }) |
| | | } |
| | | } |
| | | |
| | | func (b *BoltStore) Delete(lc *LogCon) error { |
| | | tx,err := b.conn.Begin(true) |
| | | if err != nil { |
| | |
| | | |
| | | defer ls.Close() |
| | | |
| | | go consume(ls) |
| | | |
| | | for i:=0; i<10000; i++{ |
| | | //go consume(ls) |
| | | // |
| | | for i := 0; i < 100; i++ { |
| | | ls.ApplyLog([]byte("hello world "+strconv.Itoa(i))) |
| | | time.Sleep(time.Second * 1) |
| | | time.Sleep(time.Millisecond * 100) |
| | | } |
| | | |
| | | ls.printLog("Size = ", ls.Size()) |
| | | |
| | | ls.ForEach(func(v []byte) error { |
| | | ls.printLog("val=:", string(v)) |
| | | return nil |
| | | }) |
| | | |
| | | consume(ls) |
| | | } |
| | | |
| | | func consume(ls *LogStore) { |
| | |
| | | if lc != nil { |
| | | ls.printLog(lc.conf.BucketName," send old log:", string(lc.Log.Data)) |
| | | ls.Delete(lc) |
| | | } else { |
| | | return |
| | | } |
| | | |
| | | time.Sleep(10 * time.Second) |
| | | time.Sleep(10 * time.Millisecond) |
| | | } |
| | | } |