package androidSync //package main import ( sdb "basic.com/Android/syncdb.git" "basic.com/valib/logger.git" "encoding/json" "strconv" "strings" "time" ) type ReceiveSqlInterface2 interface { sdb.ReceiveSqlInterface } func RegisterReceiveSqlInterfaceFromJava(c ReceiveSqlInterface2) { sdb.RegisterReceiveSqlInterface(c) } func InitAgent(devID string) bool { var nodeIps []string sqlFindAllCluster := string("select * from " + dBNameCluster) clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster}) if err == nil && clusters != nil && len(clusters) > 0 && clusters[0].Values != nil { c := clusters[0] sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'") nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId}) if err == nil && nodes != nil && len(nodes) > 0 { for _, n := range nodes { if n.Values != nil && n.Values[0][3].(string) != devID { nodeIps = append(nodeIps, n.Values[0][4].(string)) } } } //agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps) //if agent != nil { // logger.Debug("sync.Agent init success!") //} else { // logger.Debug("sync.Agent init fail!") //} } return true } func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { var ips []string if strAddrs == "" { ips = nil } else { ips = strings.Split(strAddrs, ";") } pwdFull := syncClusterKeyPrefix + password agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) if agent == nil { logger.Error("sdb.Init") return false } return true } func JoinByNodeAddrs(strAddrs string) bool { if strAddrs == "" { logger.Error("strAddrs == \"\"") return false } addrs := strings.Split(strAddrs, ";") err := agent.JoinByNodeAddrs(addrs) if err != nil { logger.Error("agent.JoinByNodeAddrs err:", err) return false } return true } func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { isOk := SyncInit(clusterID, password, devID, strAddrs) if isOk { logger.Debug("dbSync.Init success") if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { logger.Error("加入集群失败!!!") return false } } else { logger.Error("dbSync.Init error") if agent != nil { err := agent.Shutdown() if err != nil { logger.Error("dbSync.Init err,shutdown err:", err) } } return false } return true } func GetNodes() []byte { nodes := agent.GetNodes() strNode, err := json.Marshal(nodes) if err != nil { logger.Error("json.Marshal err:", err) return nil } return strNode } func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { var err error foreignSql := string("PRAGMA foreign_keys=OFF") _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) if err != nil { return false } var sqls []string var delSql string for _, t := range syncTables { if t == dBNameTables { delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" } else if t == dBNameTablePersons { delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" } else { delSql = "delete from " + t } sqls = append(sqls, delSql) } var dumpSqls *[]string dumpSqls, err = agent.GetTableDataFromCluster(syncTables) if dumpSqls != nil { for _, dumpSql := range *dumpSqls { sqls = append(sqls, dumpSql) } } logger.Debug("成功添加当前节点到集群节点中") timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" sqls = append(sqls, sqlSync) _, err = sdb.ExecuteWriteSql(sqls, true) if err != nil { logger.Debug("sdb.ExecuteWriteSql ERROR:", err) return false } agent.SyncSql([]string{sqlSync}) foreignSql = string("PRAGMA foreign_keys=ON") _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) if err != nil { return false } return true } func SyncSql(strSql string) { sqls := strings.Split(strSql, ";") agent.SyncSql(sqls) } func UpdateClusterName(clusterName, clusterID string) bool { sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'") _, err := sdb.ExecuteWriteSql([]string{sql}, false) if err != nil { return false } return true } func Leave() bool { if agent != nil { err := agent.Leave() if err != nil { logger.Debug("cluster leave err") return false } agent.Shutdown() agent = nil sqls := []string{"delete from cluster_node", "delete from cluster"} _, err = sdb.ExecuteWriteSql(sqls, false) if err != nil { return false } } return true }