From 62a20c839009db883a9056a095ebc9cbcee339d5 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期四, 14 五月 2020 20:03:56 +0800 Subject: [PATCH] fix ExecuteWriteSql --- sync.go | 117 +++++++++++++++++++++++++++++++++------------------------- 1 files changed, 66 insertions(+), 51 deletions(-) diff --git a/sync.go b/sync.go index 438a3cb..3ca64f2 100755 --- a/sync.go +++ b/sync.go @@ -5,9 +5,8 @@ sdb "basic.com/Android/syncdb.git" "basic.com/valib/logger.git" "encoding/json" - "strconv" + "fmt" "strings" - "time" ) type ReceiveSqlInterface2 interface { sdb.ReceiveSqlInterface @@ -17,7 +16,7 @@ sdb.RegisterReceiveSqlInterface(c) } -func InitAgent(devID string) bool { +/*func InitAgent(devID string) bool { var nodeIps []string sqlFindAllCluster := string("select * from " + dBNameCluster) clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster}) @@ -44,9 +43,9 @@ } return true -} +}*/ -func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { +func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool { var ips []string if strAddrs == "" { ips = nil @@ -56,13 +55,29 @@ pwdFull := syncClusterKeyPrefix + password - agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) + agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath) if agent == nil { logger.Error("sdb.Init") return false } return true +} + +type DbHandlerInterface interface { + sdb.DbHandler +} + +type DbDumpHandlerInterface interface { + sdb.DbDumpHandler +} + +func RegisterDbHandler(h DbHandlerInterface) { + sdb.RegisterDbHandler(h) +} + +func RegisterDbDumpHandler(h DbDumpHandlerInterface) { + sdb.RegisterDbDumpHandler(h) } func JoinByNodeAddrs(strAddrs string) bool { @@ -80,20 +95,28 @@ return true } -func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { +func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool { - isOk := SyncInit(clusterID, password, devID, strAddrs) + isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath) if isOk { logger.Debug("dbSync.Init success") - if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { + if !syncTableDataFromCluster(clusterID, devID, devIP, devName) { logger.Error("鍔犲叆闆嗙兢澶辫触锛侊紒锛�") + if agent != nil { + agent.Leave() + err := agent.Shutdown() + if err != nil { + logger.Error("syncTableDataFromCluster err,shutdown err:", err) + } + } return false } } else { logger.Error("dbSync.Init error") if agent != nil { + agent.Leave() err := agent.Shutdown() if err != nil { logger.Error("dbSync.Init err,shutdown err:", err) @@ -116,58 +139,52 @@ } func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { - var err error - - foreignSql := string("PRAGMA foreign_keys=OFF") - _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) - if err != nil { - return false - } - - var sqls []string + var sqls = []string{"PRAGMA foreign_keys=OFF"} var delSql string for _, t := range syncTables { - if t == dBNameTables { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == dBNameTablePersons { + if t == dBNameTablePersons { delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" + } else if t == dBNameTables { + delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" } else { delSql = "delete from " + t } sqls = append(sqls, delSql) } - var dumpSqls *[]string - dumpSqls, err = agent.GetTableDataFromCluster(syncTables) - if dumpSqls != nil { - for _, dumpSql := range *dumpSqls { - sqls = append(sqls, dumpSql) - } - } - - logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑") - - timeUnix := time.Now().Unix() - fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - - sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + - devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + - (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" - sqls = append(sqls, sqlSync) - - _, err = sdb.ExecuteWriteSql(sqls, true) - if err != nil { - logger.Debug("sdb.ExecuteWriteSql ERROR:", err) + dumpSqls, err := agent.GetTableDataFromCluster(syncTables) + fmt.Println("len(dumpSqls):", len(*dumpSqls), "err:",err) + if dumpSqls != nil && len(*dumpSqls) > 0 { + sqls = append(sqls, *dumpSqls) + } else { return false } - agent.SyncSql([]string{sqlSync}) + logger.Debug("鎴愬姛鑾峰彇闆嗙兢涓暟鎹�,err:",err) - foreignSql = string("PRAGMA foreign_keys=ON") - _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) - if err != nil { + //timeUnix := time.Now().Unix() + //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + // + //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + + // devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + + // (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" + //sqls = append(sqls, sqlSync) + sqls = append(sqls, "PRAGMA foreign_keys=ON") + sqlDump := strings.Join(sqls, ";") + if !sdb.DbHandle.Execute(sqlDump) { + logger.Debug("sdb.DbHandle.Execute ret: false") return false + } else { + logger.Debug("sdb.DbHandle.Execute ret: true") } + + //agent.SyncSql([]string{sqlSync}) + + //foreignSql = string("PRAGMA foreign_keys=ON") + //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) + //if err != nil { + // return false + //} return true } @@ -181,8 +198,7 @@ func UpdateClusterName(clusterName, clusterID string) bool { sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'") - _, err := sdb.ExecuteWriteSql([]string{sql}, false) - if err != nil { + if !sdb.DbHandle.Execute(sql) { return false } @@ -200,10 +216,9 @@ agent.Shutdown() agent = nil - sqls := []string{"delete from cluster_node", "delete from cluster"} + sqls := "delete from cluster_node;delete from cluster;" - _, err = sdb.ExecuteWriteSql(sqls, false) - if err != nil { + if !sdb.DbHandle.Execute(sqls) { return false } } -- Gitblit v1.8.0