基于serf的数据库同步模块库
liuxiaolong
2019-09-28 c02c0ac5e06ee1a556107baa071ef0c80a387a13
dbself.go
@@ -6,9 +6,11 @@
   "os"
   "os/exec"
   "path/filepath"
   "strconv"
   "strings"
   "sync"
   "github.com/jinzhu/gorm"
   "basic.com/valib/logger.git"
)
const (
@@ -27,15 +29,15 @@
      dbPath = PersonSqliteDBPath
   }
   fmt.Println("self: ========>", dbPath)
   logger.Info("self: ========>", dbPath)
   db, err := New(dbPath, "", false)
   if err != nil {
      fmt.Println("new db database: ", err)
      logger.Error("new db database: ", err)
      return err
   }
   dbConn, err := db.Connect()
   if err != nil {
      fmt.Println("new db conn error; ", err)
      logger.Error("new db conn error; ", err)
      return err
   }
@@ -48,21 +50,21 @@
   path, err := getCurrentPath()
   if err != nil {
      fmt.Println("getCurrentPath error; ", err)
      logger.Error("getCurrentPath error; ", err)
      return "", err
   }
   filepath := path + "tmp.db"
   fmt.Println("filepath:", filepath)
   logger.Info("filepath:", filepath)
   db, err := New(filepath, "", false)
   if err != nil {
      fmt.Println("new db database: ", err)
      logger.Error("new db database: ", err)
      return "", err
   }
   tmpconn, err := db.Connect()
   if err != nil {
      fmt.Println("new db conn error; ", err)
      logger.Error("new db conn error; ", err)
      return "", err
   }
   defer tmpconn.Close()
@@ -80,7 +82,7 @@
   defer syncMut.Unlock()
   allResults, err := SerfDbConn.Execute(sqlString, false, false)
   if err != nil {
      fmt.Println("execute error!", err)
      logger.Error("execute error!", err)
      return nil, err
   }
   return allResults, nil
@@ -97,7 +99,7 @@
   defer syncMut.Unlock()
   rows, err := SerfDbConn.Query(sqlString, false, false)
   if err != nil {
      fmt.Println("execute error!", err)
      logger.Error("execute error!", err)
      return nil, err
   }
   return rows, nil
@@ -121,7 +123,7 @@
            return false,result.Error
         }
         if result.RowsAffected == 0{
            fmt.Println("ExecuteSqlByGorm fail")
            logger.Error("ExecuteSqlByGorm fail")
            return false,errors.New("ExecuteSqlByGorm fail")
         }
      }
@@ -129,6 +131,39 @@
      return true,nil
   }
   return false,errors.New("localDb is nil")
}
type SyncSerf struct {
   LamportTime string `json:"lamport_time"`
}
func QueryLTimeFromDbByGorm() uint64 {
   if localDb != nil {
      var syncSerf []SyncSerf
      err := localDb.Raw("select * from sync_serf").Scan(&syncSerf).Error
      if err == nil {
         if len(syncSerf) > 0{
            ltStr := syncSerf[0].LamportTime
            logger.Info("db.LamportTime str:", ltStr)
            t, e := strconv.ParseUint(ltStr, 10, 64)
            if e != nil {
               logger.Error("db.LamportTime parseUint err:", e)
            } else {
               curLTime = t
            }
            logger.Info("db.LamportTime:", ltStr)
         }else {
            err = localDb.Exec("insert into sync_serf values('0')").Error
            if err !=nil {
               logger.Error("sync_serf lamport_time init err:",err)
            }
         }
      } else {
         logger.Error("get db.LamportTime err:", err)
      }
   }
   return 0
}
type TableDesc struct {
@@ -158,7 +193,7 @@
         if err !=nil {
            return nil,errors.New("tableDesc err")
         }
         fmt.Println(table,"'Columns is:",tDescArr)
         logger.Info(table,"'Columns is:",tDescArr)
         if tDescArr == nil || len(tDescArr) == 0 {
            return nil,errors.New(table+" has no column")
         }
@@ -183,7 +218,7 @@
               table)
         }
         fmt.Println("tSql:",tSql)
         logger.Info("tSql:",tSql)
         err = localDb.Raw(tSql).Scan(&dumpSql).Error
         if err !=nil {
@@ -206,9 +241,9 @@
   var b strings.Builder
   if err := SerfDbConn.Dump(&b); err != nil {
      fmt.Println("dump file ", err.Error())
      logger.Error("dump file ", err.Error())
   }
   fmt.Printf("%T\n", b)
   logger.Info("%T\n", b)
}
// get current path