liuxiaolong
2020-05-14 91bd26e795e78ebfe8570f31cb80915b81d93586
sync.go
@@ -5,9 +5,8 @@
   sdb "basic.com/Android/syncdb.git"
   "basic.com/valib/logger.git"
   "encoding/json"
   "strconv"
   "fmt"
   "strings"
   "time"
)
type ReceiveSqlInterface2 interface {
   sdb.ReceiveSqlInterface
@@ -17,7 +16,7 @@
   sdb.RegisterReceiveSqlInterface(c)
}
func InitAgent(devID string) bool {
/*func InitAgent(devID string) bool {
   var nodeIps []string
   sqlFindAllCluster := string("select * from " + dBNameCluster)
   clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster})
@@ -44,9 +43,9 @@
   }
   return true
}
}*/
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool {
   var ips []string
   if strAddrs == "" {
      ips = nil
@@ -56,13 +55,29 @@
   pwdFull := syncClusterKeyPrefix + password
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath)
   if agent == nil {
      logger.Error("sdb.Init")
      return false
   }
   return true
}
type DbHandlerInterface interface {
   sdb.DbHandler
}
type DbDumpHandlerInterface interface {
   sdb.DbDumpHandler
}
func RegisterDbHandler(h DbHandlerInterface) {
   sdb.RegisterDbHandler(h)
}
func RegisterDbDumpHandler(h DbDumpHandlerInterface) {
   sdb.RegisterDbDumpHandler(h)
}
func JoinByNodeAddrs(strAddrs string) bool {
@@ -80,20 +95,28 @@
   return true
}
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool {
   isOk := SyncInit(clusterID, password, devID, strAddrs)
   isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath)
   if isOk { 
      logger.Debug("dbSync.Init success")
      if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
      if !syncTableDataFromCluster(clusterID, devID, devIP, devName) {
         logger.Error("加入集群失败!!!")
         if agent != nil {
            agent.Leave()
            err := agent.Shutdown()
            if err != nil {
               logger.Error("syncTableDataFromCluster err,shutdown err:", err)
            }
         }
         return false
      }
   } else {
      logger.Error("dbSync.Init error")
      if agent != nil {
         agent.Leave()
         err := agent.Shutdown()
         if err != nil {
            logger.Error("dbSync.Init err,shutdown err:", err)
@@ -116,59 +139,53 @@
}
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
   var err error
   foreignSql := string("PRAGMA foreign_keys=OFF")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   var sqls []string
   var sqls = []string{"PRAGMA foreign_keys=OFF"}
   var delSql string
   for _, t := range syncTables {
      if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == dBNameTablePersons {
      if t == dBNameTablePersons {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else {
         delSql = "delete from " + t
      }
      sqls = append(sqls, delSql)
   }
   var dumpSqls *[]string
   dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
   if dumpSqls != nil {
      for _, dumpSql := range *dumpSqls {
         sqls = append(sqls, dumpSql)
      }
   }
   logger.Debug("成功添加当前节点到集群节点中")
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
      devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
      (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   sqls = append(sqls, sqlSync)
   _, err = sdb.ExecuteWriteSql(sqls, true)
   if err != nil {
      logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
   dumpSqls, err := agent.GetTableDataFromCluster(syncTables)
   fmt.Println("len(dumpSqls):", len(*dumpSqls), "err:",err)
   if dumpSqls != nil && len(*dumpSqls) > 0 {
      sqls = append(sqls, *dumpSqls)
   } else {
      return false
   }
   agent.SyncSql([]string{sqlSync})
   logger.Debug("成功获取集群中数据,err:",err)
   foreignSql = string("PRAGMA foreign_keys=ON")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
   //timeUnix := time.Now().Unix()
   //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   //
   //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
   //   devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
   //   (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   //sqls = append(sqls, sqlSync)
   sqls = append(sqls, "PRAGMA foreign_keys=ON")
   sqlDump := strings.Join(sqls, ";")
   if !sdb.DbHandle.Execute(sqlDump) {
      logger.Debug("sdb.DbHandle.Execute ret: false")
      return false
   } else {
      logger.Debug("sdb.DbHandle.Execute ret: true")
   }
   //agent.SyncSql([]string{sqlSync})
   //foreignSql = string("PRAGMA foreign_keys=ON")
   //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   //if err != nil {
   //   return false
   //}
   return true
}