package androidSync
|
|
import (
|
sdb "basic.com/Android/syncdb.git"
|
"basic.com/valib/logger.git"
|
"encoding/json"
|
"strconv"
|
"strings"
|
"time"
|
)
|
|
/*
|
每次开机后都会调用该接口,该接口会去查询数据库,确实之前是否已经加入过集群,若是已经加入集群,则开机自动加入
|
*/
|
func InitAgent(devID string) bool {
|
sqlFindAllCluster := string("select * from " + dBNameCluster)
|
clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster})
|
if err == nil && clusters != nil && len(clusters) > 0 {
|
c := clusters[0]
|
sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'")
|
nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId})
|
if err == nil && nodes != nil && len(nodes) > 0 {
|
var nodeIps []string
|
for _, n := range nodes {
|
if n.Values[0][3].(string) != devID {
|
nodeIps = append(nodeIps, n.Values[0][4].(string))
|
}
|
}
|
agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps)
|
if agent != nil {
|
logger.Debug("sync.Agent init success!")
|
} else {
|
logger.Debug("sync.Agent init fail!")
|
}
|
}
|
}
|
|
return true
|
}
|
|
/*
|
之前没有集群。调用此接口进行集群初始化,传入集群其他节点的ip,便于初始化后直接加入集群
|
strAddrs = "ip1:port1;ip2:port2;ip3:port3"
|
*/
|
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
|
var ips []string
|
if strAddrs == "" {
|
ips = nil
|
} else {
|
ips = strings.Split(strAddrs, ";")
|
}
|
|
pwdFull := syncClusterKeyPrefix + password
|
|
agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
|
if agent == nil {
|
logger.Error("sdb.Init")
|
return false
|
}
|
|
return true
|
}
|
|
/*
|
初始化时,若没能加入集群,可以通过该接口加入集群。
|
strAddrs = "ip1:port1;ip2:port2;ip3:port3"
|
*/
|
func JoinByNodeAddrs(strAddrs string) bool {
|
if strAddrs == "" {
|
logger.Error("strAddrs == \"\"")
|
return false
|
}
|
addrs := strings.Split(strAddrs, ";")
|
err := agent.JoinByNodeAddrs(addrs)
|
if err != nil {
|
logger.Error("agent.JoinByNodeAddrs err:", err)
|
return false
|
}
|
|
return true
|
}
|
|
/*
|
加入集群,包含初始化节点SyncInit,并根据传入的集群其他节点列表自动加入集群
|
strAddrs = "ip1:port1;ip2:port2;ip3:port3"
|
*/
|
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
|
|
isOk := SyncInit(clusterID, password, devID, strAddrs)
|
|
if isOk { //加入成功
|
logger.Debug("dbSync.Init success")
|
|
if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
|
logger.Error("加入集群失败!!!")
|
return false
|
}
|
} else {
|
logger.Error("dbSync.Init error")
|
if agent != nil {
|
err := agent.Shutdown()
|
if err != nil {
|
logger.Error("dbSync.Init err,shutdown err:", err)
|
}
|
}
|
return false
|
}
|
|
return true
|
}
|
|
/*
|
加入集群后,可以通过该接口获取集群的节点信息,不过最好直接查同步库的集群节点表
|
json vector
|
[
|
{Node1},
|
{Node2},
|
...
|
{Noden}
|
]
|
type NodeInfo struct {
|
ClusterID string `json:"clusterID"`
|
NodeID string `json:"nodeID"`
|
NodeAddress string `json:"nodeAddress"`
|
IsAlive int `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
|
}
|
*/
|
func GetNodes() []byte {
|
nodes := agent.GetNodes()
|
strNode, err := json.Marshal(nodes)
|
if err != nil {
|
logger.Error("json.Marshal err:", err)
|
return nil
|
}
|
return strNode
|
}
|
|
/*
|
加入集群后, 清空本地的同步库数据,并从集群拉取最新的同步库数据
|
*/
|
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
|
var err error
|
|
//0.关闭reference
|
foreignSql := string("PRAGMA foreign_keys=OFF")
|
_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
|
if err != nil {
|
return false
|
}
|
|
//1.删除本地的同步库数据
|
var sqls []string
|
var delSql string
|
for _, t := range syncTables {
|
if t == dBNameTables {
|
delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
|
} else if t == dBNameTablePersons {
|
delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
|
} else {
|
delSql = "delete from " + t
|
}
|
sqls = append(sqls, delSql)
|
}
|
|
//2.拉取集群内的同步库数据到本地数据库表中
|
var dumpSqls *[]string
|
dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
|
if dumpSqls != nil {
|
for _, dumpSql := range *dumpSqls {
|
sqls = append(sqls, dumpSql)
|
}
|
}
|
|
logger.Debug("成功添加当前节点到集群节点中")
|
|
//3.将本节点加入到节点列表中
|
timeUnix := time.Now().Unix()
|
fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
|
sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
|
devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
|
(devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
|
sqls = append(sqls, sqlSync)
|
|
//4. 写入数据库
|
_, err = sdb.ExecuteWriteSql(sqls, true)
|
if err != nil {
|
logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
|
return false
|
}
|
|
//5. 同步该节点到集群
|
agent.SyncSql([]string{sqlSync})
|
|
//6.开启reference
|
foreignSql = string("PRAGMA foreign_keys=ON")
|
_, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
|
if err != nil {
|
return false
|
}
|
|
return true
|
}
|
|
/*
|
操作数据库后,需要调用该接口将对应的sql语句同步到集群
|
strSql = "sql1;sql2;sql3;...;sqln"
|
*/
|
func SyncSql(strSql string) {
|
|
sqls := strings.Split(strSql, ";")
|
|
agent.SyncSql(sqls)
|
}
|
|
/*
|
更新集群的名字
|
*/
|
func UpdateClusterName(clusterName, clusterID string) bool {
|
sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'")
|
_, err := sdb.ExecuteWriteSql([]string{sql}, false)
|
if err != nil {
|
return false
|
}
|
|
return true
|
}
|
|
/*
|
退出集群
|
*/
|
func Leave() bool {
|
|
if agent != nil {
|
err := agent.Leave()
|
if err != nil {
|
logger.Debug("cluster leave err")
|
return false
|
}
|
agent.Shutdown()
|
agent = nil
|
|
sqls := []string{"delete from cluster_node", "delete from cluster"}
|
|
_, err = sdb.ExecuteWriteSql(sqls, false)
|
if err != nil {
|
return false
|
}
|
}
|
|
return true
|
}
|