liuxiaolong
2020-05-13 649f2557ab1ce741a5573b14b00011206f79be74
sync.go
@@ -1,48 +1,51 @@
package androidSync
//package main
import (
   sdb "basic.com/Android/syncdb.git"
   "basic.com/valib/logger.git"
   "encoding/json"
   "strconv"
   "github.com/micro/go-micro/logger"
   "strings"
   "time"
)
type ReceiveSqlInterface2 interface {
   sdb.ReceiveSqlInterface
}
/*
   每次开机后都会调用该接口,该接口会去查询数据库,确实之前是否已经加入过集群,若是已经加入集群,则开机自动加入
 */
func RegisterReceiveSqlInterfaceFromJava(c ReceiveSqlInterface2) {
   sdb.RegisterReceiveSqlInterface(c)
}
func InitAgent(devID string) bool {
   var nodeIps []string
   sqlFindAllCluster := string("select * from " + dBNameCluster)
   clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster})
   if err == nil && clusters != nil && len(clusters) > 0 {
   if err == nil && clusters != nil && len(clusters) > 0 && clusters[0].Values != nil {
      c := clusters[0]
      sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'")
      nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId})
      if err == nil && nodes != nil && len(nodes) > 0 {
         var nodeIps []string
         for _, n := range nodes {
            if n.Values[0][3].(string) != devID {
            if n.Values != nil && n.Values[0][3].(string) != devID {
               nodeIps = append(nodeIps, n.Values[0][4].(string))
            }
         }
         agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps)
         if agent != nil {
            logger.Debug("sync.Agent init success!")
         } else {
            logger.Debug("sync.Agent init fail!")
         }
      }
      //agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps)
      //if agent != nil {
      //   logger.Debug("sync.Agent init success!")
      //} else {
      //   logger.Debug("sync.Agent init fail!")
      //}
   }
   return true
}
/*
   之前没有集群。调用此接口进行集群初始化,传入集群其他节点的ip,便于初始化后直接加入集群
   strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool {
   var ips []string
   if strAddrs == "" {
      ips = nil
@@ -52,7 +55,7 @@
   pwdFull := syncClusterKeyPrefix + password
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
   agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath)
   if agent == nil {
      logger.Error("sdb.Init")
      return false
@@ -61,10 +64,14 @@
   return true
}
/*
   初始化时,若没能加入集群,可以通过该接口加入集群。
   strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func RegisterDbHandler(h sdb.DbHandler) {
   sdb.RegisterDbHandler(h)
}
func RegisterDbDumpHandler(h sdb.DbDumpHandler) {
   sdb.RegisterDbDumpHandler(h)
}
func JoinByNodeAddrs(strAddrs string) bool {
   if strAddrs == "" {
      logger.Error("strAddrs == \"\"")
@@ -80,24 +87,28 @@
   return true
}
/*
   加入集群,包含初始化节点SyncInit,并根据传入的集群其他节点列表自动加入集群
   strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool {
   isOk := SyncInit(clusterID, password, devID, strAddrs)
   isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath)
   if isOk { //加入成功
   if isOk {
      logger.Debug("dbSync.Init success")
      if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
      if !syncTableDataFromCluster(clusterID, devID, devIP, devName) {
         logger.Error("加入集群失败!!!")
         if agent != nil {
            agent.Leave()
            err := agent.Shutdown()
            if err != nil {
               logger.Error("syncTableDataFromCluster err,shutdown err:", err)
            }
         }
         return false
      }
   } else {
      logger.Error("dbSync.Init error")
      if agent != nil {
         agent.Leave()
         err := agent.Shutdown()
         if err != nil {
            logger.Error("dbSync.Init err,shutdown err:", err)
@@ -109,22 +120,6 @@
   return true
}
/*
   加入集群后,可以通过该接口获取集群的节点信息,不过最好直接查同步库的集群节点表
json vector
[
{Node1},
{Node2},
...
{Noden}
]
type NodeInfo struct {
   ClusterID   string `json:"clusterID"`
   NodeID      string `json:"nodeID"`
   NodeAddress string `json:"nodeAddress"`
   IsAlive     int    `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
*/
func GetNodes() []byte {
   nodes := agent.GetNodes()
   strNode, err := json.Marshal(nodes)
@@ -135,77 +130,57 @@
   return strNode
}
/*
   加入集群后, 清空本地的同步库数据,并从集群拉取最新的同步库数据
 */
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
   var err error
   //0.关闭reference
   foreignSql := string("PRAGMA foreign_keys=OFF")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   //1.删除本地的同步库数据
   var sqls []string
   var sqls = []string{"PRAGMA foreign_keys=OFF"}
   var delSql string
   for _, t := range syncTables {
      if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == dBNameTablePersons {
      if t == dBNameTablePersons {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else if t == dBNameTables {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else {
         delSql = "delete from " + t
      }
      sqls = append(sqls, delSql)
   }
   //2.拉取集群内的同步库数据到本地数据库表中
   var dumpSqls *[]string
   dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
   dumpSqls, err := agent.GetTableDataFromCluster(syncTables)
   if dumpSqls != nil {
      for _, dumpSql := range *dumpSqls {
         sqls = append(sqls, dumpSql)
      }
   }
   logger.Debug("成功添加当前节点到集群节点中")
   logger.Debug("成功获取集群中数据,err:",err)
   //3.将本节点加入到节点列表中
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
      devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
      (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   sqls = append(sqls, sqlSync)
   //4. 写入数据库
   _, err = sdb.ExecuteWriteSql(sqls, true)
   if err != nil {
      logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
   //timeUnix := time.Now().Unix()
   //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   //
   //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
   //   devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
   //   (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
   //sqls = append(sqls, sqlSync)
   sqls = append(sqls, "PRAGMA foreign_keys=ON")
   sqlDump := strings.Join(sqls, ";")
   if !sdb.DbHandle.Execute(sqlDump) {
      logger.Debug("sdb.DbHandle.Execute ret: false")
      return false
   } else {
      logger.Debug("sdb.DbHandle.Execute ret: true")
   }
   //5. 同步该节点到集群
   agent.SyncSql([]string{sqlSync})
   //agent.SyncSql([]string{sqlSync})
   //6.开启reference
   foreignSql = string("PRAGMA foreign_keys=ON")
   _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   if err != nil {
      return false
   }
   //foreignSql = string("PRAGMA foreign_keys=ON")
   //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
   //if err != nil {
   //   return false
   //}
   return true
}
/*
   操作数据库后,需要调用该接口将对应的sql语句同步到集群
   strSql = "sql1;sql2;sql3;...;sqln"
 */
func SyncSql(strSql string) {
   sqls := strings.Split(strSql, ";")
@@ -213,9 +188,6 @@
   agent.SyncSql(sqls)
}
/*
   更新集群的名字
 */
func UpdateClusterName(clusterName, clusterID string) bool {
   sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'")
   _, err := sdb.ExecuteWriteSql([]string{sql}, false)
@@ -226,9 +198,6 @@
   return true
}
/*
   退出集群
 */
func Leave() bool {
   if agent != nil {
@@ -249,4 +218,4 @@
   }
   return true
}
}