| | |
| | | sdb "basic.com/Android/syncdb.git" |
| | | "basic.com/valib/logger.git" |
| | | "encoding/json" |
| | | "strconv" |
| | | "github.com/micro/go-micro/logger" |
| | | "strings" |
| | | "time" |
| | | ) |
| | | type ReceiveSqlInterface2 interface { |
| | | sdb.ReceiveSqlInterface |
| | |
| | | return true |
| | | } |
| | | |
| | | func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { |
| | | func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool { |
| | | var ips []string |
| | | if strAddrs == "" { |
| | | ips = nil |
| | |
| | | |
| | | pwdFull := syncClusterKeyPrefix + password |
| | | |
| | | agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) |
| | | agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath) |
| | | if agent == nil { |
| | | logger.Error("sdb.Init") |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
// RegisterDbHandler forwards h unchanged to sdb.RegisterDbHandler, so callers
// of this package can install a database handler without importing sdb's
// registration function themselves.
// NOTE(review): when and how sdb invokes the handler is not visible here — confirm in the syncdb package.
| | | func RegisterDbHandler(h sdb.DbHandler) { |
| | | sdb.RegisterDbHandler(h) |
| | | } |
| | | |
// RegisterDbDumpHandler forwards h unchanged to sdb.RegisterDbDumpHandler.
// NOTE(review): presumably the handler is used when table data is dumped for
// other cluster nodes — confirm against the syncdb package.
| | | func RegisterDbDumpHandler(h sdb.DbDumpHandler) { |
| | | sdb.RegisterDbDumpHandler(h) |
| | | } |
| | | |
| | | func JoinByNodeAddrs(strAddrs string) bool { |
| | |
| | | return true |
| | | } |
| | | |
| | | func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { |
| | | func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool { |
| | | |
| | | isOk := SyncInit(clusterID, password, devID, strAddrs) |
| | | isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath) |
| | | |
| | | if isOk { |
| | | logger.Debug("dbSync.Init success") |
| | | |
| | | if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { |
| | | if !syncTableDataFromCluster(clusterID, devID, devIP, devName) { |
| | | logger.Error("加入集群失败!!!") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err := agent.Shutdown() |
| | | if err != nil { |
| | | logger.Error("syncTableDataFromCluster err,shutdown err:", err) |
| | | } |
| | | } |
| | | return false |
| | | } |
| | | } else { |
| | | logger.Error("dbSync.Init error") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err := agent.Shutdown() |
| | | if err != nil { |
| | | logger.Error("dbSync.Init err,shutdown err:", err) |
| | |
| | | } |
| | | |
| | | func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { |
| | | var err error |
| | | |
| | | foreignSql := string("PRAGMA foreign_keys=OFF") |
| | | _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | |
| | | var sqls []string |
| | | var sqls = []string{"PRAGMA foreign_keys=OFF"} |
| | | var delSql string |
| | | for _, t := range syncTables { |
| | | if t == dBNameTables { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == dBNameTablePersons { |
| | | if t == dBNameTablePersons { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else if t == dBNameTables { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else { |
| | | delSql = "delete from " + t |
| | | } |
| | |
| | | } |
| | | |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = agent.GetTableDataFromCluster(syncTables) |
| | | dumpSqls, err := agent.GetTableDataFromCluster(syncTables) |
| | | if dumpSqls != nil { |
| | | for _, dumpSql := range *dumpSqls { |
| | | sqls = append(sqls, dumpSql) |
| | | } |
| | | } |
| | | |
| | | logger.Debug("成功添加当前节点到集群节点中") |
| | | logger.Debug("成功获取集群中数据,err:",err) |
| | | |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + |
| | | devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + |
| | | (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" |
| | | sqls = append(sqls, sqlSync) |
| | | |
| | | _, err = sdb.ExecuteWriteSql(sqls, true) |
| | | if err != nil { |
| | | logger.Debug("sdb.ExecuteWriteSql ERROR:", err) |
| | | //timeUnix := time.Now().Unix() |
| | | //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | // |
| | | //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + |
| | | // devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + |
| | | // (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" |
| | | //sqls = append(sqls, sqlSync) |
| | | sqls = append(sqls, "PRAGMA foreign_keys=ON") |
| | | sqlDump := strings.Join(sqls, ";") |
| | | if !sdb.DbHandle.Execute(sqlDump) { |
| | | logger.Debug("sdb.DbHandle.Execute ret: false") |
| | | return false |
| | | } else { |
| | | logger.Debug("sdb.DbHandle.Execute ret: true") |
| | | } |
| | | |
| | | agent.SyncSql([]string{sqlSync}) |
| | | //agent.SyncSql([]string{sqlSync}) |
| | | |
| | | foreignSql = string("PRAGMA foreign_keys=ON") |
| | | _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | //foreignSql = string("PRAGMA foreign_keys=ON") |
| | | //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) |
| | | //if err != nil { |
| | | // return false |
| | | //} |
| | | |
| | | return true |
| | | } |