liuxiaolong
2020-05-13 649f2557ab1ce741a5573b14b00011206f79be74
sync.go
@@ -5,9 +5,8 @@
   sdb "basic.com/Android/syncdb.git"
   "basic.com/valib/logger.git"
   "encoding/json"
   "strconv"
   "github.com/micro/go-micro/logger"
   "strings"
   "time"
)
type ReceiveSqlInterface2 interface {
   sdb.ReceiveSqlInterface
@@ -46,7 +45,7 @@
   return true
}
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool {
   var ips []string
   if strAddrs == "" {
      ips = nil
@@ -56,13 +55,21 @@
   pwdFull := syncClusterKeyPrefix + password
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath)
   if agent == nil {
      logger.Error("sdb.Init")
      return false
   }
   return true
}
func RegisterDbHandler(h sdb.DbHandler) {
   sdb.RegisterDbHandler(h)
}
func RegisterDbDumpHandler(h sdb.DbDumpHandler) {
   sdb.RegisterDbDumpHandler(h)
}
func JoinByNodeAddrs(strAddrs string) bool {
@@ -80,20 +87,28 @@
   return true
}
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool {
   isOk := SyncInit(clusterID, password, devID, strAddrs)
   isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath)
   if isOk { 
      logger.Debug("dbSync.Init success")
      if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
      if !syncTableDataFromCluster(clusterID, devID, devIP, devName) {
         logger.Error("加入集群失败!!!")
         if agent != nil {
            agent.Leave()
            err := agent.Shutdown()
            if err != nil {
               logger.Error("syncTableDataFromCluster err,shutdown err:", err)
            }
         }
         return false
      }
   } else {
      logger.Error("dbSync.Init error")
      if agent != nil {
         agent.Leave()
         err := agent.Shutdown()
         if err != nil {
            logger.Error("dbSync.Init err,shutdown err:", err)
@@ -116,21 +131,13 @@
}
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
   var err error
   foreignSql := string("PRAGMA foreign_keys=OFF")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   var sqls []string
   var sqls = []string{"PRAGMA foreign_keys=OFF"}
   var delSql string
   for _, t := range syncTables {
      if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == dBNameTablePersons {
      if t == dBNameTablePersons {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else {
         delSql = "delete from " + t
      }
@@ -138,36 +145,38 @@
   }
   var dumpSqls *[]string
   dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
   dumpSqls, err := agent.GetTableDataFromCluster(syncTables)
   if dumpSqls != nil {
      for _, dumpSql := range *dumpSqls {
         sqls = append(sqls, dumpSql)
      }
   }
   logger.Debug("成功添加当前节点到集群节点中")
   logger.Debug("成功获取集群中数据,err:",err)
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
      devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
      (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   sqls = append(sqls, sqlSync)
   _, err = sdb.ExecuteWriteSql(sqls, true)
   if err != nil {
      logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
   //timeUnix := time.Now().Unix()
   //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   //
   //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
   //   devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
   //   (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   //sqls = append(sqls, sqlSync)
   sqls = append(sqls, "PRAGMA foreign_keys=ON")
   sqlDump := strings.Join(sqls, ";")
   if !sdb.DbHandle.Execute(sqlDump) {
      logger.Debug("sdb.DbHandle.Execute ret: false")
      return false
   } else {
      logger.Debug("sdb.DbHandle.Execute ret: true")
   }
   agent.SyncSql([]string{sqlSync})
   //agent.SyncSql([]string{sqlSync})
   foreignSql = string("PRAGMA foreign_keys=ON")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   //foreignSql = string("PRAGMA foreign_keys=ON")
   //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   //if err != nil {
   //   return false
   //}
   return true
}