基于serf的数据库同步模块库
liuxiaolong
2019-10-11 883cdff6638c325f17db200a9c8f4984537f80b8
dbself.go
@@ -8,61 +8,67 @@
   "path/filepath"
   "strings"
   "sync"
   "github.com/jinzhu/gorm"
   "basic.com/valib/logger.git"
)
var Dbconn *Conn
var sy sync.Mutex
const (
   PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
   DbT_TableName = "dbtables"
   DBP_TableName = "dbtablepersons"
)
func init() {
   GetConn()
}
var syncMut sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func GetConn() error {
   var err error
   path, err := GetCurrentPath()
   if err != nil {
      return errors.New("get current path error")
func InitDbConn(dbPath string) error {
   if dbPath == "" {
      dbPath = PersonSqliteDBPath
   }
   filepath := fmt.Sprintf("%stest.db", path)
   fmt.Println("self: ========>", filepath)
   db, err := New(filepath, "", false)
   logger.Info("self: ========>", dbPath)
   db, err := New(dbPath, "", false)
   if err != nil {
      fmt.Println("new db database: ", err)
      logger.Error("new db database: ", err)
      return err
   }
   Dbconn, err = db.Connect()
   dbConn, err := db.Connect()
   if err != nil {
      fmt.Println("new db conn error; ", err)
      logger.Error("new db conn error; ", err)
      return err
   }
   SerfDbConn = dbConn
   return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
   path, err := GetCurrentPath()
   path, err := getCurrentPath()
   if err != nil {
      return "", errors.New("get current path error")
      logger.Error("getCurrentPath error; ", err)
      return "", err
   }
   filepath := fmt.Sprintf("%stmptest.db", path)
   filepath := path + "tmp.db"
   logger.Info("filepath:", filepath)
   db, err := New(filepath, "", false)
   if err != nil {
      fmt.Println("new db database: ", err)
      logger.Error("new db database: ", err)
      return "", err
   }
   tmpconn, err := db.Connect()
   if err != nil {
      fmt.Println("new db conn error; ", err)
      logger.Error("new db conn error; ", err)
      return "", err
   }
   defer tmpconn.Close()
   err = Dbconn.Backup(tmpconn)
   err = SerfDbConn.Backup(tmpconn)
   if err != nil {
      return "", err
   }
@@ -70,19 +76,146 @@
}
// do exet when get querystring.
func DoExecute(executestring []string) ([]*Result, error) {
   sy.Lock()
   defer sy.Unlock()
   allResults, err := Dbconn.Execute(executestring, false, false)
func ExecuteWriteSql(sqlString []string) ([]*Result, error) {
   syncMut.Lock()
   defer syncMut.Unlock()
   allResults, err := SerfDbConn.Execute(sqlString, false, false)
   if err != nil {
      fmt.Println("execute error!", err)
      logger.Error("execute error!", err)
      return nil, err
   }
   return allResults, nil
}
var localDb *gorm.DB
func InitLocalDb(db *gorm.DB) {
   localDb = db
}
// do exet when get querystring.
func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
   syncMut.Lock()
   defer syncMut.Unlock()
   rows, err := SerfDbConn.Query(sqlString, false, false)
   if err != nil {
      logger.Error("execute error!", err)
      return nil, err
   }
   return rows, nil
}
func ExecuteSqlByGorm(sqls []string) (bool,error) {
   if localDb != nil {
      localDb.LogMode(false)
      defer localDb.LogMode(true)
      var err error
      tx := localDb.Begin()
      defer func() {
         if err !=nil && tx !=nil {
            tx.Rollback()
         }
      }()
      for _,sql :=range sqls {
         result := tx.Exec(sql)
         err = result.Error
         if err !=nil {
            logger.Error("ExecuteSqlByGorm err:",err,",sql:",sql)
            return false,err
         }
         if result.RowsAffected == 0 {
            logger.Debug("ExecuteSqlByGorm RowsAffected == 0",",sql:",sql)
            err = errors.New("ExecuteSqlByGorm RowsAffected == 0")
            return false,err
         }
      }
      tx.Commit()
      return true,nil
   }
   return false,errors.New("localDb is nil")
}
type TableDesc struct {
   Cid int `json:"cid"`
   Name string `json:"name"`
   Type string `json:"type"`
   Notnull bool `json:"notnull"`
   DFltValue interface{} `json:"dflt_value"`
   Pk int `json:"pk"`
}
type DumpSql struct {
   Sql string `json:"sql"`
}
func ExecuteQueryByGorm(tableNames []string) ([]string, error) {
   localDb.LogMode(false)
   defer localDb.LogMode(true)
   if tableNames !=nil {
      var arr []string
      var dumpSql []DumpSql
      for _,table :=range tableNames {
         dumpSql = make([]DumpSql, 0)
         var tDescArr []TableDesc
         tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
         err := localDb.Raw(tSql).Scan(&tDescArr).Error
         if err !=nil {
            return nil,errors.New("tableDesc err")
         }
         logger.Info(table,"'Columns is:",tDescArr)
         if tDescArr == nil || len(tDescArr) == 0 {
            return nil,errors.New(table+" has no column")
         }
         var columnNames []string
         for _,col :=range tDescArr {
            columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
         }
         if table == DbT_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where (analyServerId='' or analyServerId is NULL);`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else if table == DBP_TableName {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s" where tableId in (select id from dbTables where (analyServerId='' or analyServerId is NULL));`,
               table,
               strings.Join(columnNames, ","),
               table)
         } else {
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' as sql FROM "%s";`,
               table,
               strings.Join(columnNames, ","),
               table)
         }
         logger.Info("tSql:",tSql)
         err = localDb.Raw(tSql).Scan(&dumpSql).Error
         if err !=nil {
            return nil,errors.New("dump err")
            continue
         }
         if len(dumpSql)>0 {
            for _,d :=range dumpSql {
               arr = append(arr, d.Sql)
            }
         }
      }
      return arr,nil
   }
   return nil,errors.New("tableNames is nil")
}
func Dumpdb() {
   var b strings.Builder
   if err := SerfDbConn.Dump(&b); err != nil {
      logger.Error("dump file ", err.Error())
   }
   logger.Info("%T\n", b)
}
// get current path
func GetCurrentPath() (string, error) {
func getCurrentPath() (string, error) {
   file, err := exec.LookPath(os.Args[0])
   if err != nil {
      return "", err
@@ -99,13 +232,4 @@
      return "", errors.New(`error: Can't find "/" or "\".`)
   }
   return string(path[0 : i+1]), nil
}
func Dumpdb() {
   var b strings.Builder
   if err := Dbconn.Dump(&b); err != nil {
      fmt.Println("dump file ", err.Error())
   }
   fmt.Printf("%T\n", b)
}