From a6f8f26249cc3c2f1fbb66c58a18b969b31534c1 Mon Sep 17 00:00:00 2001 From: chenshijun <csj_sky@126.com> Date: 星期五, 06 九月 2019 17:43:46 +0800 Subject: [PATCH] 首次提交 --- sync.go | 252 ++++++++++++++++++++++++++++++++++++ testdb.db | 0 cluster.go | 147 +++++++++++++++++++++ 3 files changed, 399 insertions(+), 0 deletions(-) diff --git a/cluster.go b/cluster.go new file mode 100644 index 0000000..7a48bb0 --- /dev/null +++ b/cluster.go @@ -0,0 +1,147 @@ +package androidSync + +import ( + sdb "basic.com/Android/syncdb.git" + "basic.com/valib/logger.git" + "encoding/json" + "github.com/hashicorp/memberlist" + "os" + "time" +) + +const ( + syncClusterKeyPrefix = "bjbasic123" +) +const ( + defaultSqliteDBPath = "./testdb.db" + + dBNameCluster = "cluster" + dBNameClusterNode = "cluster_node" + dBNameTables = "dbTables" + dBNameTablePersons = "dbtablepersons" +) + +var syncTables = []string{dBNameCluster, dBNameClusterNode, dBNameTables, dBNameTablePersons} +var agent *sdb.Agent +var members *memberlist.Memberlist +var sqliteDBPath string = defaultSqliteDBPath + +func init(){ + var logFile = "./logger/androidSync.log" + var logSaveDays = 15 // 鏃ュ織鍒濆鍖� + logger.Config(logFile, logger.DebugLevel) + logger.SetSaveDays(logSaveDays) + err := sdb.InitDbConn(sqliteDBPath) + if err != nil{ + logger.Error("sdb.InitDbConn ERROR: sqliteDBPath:", sqliteDBPath) + os.Exit(-1) + } +} + +/* + 璁剧疆鏁版嵁搴撹矾寰� + */ +func setDBPath(path string){ + sqliteDBPath = path +} + +/* + 鑾峰彇鏁版嵁搴撹矾寰� +*/ +func getDBPath() string { + return sqliteDBPath +} + +func clearSearchResult(ml *memberlist.Memberlist) { + time.Sleep(10 * time.Second) + sdb.CloseSearchNode(ml) +} + +/* + 鍒涘缓鏌ヨ闆嗙兢鐨勪复鏃惰妭鐐癸紝闇�瑕佷紶鍏ラ泦缇ゅ瘑鐮侊紝6瀛楄妭锛屾暟瀛楁垨瀛楁瘝 + 姣忎釜涓�绉掕皟鐢ㄤ竴娆etSearchNodes锛屾嬁鍒版悳绱㈠埌鐨勮妭鐐逛俊鎭暟鎹� + 鏈�鍚庤皟鐢–loseSearchNode锛屽叧闂复鏃跺垱寤虹殑鎼滅储鑺傜偣 + pwd = password must six bytes +*/ +func CreateSearchNodeByPwd(pwd string) bool{ + + pwdFull := syncClusterKeyPrefix + pwd + var err error + members, err = sdb.CreateSearchNode(pwdFull) + if err != nil { + logger.Error("sdb.CreateSearchNode:", err) + return false + } + + go clearSearchResult(members) + + return true +} + +/* + 鍒涘缓鏌ヨ闆嗙兢鐨勪复鏃惰妭鐐癸紝闇�瑕佷紶鍏ラ泦缇ゅ瘑鐮侊紝16瀛楄妭 + key = password +*/ +func createSearchNode(key string) bool { + var err error + members, err = sdb.CreateSearchNode(key) + if err != nil { + logger.Error("sdb.CreateSearchNode:", err) + return false + } + + return true +} + +/* + 闃诲鏂瑰紡鍒涘缓鎼滅储闆嗙兢鐨勮妭鐐癸紝浼氱洿鎺ヨ繑鍥炴悳绱㈠埌鐨勮妭鐐� + map[string]NodeInfo + nodes[NodeInfo.NodeID] = NodeInfo + */ +func createSearchNodeWhitClose(key string, delay int) []byte { + nodes := sdb.CreateSearchNodeWhitClose(key, time.Duration(delay)) + strNodes, err := json.Marshal(nodes) + if err != nil { + logger.Error("json.Marshal:", err) + return nil + } + return strNodes +} + +/* + 鍏抽棴涓存椂鍒涘缓鐨勮妭鐐� +*/ +func CloseSearchNode() bool { + err := sdb.CloseSearchNode(members) + if err != nil { + logger.Error("sdb.CloseSearchNode:", err) + return false + } + return true +} + +/* + 鑾峰彇鎼滅储鍒扮殑鑺傜偣淇℃伅 +json vector +[ +{Node1}, +{Node2}, +... +{Noden} +] +type NodeInfo struct { + ClusterID string `json:"clusterID"` + NodeID string `json:"nodeID"` + NodeAddress string `json:"nodeAddress"` + IsAlive int `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 +} +*/ +func GetSearchNodes() []byte { + nodes := sdb.GetSearchNodes() + strNodes, err := json.Marshal(nodes) + if err != nil { + logger.Error("json.Marshal:", err) + return nil + } + return strNodes +} diff --git a/sync.go b/sync.go new file mode 100755 index 0000000..328683b --- /dev/null +++ b/sync.go @@ -0,0 +1,252 @@ +package androidSync + +import ( + sdb "basic.com/Android/syncdb.git" + "basic.com/valib/logger.git" + "encoding/json" + "strconv" + "strings" + "time" +) + +/* + 姣忔寮�鏈哄悗閮戒細璋冪敤璇ユ帴鍙o紝璇ユ帴鍙d細鍘绘煡璇㈡暟鎹簱锛岀‘瀹炰箣鍓嶆槸鍚﹀凡缁忓姞鍏ヨ繃闆嗙兢锛岃嫢鏄凡缁忓姞鍏ラ泦缇わ紝鍒欏紑鏈鸿嚜鍔ㄥ姞鍏� + */ +func InitAgent(devID string) bool { + sqlFindAllCluster := string("select * from " + dBNameCluster) + clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster}) + if err == nil && clusters != nil && len(clusters) > 0 { + c := clusters[0] + sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'") + nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId}) + if err == nil && nodes != nil && len(nodes) > 0 { + var nodeIps []string + for _, n := range nodes { + if n.Values[0][3].(string) != devID { + nodeIps = append(nodeIps, n.Values[0][4].(string)) + } + } + agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps) + if agent != nil { + logger.Debug("sync.Agent init success!") + } else { + logger.Debug("sync.Agent init fail!") + } + } + } + + return true +} + +/* + 涔嬪墠娌℃湁闆嗙兢銆傝皟鐢ㄦ鎺ュ彛杩涜闆嗙兢鍒濆鍖栵紝浼犲叆闆嗙兢鍏朵粬鑺傜偣鐨刬p锛屼究浜庡垵濮嬪寲鍚庣洿鎺ュ姞鍏ラ泦缇� + strAddrs = "ip1:port1;ip2:port2;ip3:port3" +*/ +func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { + var ips []string + if strAddrs == "" { + ips = nil + } else { + ips = strings.Split(strAddrs, ";") + } + + pwdFull := syncClusterKeyPrefix + password + + agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) + if agent == nil { + logger.Error("sdb.Init") + return false + } + + return true +} + +/* + 鍒濆鍖栨椂锛岃嫢娌¤兘鍔犲叆闆嗙兢锛屽彲浠ラ�氳繃璇ユ帴鍙e姞鍏ラ泦缇ゃ�� + strAddrs = "ip1:port1;ip2:port2;ip3:port3" +*/ +func JoinByNodeAddrs(strAddrs string) bool { + if strAddrs == "" { + logger.Error("strAddrs == \"\"") + return false + } + addrs := strings.Split(strAddrs, ";") + err := agent.JoinByNodeAddrs(addrs) + if err != nil { + logger.Error("agent.JoinByNodeAddrs err:", err) + return false + } + + return true +} + +/* + 鍔犲叆闆嗙兢锛屽寘鍚垵濮嬪寲鑺傜偣SyncInit锛屽苟鏍规嵁浼犲叆鐨勯泦缇ゅ叾浠栬妭鐐瑰垪琛ㄨ嚜鍔ㄥ姞鍏ラ泦缇� + strAddrs = "ip1:port1;ip2:port2;ip3:port3" +*/ +func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { + + isOk := SyncInit(clusterID, password, devID, strAddrs) + + if isOk { //鍔犲叆鎴愬姛 + logger.Debug("dbSync.Init success") + + if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { + logger.Error("鍔犲叆闆嗙兢澶辫触锛侊紒锛�") + return false + } + } else { + logger.Error("dbSync.Init error") + if agent != nil { + err := agent.Shutdown() + if err != nil { + logger.Error("dbSync.Init err,shutdown err:", err) + } + } + return false + } + + return true +} + +/* + 鍔犲叆闆嗙兢鍚庯紝鍙互閫氳繃璇ユ帴鍙h幏鍙栭泦缇ょ殑鑺傜偣淇℃伅锛屼笉杩囨渶濂界洿鎺ユ煡鍚屾搴撶殑闆嗙兢鑺傜偣琛� +json vector +[ +{Node1}, +{Node2}, +... +{Noden} +] +type NodeInfo struct { + ClusterID string `json:"clusterID"` + NodeID string `json:"nodeID"` + NodeAddress string `json:"nodeAddress"` + IsAlive int `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 +} +*/ +func GetNodes() []byte { + nodes := agent.GetNodes() + strNode, err := json.Marshal(nodes) + if err != nil { + logger.Error("json.Marshal err:", err) + return nil + } + return strNode +} + +/* + 鍔犲叆闆嗙兢鍚�, 娓呯┖鏈湴鐨勫悓姝ュ簱鏁版嵁,骞朵粠闆嗙兢鎷夊彇鏈�鏂扮殑鍚屾搴撴暟鎹� + */ +func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { + var err error + + //0.鍏抽棴reference + foreignSql := string("PRAGMA foreign_keys=OFF") + _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) + if err != nil { + return false + } + + //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 + var sqls []string + var delSql string + for _, t := range syncTables { + if t == dBNameTables { + delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" + } else if t == dBNameTablePersons { + delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" + } else { + delSql = "delete from " + t + } + sqls = append(sqls, delSql) + } + + //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� + var dumpSqls *[]string + dumpSqls, err = agent.GetTableDataFromCluster(syncTables) + if dumpSqls != nil { + for _, dumpSql := range *dumpSqls { + sqls = append(sqls, dumpSql) + } + } + + logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑") + + //3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑 + timeUnix := time.Now().Unix() + fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + + sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + + devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + + (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" + sqls = append(sqls, sqlSync) + + //4. 鍐欏叆鏁版嵁搴� + _, err = sdb.ExecuteWriteSql(sqls, true) + if err != nil { + logger.Debug("sdb.ExecuteWriteSql ERROR:", err) + return false + } + + //5. 鍚屾璇ヨ妭鐐瑰埌闆嗙兢 + agent.SyncSql([]string{sqlSync}) + + //6.寮�鍚痳eference + foreignSql = string("PRAGMA foreign_keys=ON") + _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) + if err != nil { + return false + } + + return true +} + +/* + 鎿嶄綔鏁版嵁搴撳悗锛岄渶瑕佽皟鐢ㄨ鎺ュ彛灏嗗搴旂殑sql璇彞鍚屾鍒伴泦缇� + strSql = "sql1;sql2;sql3;...;sqln" + */ +func SyncSql(strSql string) { + + sqls := strings.Split(strSql, ";") + + agent.SyncSql(sqls) +} + +/* + 鏇存柊闆嗙兢鐨勫悕瀛� + */ +func UpdateClusterName(clusterName, clusterID string) bool { + sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'") + _, err := sdb.ExecuteWriteSql([]string{sql}, false) + if err != nil { + return false + } + + return true +} + +/* + 閫�鍑洪泦缇� + */ +func Leave() bool { + + if agent != nil { + err := agent.Leave() + if err != nil { + logger.Debug("cluster leave err") + return false + } + agent.Shutdown() + agent = nil + + sqls := []string{"delete from cluster_node", "delete from cluster"} + + _, err = sdb.ExecuteWriteSql(sqls, false) + if err != nil { + return false + } + } + + return true +} diff --git a/testdb.db b/testdb.db new file mode 100644 index 0000000..d20c73e --- /dev/null +++ b/testdb.db Binary files differ -- Gitblit v1.8.0