liuxiaolong
2020-05-13 a6af456f6aae31c0abb6cb1f87a450a1ab9adebb
sync.go
@@ -5,9 +5,7 @@
   sdb "basic.com/Android/syncdb.git"
   "basic.com/valib/logger.git"
   "encoding/json"
   "strconv"
   "strings"
   "time"
)
type ReceiveSqlInterface2 interface {
   sdb.ReceiveSqlInterface
@@ -65,6 +63,22 @@
   return true
}
type DbHandlerInterface interface {
   sdb.DbHandler
}
type DbDumpHandlerInterface interface {
   sdb.DbDumpHandler
}
func RegisterDbHandler(h DbHandlerInterface) {
   sdb.RegisterDbHandler(h)
}
func RegisterDbDumpHandler(h DbDumpHandlerInterface) {
   sdb.RegisterDbDumpHandler(h)
}
func JoinByNodeAddrs(strAddrs string) bool {
   if strAddrs == "" {
      logger.Error("strAddrs == \"\"")
@@ -87,13 +101,21 @@
   if isOk { 
      logger.Debug("dbSync.Init success")
      if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
      if !syncTableDataFromCluster(clusterID, devID, devIP, devName) {
         logger.Error("加入集群失败!!!")
         if agent != nil {
            agent.Leave()
            err := agent.Shutdown()
            if err != nil {
               logger.Error("syncTableDataFromCluster err,shutdown err:", err)
            }
         }
         return false
      }
   } else {
      logger.Error("dbSync.Init error")
      if agent != nil {
         agent.Leave()
         err := agent.Shutdown()
         if err != nil {
            logger.Error("dbSync.Init err,shutdown err:", err)
@@ -116,21 +138,13 @@
}
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
   var err error
   foreignSql := string("PRAGMA foreign_keys=OFF")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   var sqls []string
   var sqls = []string{"PRAGMA foreign_keys=OFF"}
   var delSql string
   for _, t := range syncTables {
      if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == dBNameTablePersons {
      if t == dBNameTablePersons {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else {
         delSql = "delete from " + t
      }
@@ -138,36 +152,38 @@
   }
   var dumpSqls *[]string
   dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
   dumpSqls, err := agent.GetTableDataFromCluster(syncTables)
   if dumpSqls != nil {
      for _, dumpSql := range *dumpSqls {
         sqls = append(sqls, dumpSql)
      }
   }
   logger.Debug("成功添加当前节点到集群节点中")
   logger.Debug("成功获取集群中数据,err:",err)
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
      devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
      (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   sqls = append(sqls, sqlSync)
   _, err = sdb.ExecuteWriteSql(sqls, true)
   if err != nil {
      logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
   //timeUnix := time.Now().Unix()
   //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   //
   //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
   //   devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
   //   (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   //sqls = append(sqls, sqlSync)
   sqls = append(sqls, "PRAGMA foreign_keys=ON")
   sqlDump := strings.Join(sqls, ";")
   if !sdb.DbHandle.Execute(sqlDump) {
      logger.Debug("sdb.DbHandle.Execute ret: false")
      return false
   } else {
      logger.Debug("sdb.DbHandle.Execute ret: true")
   }
   agent.SyncSql([]string{sqlSync})
   //agent.SyncSql([]string{sqlSync})
   foreignSql = string("PRAGMA foreign_keys=ON")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   //foreignSql = string("PRAGMA foreign_keys=ON")
   //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   //if err != nil {
   //   return false
   //}
   return true
}