From 14f478ef3cc343e72f76e07b75ea6927b8f305f6 Mon Sep 17 00:00:00 2001 From: liuxiaolong <736321739@qq.com> Date: 星期三, 13 五月 2020 18:10:48 +0800 Subject: [PATCH] sync share data from cluster --- sync.go | 166 ++++++++++++++++++++++--------------------------------- 1 files changed, 67 insertions(+), 99 deletions(-) diff --git a/sync.go b/sync.go index 328683b..0370b53 100755 --- a/sync.go +++ b/sync.go @@ -1,48 +1,50 @@ package androidSync +//package main import ( sdb "basic.com/Android/syncdb.git" "basic.com/valib/logger.git" "encoding/json" - "strconv" "strings" - "time" ) +type ReceiveSqlInterface2 interface { + sdb.ReceiveSqlInterface +} -/* - 姣忔寮�鏈哄悗閮戒細璋冪敤璇ユ帴鍙o紝璇ユ帴鍙d細鍘绘煡璇㈡暟鎹簱锛岀‘瀹炰箣鍓嶆槸鍚﹀凡缁忓姞鍏ヨ繃闆嗙兢锛岃嫢鏄凡缁忓姞鍏ラ泦缇わ紝鍒欏紑鏈鸿嚜鍔ㄥ姞鍏� - */ +func RegisterReceiveSqlInterfaceFromJava(c ReceiveSqlInterface2) { + sdb.RegisterReceiveSqlInterface(c) +} + func InitAgent(devID string) bool { + var nodeIps []string sqlFindAllCluster := string("select * from " + dBNameCluster) clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster}) - if err == nil && clusters != nil && len(clusters) > 0 { + if err == nil && clusters != nil && len(clusters) > 0 && clusters[0].Values != nil { c := clusters[0] + sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'") nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId}) if err == nil && nodes != nil && len(nodes) > 0 { - var nodeIps []string for _, n := range nodes { - if n.Values[0][3].(string) != devID { + if n.Values != nil && n.Values[0][3].(string) != devID { nodeIps = append(nodeIps, n.Values[0][4].(string)) } } - agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps) - if agent != nil { - logger.Debug("sync.Agent init success!") - } else { - logger.Debug("sync.Agent init fail!") - } } + + //agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps) + //if agent != nil { + // logger.Debug("sync.Agent init success!") + //} else { + // logger.Debug("sync.Agent init fail!") + //} + } return true } -/* - 涔嬪墠娌℃湁闆嗙兢銆傝皟鐢ㄦ鎺ュ彛杩涜闆嗙兢鍒濆鍖栵紝浼犲叆闆嗙兢鍏朵粬鑺傜偣鐨刬p锛屼究浜庡垵濮嬪寲鍚庣洿鎺ュ姞鍏ラ泦缇� - strAddrs = "ip1:port1;ip2:port2;ip3:port3" -*/ -func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { +func SyncInit(clusterID string, password string, nodeID string, strAddrs string, snapshotPath string) bool { var ips []string if strAddrs == "" { ips = nil @@ -52,7 +54,7 @@ pwdFull := syncClusterKeyPrefix + password - agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) + agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips, snapshotPath) if agent == nil { logger.Error("sdb.Init") return false @@ -61,10 +63,14 @@ return true } -/* - 鍒濆鍖栨椂锛岃嫢娌¤兘鍔犲叆闆嗙兢锛屽彲浠ラ�氳繃璇ユ帴鍙e姞鍏ラ泦缇ゃ�� - strAddrs = "ip1:port1;ip2:port2;ip3:port3" -*/ +func RegisterDbHandler(h sdb.DbHandler) { + sdb.RegisterDbHandler(h) +} + +func RegisterDbDumpHandler(h sdb.DbDumpHandler) { + sdb.RegisterDbDumpHandler(h) +} + func JoinByNodeAddrs(strAddrs string) bool { if strAddrs == "" { logger.Error("strAddrs == \"\"") @@ -80,24 +86,28 @@ return true } -/* - 鍔犲叆闆嗙兢锛屽寘鍚垵濮嬪寲鑺傜偣SyncInit锛屽苟鏍规嵁浼犲叆鐨勯泦缇ゅ叾浠栬妭鐐瑰垪琛ㄨ嚜鍔ㄥ姞鍏ラ泦缇� - strAddrs = "ip1:port1;ip2:port2;ip3:port3" -*/ -func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { +func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string, snapshotPath string) bool { - isOk := SyncInit(clusterID, password, devID, strAddrs) + isOk := SyncInit(clusterID, password, devID, strAddrs,snapshotPath) - if isOk { //鍔犲叆鎴愬姛 + if isOk { logger.Debug("dbSync.Init success") - if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { + if !syncTableDataFromCluster(clusterID, devID, devIP, devName) { logger.Error("鍔犲叆闆嗙兢澶辫触锛侊紒锛�") + if agent != nil { + agent.Leave() + err := agent.Shutdown() + if err != nil { + logger.Error("syncTableDataFromCluster err,shutdown err:", err) + } + } return false } } else { logger.Error("dbSync.Init error") if agent != nil { + agent.Leave() err := agent.Shutdown() if err != nil { logger.Error("dbSync.Init err,shutdown err:", err) @@ -109,22 +119,6 @@ return true } -/* - 鍔犲叆闆嗙兢鍚庯紝鍙互閫氳繃璇ユ帴鍙h幏鍙栭泦缇ょ殑鑺傜偣淇℃伅锛屼笉杩囨渶濂界洿鎺ユ煡鍚屾搴撶殑闆嗙兢鑺傜偣琛� -json vector -[ -{Node1}, -{Node2}, -... -{Noden} -] -type NodeInfo struct { - ClusterID string `json:"clusterID"` - NodeID string `json:"nodeID"` - NodeAddress string `json:"nodeAddress"` - IsAlive int `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 -} -*/ func GetNodes() []byte { nodes := agent.GetNodes() strNode, err := json.Marshal(nodes) @@ -135,34 +129,20 @@ return strNode } -/* - 鍔犲叆闆嗙兢鍚�, 娓呯┖鏈湴鐨勫悓姝ュ簱鏁版嵁,骞朵粠闆嗙兢鎷夊彇鏈�鏂扮殑鍚屾搴撴暟鎹� - */ func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { - var err error - - //0.鍏抽棴reference - foreignSql := string("PRAGMA foreign_keys=OFF") - _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) - if err != nil { - return false - } - - //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 - var sqls []string + var sqls = []string{"PRAGMA foreign_keys=OFF"} var delSql string for _, t := range syncTables { - if t == dBNameTables { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == dBNameTablePersons { + if t == dBNameTablePersons { delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" + } else if t == dBNameTables { + delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" } else { delSql = "delete from " + t } sqls = append(sqls, delSql) } - //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� var dumpSqls *[]string dumpSqls, err = agent.GetTableDataFromCluster(syncTables) if dumpSqls != nil { @@ -171,41 +151,35 @@ } } - logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑") + logger.Debug("鎴愬姛鑾峰彇闆嗙兢涓暟鎹�") - //3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑 - timeUnix := time.Now().Unix() - fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - - sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + - devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + - (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" - sqls = append(sqls, sqlSync) - - //4. 鍐欏叆鏁版嵁搴� - _, err = sdb.ExecuteWriteSql(sqls, true) - if err != nil { - logger.Debug("sdb.ExecuteWriteSql ERROR:", err) + //timeUnix := time.Now().Unix() + //fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + // + //sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + + // devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + + // (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" + //sqls = append(sqls, sqlSync) + sqls = append(sqls, "PRAGMA foreign_keys=ON") + sqlDump := strings.Join(sqls, ";") + if !sdb.DbHandle.Execute(sqlDump) { + logger.Debug("sdb.DbHandle.Execute ret: false") return false + } else { + logger.Debug("sdb.DbHandle.Execute ret: true") } - //5. 鍚屾璇ヨ妭鐐瑰埌闆嗙兢 - agent.SyncSql([]string{sqlSync}) + //agent.SyncSql([]string{sqlSync}) - //6.寮�鍚痳eference - foreignSql = string("PRAGMA foreign_keys=ON") - _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) - if err != nil { - return false - } + //foreignSql = string("PRAGMA foreign_keys=ON") + //_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) + //if err != nil { + // return false + //} return true } -/* - 鎿嶄綔鏁版嵁搴撳悗锛岄渶瑕佽皟鐢ㄨ鎺ュ彛灏嗗搴旂殑sql璇彞鍚屾鍒伴泦缇� - strSql = "sql1;sql2;sql3;...;sqln" - */ func SyncSql(strSql string) { sqls := strings.Split(strSql, ";") @@ -213,9 +187,6 @@ agent.SyncSql(sqls) } -/* - 鏇存柊闆嗙兢鐨勫悕瀛� - */ func UpdateClusterName(clusterName, clusterID string) bool { sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'") _, err := sdb.ExecuteWriteSql([]string{sql}, false) @@ -226,9 +197,6 @@ return true } -/* - 閫�鍑洪泦缇� - */ func Leave() bool { if agent != nil { @@ -249,4 +217,4 @@ } return true -} +} \ No newline at end of file -- Gitblit v1.8.0