New file |
| | |
| | | package androidSync |
| | | |
| | | import ( |
| | | sdb "basic.com/Android/syncdb.git" |
| | | "basic.com/valib/logger.git" |
| | | "encoding/json" |
| | | "strconv" |
| | | "strings" |
| | | "time" |
| | | ) |
| | | |
| | | /* |
| | | 每次开机后都会调用该接口,该接口会去查询数据库,确实之前是否已经加入过集群,若是已经加入集群,则开机自动加入 |
| | | */ |
| | | func InitAgent(devID string) bool { |
| | | sqlFindAllCluster := string("select * from " + dBNameCluster) |
| | | clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster}) |
| | | if err == nil && clusters != nil && len(clusters) > 0 { |
| | | c := clusters[0] |
| | | sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'") |
| | | nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId}) |
| | | if err == nil && nodes != nil && len(nodes) > 0 { |
| | | var nodeIps []string |
| | | for _, n := range nodes { |
| | | if n.Values[0][3].(string) != devID { |
| | | nodeIps = append(nodeIps, n.Values[0][4].(string)) |
| | | } |
| | | } |
| | | agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps) |
| | | if agent != nil { |
| | | logger.Debug("sync.Agent init success!") |
| | | } else { |
| | | logger.Debug("sync.Agent init fail!") |
| | | } |
| | | } |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 之前没有集群。调用此接口进行集群初始化,传入集群其他节点的ip,便于初始化后直接加入集群 |
| | | strAddrs = "ip1:port1;ip2:port2;ip3:port3" |
| | | */ |
| | | func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool { |
| | | var ips []string |
| | | if strAddrs == "" { |
| | | ips = nil |
| | | } else { |
| | | ips = strings.Split(strAddrs, ";") |
| | | } |
| | | |
| | | pwdFull := syncClusterKeyPrefix + password |
| | | |
| | | agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips) |
| | | if agent == nil { |
| | | logger.Error("sdb.Init") |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 初始化时,若没能加入集群,可以通过该接口加入集群。 |
| | | strAddrs = "ip1:port1;ip2:port2;ip3:port3" |
| | | */ |
| | | func JoinByNodeAddrs(strAddrs string) bool { |
| | | if strAddrs == "" { |
| | | logger.Error("strAddrs == \"\"") |
| | | return false |
| | | } |
| | | addrs := strings.Split(strAddrs, ";") |
| | | err := agent.JoinByNodeAddrs(addrs) |
| | | if err != nil { |
| | | logger.Error("agent.JoinByNodeAddrs err:", err) |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 加入集群,包含初始化节点SyncInit,并根据传入的集群其他节点列表自动加入集群 |
| | | strAddrs = "ip1:port1;ip2:port2;ip3:port3" |
| | | */ |
| | | func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool { |
| | | |
| | | isOk := SyncInit(clusterID, password, devID, strAddrs) |
| | | |
| | | if isOk { //加入成功 |
| | | logger.Debug("dbSync.Init success") |
| | | |
| | | if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) { |
| | | logger.Error("加入集群失败!!!") |
| | | return false |
| | | } |
| | | } else { |
| | | logger.Error("dbSync.Init error") |
| | | if agent != nil { |
| | | err := agent.Shutdown() |
| | | if err != nil { |
| | | logger.Error("dbSync.Init err,shutdown err:", err) |
| | | } |
| | | } |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 加入集群后,可以通过该接口获取集群的节点信息,不过最好直接查同步库的集群节点表 |
| | | json vector |
| | | [ |
| | | {Node1}, |
| | | {Node2}, |
| | | ... |
| | | {Noden} |
| | | ] |
| | | type NodeInfo struct { |
| | | ClusterID string `json:"clusterID"` |
| | | NodeID string `json:"nodeID"` |
| | | NodeAddress string `json:"nodeAddress"` |
| | | IsAlive int `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4 |
| | | } |
| | | */ |
| | | func GetNodes() []byte { |
| | | nodes := agent.GetNodes() |
| | | strNode, err := json.Marshal(nodes) |
| | | if err != nil { |
| | | logger.Error("json.Marshal err:", err) |
| | | return nil |
| | | } |
| | | return strNode |
| | | } |
| | | |
| | | /* |
| | | 加入集群后, 清空本地的同步库数据,并从集群拉取最新的同步库数据 |
| | | */ |
| | | func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool { |
| | | var err error |
| | | |
| | | //0.关闭reference |
| | | foreignSql := string("PRAGMA foreign_keys=OFF") |
| | | _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | |
| | | //1.删除本地的同步库数据 |
| | | var sqls []string |
| | | var delSql string |
| | | for _, t := range syncTables { |
| | | if t == dBNameTables { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == dBNameTablePersons { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else { |
| | | delSql = "delete from " + t |
| | | } |
| | | sqls = append(sqls, delSql) |
| | | } |
| | | |
| | | //2.拉取集群内的同步库数据到本地数据库表中 |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = agent.GetTableDataFromCluster(syncTables) |
| | | if dumpSqls != nil { |
| | | for _, dumpSql := range *dumpSqls { |
| | | sqls = append(sqls, dumpSql) |
| | | } |
| | | } |
| | | |
| | | logger.Debug("成功添加当前节点到集群节点中") |
| | | |
| | | //3.将本节点加入到节点列表中 |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | |
| | | sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + |
| | | devID + "','" + clusterID + "','" + devName + "','" + devID + "','" + |
| | | (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')" |
| | | sqls = append(sqls, sqlSync) |
| | | |
| | | //4. 写入数据库 |
| | | _, err = sdb.ExecuteWriteSql(sqls, true) |
| | | if err != nil { |
| | | logger.Debug("sdb.ExecuteWriteSql ERROR:", err) |
| | | return false |
| | | } |
| | | |
| | | //5. 同步该节点到集群 |
| | | agent.SyncSql([]string{sqlSync}) |
| | | |
| | | //6.开启reference |
| | | foreignSql = string("PRAGMA foreign_keys=ON") |
| | | _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 操作数据库后,需要调用该接口将对应的sql语句同步到集群 |
| | | strSql = "sql1;sql2;sql3;...;sqln" |
| | | */ |
| | | func SyncSql(strSql string) { |
| | | |
| | | sqls := strings.Split(strSql, ";") |
| | | |
| | | agent.SyncSql(sqls) |
| | | } |
| | | |
| | | /* |
| | | 更新集群的名字 |
| | | */ |
| | | func UpdateClusterName(clusterName, clusterID string) bool { |
| | | sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'") |
| | | _, err := sdb.ExecuteWriteSql([]string{sql}, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | |
| | | return true |
| | | } |
| | | |
| | | /* |
| | | 退出集群 |
| | | */ |
| | | func Leave() bool { |
| | | |
| | | if agent != nil { |
| | | err := agent.Leave() |
| | | if err != nil { |
| | | logger.Debug("cluster leave err") |
| | | return false |
| | | } |
| | | agent.Shutdown() |
| | | agent = nil |
| | | |
| | | sqls := []string{"delete from cluster_node", "delete from cluster"} |
| | | |
| | | _, err = sdb.ExecuteWriteSql(sqls, false) |
| | | if err != nil { |
| | | return false |
| | | } |
| | | } |
| | | |
| | | return true |
| | | } |