基于serf的数据库同步模块库
chenshijun
2019-08-06 6ef6050a854cb9a20bef8005a5f2a8463374ef17
Merge branch 'master' of http://192.168.1.14:10010/r/syncdb

# Conflicts:
# agent.go
# config.go
# dbself.go

格式化代码
5个文件已修改
79 ■■■■ 已修改文件
agent.go 46 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent_test.go 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 20 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
searcher.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -37,9 +37,10 @@
)
const (
    QueryEventGetDB = "GetDatabase"
    QueryEventGetDB        = "GetDatabase"
    QueryEventUpdateDBData = "UpdateDBData"
)
// Agent warps the serf agent
type Agent struct {
    *agent.Agent
@@ -49,10 +50,10 @@
}
type NodeInfo struct {
    ClusterID string `json:"clusterID"`
    NodeID string `json:"nodeID"`
    NodeAddress  string `json:"nodeAddress"`
    IsAlive int `json:"isAlive"`
    ClusterID   string `json:"clusterID"`
    NodeID      string `json:"nodeID"`
    NodeAddress string `json:"nodeAddress"`
    IsAlive     int    `json:"isAlive"`
}
// Create create serf agent with config
@@ -64,7 +65,7 @@
    }
    // create serf agent with serf config
    fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
    fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
@@ -128,7 +129,7 @@
    case *serf.Query:
        if ev.Name == QueryEventGetDB{
        if ev.Name == QueryEventGetDB {
            //bak file and send resp
            filename, err := BakDbFile()
            if err != nil {
@@ -168,7 +169,7 @@
                return
            }
            var rowsReturn []Rows
            for _,r := range rows {
            for _, r := range rows {
                rowsReturn = append(rowsReturn, *r)
            }
@@ -184,7 +185,6 @@
            //var res []*Rows
            //json.Unmarshal(bytesReturn, &res)
        }
    default:
        fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -359,7 +359,7 @@
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
func (a *Agent) GetDbFromCluster(dbPathWrite string) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
@@ -406,7 +406,7 @@
}
//SyncSql boardcast sql to cluster
func (a *Agent)SyncSql(sqlOp string) {
func (a *Agent) SyncSql(sqlOp string) {
    // event : use to send command to operate db.
    err := a.UserEvent("SyncSql", []byte(sqlOp), false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
@@ -439,10 +439,10 @@
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
    }else{
    } else {
        if len(password) >= 16 {
            password = password[:16]
        }else{
        } else {
            password = fmt.Sprintf("%016s", password)[:16]
            //return nil, fmt.Errorf("error password")
        }
@@ -460,9 +460,9 @@
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    fmt.Println("Stats:",agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
    fmt.Printf("create agent sucess!!")
    fmt.Println("Stats:", agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
    fmt.Println("create agent sucess!!")
    return agent, nil
}
@@ -479,7 +479,7 @@
    }
    n, err := a.Agent.Join(nodes, true)
    if err != nil || n == 0{
    if err != nil || n == 0 {
        a.Stop()
        fmt.Println("Stop node")
        return fmt.Errorf("Error Encrypt Key!")
@@ -490,14 +490,14 @@
type Node struct {
    clusterID string
    NodeID string
    IP string
    isAlive int   //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
    NodeID    string
    IP        string
    isAlive   int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
func (a *Agent) GetNodes() (nodes []Node) {
    var node Node
    fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
    fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
    mbs := a.GroupMembers(a.conf.ClusterID)
    for _, mb := range mbs {
        node.NodeID = mb.Name
@@ -510,7 +510,3 @@
    return nodes
}
agent_test.go
@@ -35,7 +35,6 @@
    fmt.Println("LocalMember1:", agent.LocalMember())
    agent.Start(context.Background())
    //<- agent.readyCh
    go func() {
@@ -85,5 +84,3 @@
        t.Errorf("angent shutdown failed, error: %s", err)
    }
}
config.go
@@ -35,21 +35,24 @@
    ModeCluster        = "cluster"
    retryMaxAttempts   = 3
    groupExpect        = 3
    DefaultEncryptKey   = "bjbasic@aiotlink"
    tagKeyClusterID  = "syncer-cluster-name"
    DefaultEncryptKey  = "bjbasic@aiotlink"
    tagKeyClusterID    = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
    BroadcastIP = "255.255.255.255"
    BroadcastPort = 30193
    BroadcastInterval = 5
    BroadcastIP        = "255.255.255.255"
    BroadcastPort      = 30193
    BroadcastInterval  = 5
    MaxQueryRespSize   = 50 * 1024 * 1024
    MaxQuerySize       = 50 * 1024 * 1024
    MaxUserEventSize   = 5 * 1024
)
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
    agentConf.QuerySizeLimit = 50 * 1024 *1024
    agentConf.UserEventSizeLimit = 1024
    agentConf.QueryResponseSizeLimit = MaxQueryRespSize
    agentConf.QuerySizeLimit = MaxQuerySize
    agentConf.UserEventSizeLimit = MaxUserEventSize
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
@@ -136,4 +139,3 @@
    return addr.IP.String(), addr.Port, nil
}
dbself.go
@@ -10,18 +10,17 @@
    "sync"
)
const (
    PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
)
var syncMut    sync.Mutex
var syncMut sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func InitDbConn(dbPath string) error {
    if dbPath == ""    {
    if dbPath == "" {
        dbPath = PersonSqliteDBPath
    }
searcher.go
@@ -101,11 +101,10 @@
    return members
}
func CloseSearchNode(m *memberlist.Memberlist) error{
func CloseSearchNode(m *memberlist.Memberlist) error {
    return m.Shutdown()
}
func GetSearchNodes()[][]byte {
func GetSearchNodes() [][]byte {
    return members
}