zhangzengfei
2021-11-12 33cc2473a3d6ca941717c557f8e72b9904fbdc4f
bolt_store.go
@@ -3,6 +3,7 @@
import (
   "errors"
   "github.com/boltdb/bolt"
   "strconv"
   "time"
)
@@ -17,8 +18,8 @@
   dbConf = []byte(confBucket)
   // An error indicating a given key does not exist
   ErrKeyNotFound = errors.New("not found")
   ErrLogNotFound = errors.New("log not found")
   ErrKeyNotFound    = errors.New("not found")
   ErrLogNotFound    = errors.New("log not found")
   ErrBucketNotFound = errors.New("bucket not found")
)
@@ -115,7 +116,7 @@
   defer tx.Rollback()
   bucket := tx.Bucket([]byte(bucketName))
   if bucket ==nil {
   if bucket == nil {
      return 0, ErrBucketNotFound
   }
@@ -135,7 +136,7 @@
   }
   defer tx.Rollback()
   bucket := tx.Bucket([]byte(bucketName))
   if bucket ==nil {
   if bucket == nil {
      return 0, ErrBucketNotFound
   }
   curs := bucket.Cursor()
@@ -203,7 +204,7 @@
      }
      bucket := tx.Bucket([]byte(bucketName))
      if bucket == nil {
         bucket,err = tx.CreateBucket([]byte(bucketName))
         bucket, err = tx.CreateBucket([]byte(bucketName))
         if err != nil {
            return err
         }
@@ -242,8 +243,34 @@
   return tx.Commit()
}
func (b *BoltStore) ForEach(f func(v []byte) error) {
   tx, err := b.conn.Begin(true)
   if err != nil {
      return
   }
   defer tx.Rollback()
   start, _ := b.FirstIndex(confBucket)
   end, _ := b.LastIndex(confBucket)
   for ; start <= end; start++ {
      bucketName := bucketPre + strconv.Itoa(int(start))
      bucket := tx.Bucket([]byte(bucketName))
      if bucket == nil {
         return
      }
      bucket.ForEach(func(k, v []byte) error {
         log := &Log{}
         err := decodeMsgPack(v, log)
         f(log.Data)
         return err
      })
   }
}
func (b *BoltStore) Delete(lc *LogCon) error {
   tx,err := b.conn.Begin(true)
   tx, err := b.conn.Begin(true)
   if err != nil {
      return err
   }
@@ -252,7 +279,7 @@
   dc := false
   if bucket != nil {
      if lc.Log != nil {
         if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index));dErr != nil {
         if dErr := bucket.Delete(uint64ToBytes(lc.Log.Index)); dErr != nil {
            return dErr
         }
      }
@@ -277,7 +304,7 @@
            return nil
         })
         if cbSize > 1 {
            if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index)));dErr != nil {
            if dErr := cb.Delete(uint64ToBytes(uint64(lc.conf.Index))); dErr != nil {
               return dErr
            }
         }
@@ -288,7 +315,7 @@
}
func (b *BoltStore) UpdateTime(bucketName string, t time.Time) error {
   tx,err := b.conn.Begin(true)
   tx, err := b.conn.Begin(true)
   if err != nil {
      return err
   }
@@ -302,9 +329,9 @@
            if cLog.BucketName == bucketName {
               cLog.UpdateTime = t
               cLog.Index = int(bytesToUint64(k))
               val,err := encodeMsgPack(cLog)
               if err ==nil {
                  if bucket.Put(k, val.Bytes()) ==nil {
               val, err := encodeMsgPack(cLog)
               if err == nil {
                  if bucket.Put(k, val.Bytes()) == nil {
                     flag = true
                  }
               }
@@ -313,7 +340,7 @@
         return nil
      })
      if !flag {
         return errors.New("conf表中未找到"+bucketName)
         return errors.New("conf表中未找到" + bucketName)
      }
   } else {
      return ErrBucketNotFound
@@ -324,10 +351,10 @@
func (b *BoltStore) Clean(conf *Config) ([]string, []string, error) {
   type kConfLog struct {
      k []byte
      k   []byte
      log *confLog
   }
   tx,err := b.conn.Begin(true)
   tx, err := b.conn.Begin(true)
   if err != nil {
      return nil, nil, err
   }
@@ -339,10 +366,10 @@
      var allCLogs []*kConfLog
      bucket.ForEach(func(k, v []byte) error {
         cLog := &confLog{}
         if e :=decodeMsgPack(v, cLog);e ==nil {
         if e := decodeMsgPack(v, cLog); e == nil {
            allCLogs = append(allCLogs, &kConfLog{
               k:k,
               log:cLog,
               k:   k,
               log: cLog,
            })
         }
         return nil
@@ -351,8 +378,8 @@
      if len(allCLogs) > conf.bucketLimit {
         arr := allCLogs[:len(allCLogs)-conf.bucketLimit]
         leftClogs = allCLogs[len(allCLogs)-conf.bucketLimit:]
         for _,a := range arr {
            if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
         for _, a := range arr {
            if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
               delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
            } else {
               bucket.Delete(a.k)
@@ -362,11 +389,11 @@
      } else {
         leftClogs = allCLogs
      }
      if len(leftClogs) >1 {
      if len(leftClogs) > 1 {
         leftClogs = leftClogs[:len(leftClogs)-1]
         for _,a := range leftClogs {
            if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays * 24 {
               if de := tx.DeleteBucket([]byte(a.log.BucketName));de !=nil {
         for _, a := range leftClogs {
            if int(time.Now().Sub(a.log.UpdateTime).Hours()) > conf.keepDays*24 {
               if de := tx.DeleteBucket([]byte(a.log.BucketName)); de != nil {
                  delErrBucketNames = append(delErrBucketNames, a.log.BucketName)
               } else {
                  bucket.Delete(a.k)
@@ -399,7 +426,7 @@
   return tx.Commit()
}
func (b *BoltStore) Size(bucketName string) (int,error) {
func (b *BoltStore) Size(bucketName string) (int, error) {
   tx, err := b.conn.Begin(false)
   if err != nil {
      return 0, err