chenshijun
2019-09-06 a6f8f26249cc3c2f1fbb66c58a18b969b31534c1
首次提交
3个文件已添加
399 ■■■■■ 已修改文件
cluster.go 147 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
sync.go 252 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
testdb.db 补丁 | 查看 | 原始文档 | blame | 历史
cluster.go
New file
@@ -0,0 +1,147 @@
package androidSync
import (
    sdb "basic.com/Android/syncdb.git"
    "basic.com/valib/logger.git"
    "encoding/json"
    "github.com/hashicorp/memberlist"
    "os"
    "time"
)
const (
    syncClusterKeyPrefix = "bjbasic123"
)
const (
    defaultSqliteDBPath = "./testdb.db"
    dBNameCluster      = "cluster"
    dBNameClusterNode  = "cluster_node"
    dBNameTables       = "dbTables"
    dBNameTablePersons = "dbtablepersons"
)
var syncTables = []string{dBNameCluster, dBNameClusterNode, dBNameTables, dBNameTablePersons}
var agent *sdb.Agent
var members *memberlist.Memberlist
var sqliteDBPath string = defaultSqliteDBPath
func init(){
    var logFile = "./logger/androidSync.log"
    var logSaveDays = 15    // 日志初始化
    logger.Config(logFile, logger.DebugLevel)
    logger.SetSaveDays(logSaveDays)
    err := sdb.InitDbConn(sqliteDBPath)
    if err != nil{
        logger.Error("sdb.InitDbConn ERROR: sqliteDBPath:", sqliteDBPath)
        os.Exit(-1)
    }
}
/*
    设置数据库路径
 */
func setDBPath(path string){
    sqliteDBPath = path
}
/*
    获取数据库路径
*/
func getDBPath() string {
    return sqliteDBPath
}
func clearSearchResult(ml *memberlist.Memberlist) {
    time.Sleep(10 * time.Second)
    sdb.CloseSearchNode(ml)
}
/*
    创建查询集群的临时节点,需要传入集群密码,6字节,数字或字母
    每个一秒调用一次GetSearchNodes,拿到搜索到的节点信息数据
    最后调用CloseSearchNode,关闭临时创建的搜索节点
    pwd = password must six bytes
*/
func CreateSearchNodeByPwd(pwd string) bool{
    pwdFull := syncClusterKeyPrefix + pwd
    var err error
    members, err = sdb.CreateSearchNode(pwdFull)
    if err != nil {
        logger.Error("sdb.CreateSearchNode:", err)
        return false
    }
    go clearSearchResult(members)
    return true
}
/*
    创建查询集群的临时节点,需要传入集群密码,16字节
    key = password
*/
func createSearchNode(key string) bool {
    var err error
    members, err = sdb.CreateSearchNode(key)
    if err != nil {
        logger.Error("sdb.CreateSearchNode:", err)
        return false
    }
    return true
}
/*
    阻塞方式创建搜索集群的节点,会直接返回搜索到的节点
    map[string]NodeInfo
    nodes[NodeInfo.NodeID] = NodeInfo
 */
func createSearchNodeWhitClose(key string, delay int) []byte {
    nodes := sdb.CreateSearchNodeWhitClose(key, time.Duration(delay))
    strNodes, err := json.Marshal(nodes)
    if err != nil {
        logger.Error("json.Marshal:", err)
        return nil
    }
    return strNodes
}
/*
    关闭临时创建的节点
*/
func CloseSearchNode() bool {
    err := sdb.CloseSearchNode(members)
    if err != nil {
        logger.Error("sdb.CloseSearchNode:", err)
        return false
    }
    return true
}
/*
    获取搜索到的节点信息
json vector
[
{Node1},
{Node2},
...
{Noden}
]
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"`  //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
*/
func GetSearchNodes() []byte {
    nodes := sdb.GetSearchNodes()
    strNodes, err := json.Marshal(nodes)
    if err != nil {
        logger.Error("json.Marshal:", err)
        return nil
    }
    return strNodes
}
sync.go
New file
@@ -0,0 +1,252 @@
package androidSync
import (
    sdb "basic.com/Android/syncdb.git"
    "basic.com/valib/logger.git"
    "encoding/json"
    "strconv"
    "strings"
    "time"
)
/*
    每次开机后都会调用该接口,该接口会去查询数据库,确实之前是否已经加入过集群,若是已经加入集群,则开机自动加入
 */
func InitAgent(devID string) bool {
    sqlFindAllCluster := string("select * from " + dBNameCluster)
    clusters, err := sdb.ExecuteQuerySql([]string{sqlFindAllCluster})
    if err == nil && clusters != nil && len(clusters) > 0 {
        c := clusters[0]
        sqlFindNodesByClusterId := string("select * from " + dBNameClusterNode + " where cluster_id='" + c.Values[0][1].(string) + "'")
        nodes, err := sdb.ExecuteQuerySql([]string{sqlFindNodesByClusterId})
        if err == nil && nodes != nil && len(nodes) > 0 {
            var nodeIps []string
            for _, n := range nodes {
                if n.Values[0][3].(string) != devID {
                    nodeIps = append(nodeIps, n.Values[0][4].(string))
                }
            }
            agent, err = sdb.Init(c.Values[0][0].(string), c.Values[0][2].(string), devID, nodeIps)
            if agent != nil {
                logger.Debug("sync.Agent init success!")
            } else {
                logger.Debug("sync.Agent init fail!")
            }
        }
    }
    return true
}
/*
    之前没有集群。调用此接口进行集群初始化,传入集群其他节点的ip,便于初始化后直接加入集群
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func SyncInit(clusterID string, password string, nodeID string, strAddrs string) bool {
    var ips []string
    if strAddrs == "" {
        ips = nil
    } else {
        ips = strings.Split(strAddrs, ";")
    }
    pwdFull := syncClusterKeyPrefix + password
    agent, _ = sdb.Init(clusterID, pwdFull, nodeID, ips)
    if agent == nil {
        logger.Error("sdb.Init")
        return false
    }
    return true
}
/*
    初始化时,若没能加入集群,可以通过该接口加入集群。
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func JoinByNodeAddrs(strAddrs string) bool {
    if strAddrs == "" {
        logger.Error("strAddrs == \"\"")
        return false
    }
    addrs := strings.Split(strAddrs, ";")
    err := agent.JoinByNodeAddrs(addrs)
    if err != nil {
        logger.Error("agent.JoinByNodeAddrs err:", err)
        return false
    }
    return true
}
/*
    加入集群,包含初始化节点SyncInit,并根据传入的集群其他节点列表自动加入集群
    strAddrs = "ip1:port1;ip2:port2;ip3:port3"
*/
func JoinCluster(clusterID, password, strAddrs, devID, devIP, devName string) bool {
    isOk := SyncInit(clusterID, password, devID, strAddrs)
    if isOk { //加入成功
        logger.Debug("dbSync.Init success")
        if ! syncTableDataFromCluster(clusterID, devID, devIP, devName) {
            logger.Error("加入集群失败!!!")
            return false
        }
    } else {
        logger.Error("dbSync.Init error")
        if agent != nil {
            err := agent.Shutdown()
            if err != nil {
                logger.Error("dbSync.Init err,shutdown err:", err)
            }
        }
        return false
    }
    return true
}
/*
    加入集群后,可以通过该接口获取集群的节点信息,不过最好直接查同步库的集群节点表
json vector
[
{Node1},
{Node2},
...
{Noden}
]
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"` //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
*/
func GetNodes() []byte {
    nodes := agent.GetNodes()
    strNode, err := json.Marshal(nodes)
    if err != nil {
        logger.Error("json.Marshal err:", err)
        return nil
    }
    return strNode
}
/*
    加入集群后, 清空本地的同步库数据,并从集群拉取最新的同步库数据
 */
func syncTableDataFromCluster(clusterID, devID, devIP, devName string) bool {
    var err error
    //0.关闭reference
    foreignSql := string("PRAGMA foreign_keys=OFF")
    _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
    if err != nil {
        return false
    }
    //1.删除本地的同步库数据
    var sqls []string
    var delSql string
    for _, t := range syncTables {
        if t == dBNameTables {
            delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
        } else if t == dBNameTablePersons {
            delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
        } else {
            delSql = "delete from " + t
        }
        sqls = append(sqls, delSql)
    }
    //2.拉取集群内的同步库数据到本地数据库表中
    var dumpSqls *[]string
    dumpSqls, err = agent.GetTableDataFromCluster(syncTables)
    if dumpSqls != nil {
        for _, dumpSql := range *dumpSqls {
            sqls = append(sqls, dumpSql)
        }
    }
    logger.Debug("成功添加当前节点到集群节点中")
    //3.将本节点加入到节点列表中
    timeUnix := time.Now().Unix()
    fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
    sqlSync := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" +
        devID + "','" + clusterID + "','" + devName + "','" + devID + "','" +
        (devIP + ":" + strconv.Itoa(sdb.DefaultBindPort)) + "','" + fmtTimeStr + "')"
    sqls = append(sqls, sqlSync)
    //4. 写入数据库
    _, err = sdb.ExecuteWriteSql(sqls, true)
    if err != nil {
        logger.Debug("sdb.ExecuteWriteSql ERROR:", err)
        return false
    }
    //5. 同步该节点到集群
    agent.SyncSql([]string{sqlSync})
    //6.开启reference
    foreignSql = string("PRAGMA foreign_keys=ON")
    _, err = sdb.ExecuteWriteSql([]string{foreignSql}, false)
    if err != nil {
        return false
    }
    return true
}
/*
    操作数据库后,需要调用该接口将对应的sql语句同步到集群
    strSql = "sql1;sql2;sql3;...;sqln"
 */
func SyncSql(strSql string) {
    sqls := strings.Split(strSql, ";")
    agent.SyncSql(sqls)
}
/*
    更新集群的名字
 */
func UpdateClusterName(clusterName, clusterID string) bool {
    sql := string("update cluster set cluster_name='" + clusterName + "' where cluster_id='" + clusterID + "'")
    _, err := sdb.ExecuteWriteSql([]string{sql}, false)
    if err != nil {
        return false
    }
    return true
}
/*
    退出集群
 */
func Leave() bool {
    if agent != nil {
        err := agent.Leave()
        if err != nil {
            logger.Debug("cluster leave err")
            return false
        }
        agent.Shutdown()
        agent = nil
        sqls := []string{"delete from cluster_node", "delete from cluster"}
        _, err = sdb.ExecuteWriteSql(sqls, false)
        if err != nil {
            return false
        }
    }
    return true
}
testdb.db
Binary files differ