基于serf的数据库同步模块库
chenshijun
2019-08-05 7d1bbdc8a9a4854edb6d45877fb0d2b9ce12478b
重构之前的db操作的代码
3个文件已修改
157 ■■■■■ 已修改文件
agent.go 77 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -23,15 +23,11 @@
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "os"
    "strconv"
    //"os"
    "strings"
    "time"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "log"
)
@@ -52,7 +48,7 @@
    }
    // create serf agent with serf config
    fmt.Println("conf.Config.EncryptKey:",conf.EncryptKey)
    fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
@@ -281,11 +277,9 @@
    return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
func (a *Agent) GetDbFromCluster(dbPathWrite string) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
@@ -315,13 +309,13 @@
                fmt.Println("x length is: ", len(r.Payload))
                // // byte to file.
                Dbconn.Close()
                Dbconn = nil
                SerfDbConn.Close()
                SerfDbConn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    fmt.Println("query byte to file error!", err)
                }
                err := GetConn()
                err := InitDbConn("")
                if err != nil {
                    fmt.Println("create db conn of test.db error: ", err)
                }
@@ -332,7 +326,7 @@
}
//SyncSql boardcast sql to cluster
func (a *Agent)SyncSql(sqlOp string) {
func (a *Agent) SyncSql(sqlOp string) {
    // event : use to send command to operate db.
    err := a.UserEvent("SyncSql", []byte(sqlOp), false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
@@ -341,18 +335,34 @@
}
//Init serf Init
//web后台收到创建集群的请求,
func Init(clusterID string, password string, nodeID string) (*Agent, error) {
func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)
    if err != nil {
        fmt.Printf("InitNode failed, error: %s", err)
        return agent, err
    }
    err = agent.JoinByNodeIP(ips)
    if err != nil {
        fmt.Printf("JoinByNodeIP failed, error: %s", err)
        return agent, err
    }
    return agent, err
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
        conf.EncryptKey = DefaultEncryptKey
    }else{
    } else {
        if len(password) >= 16 {
            password = password[:16]
        }else{
        } else {
            password = fmt.Sprintf("%016s", password)[:16]
            //return nil, fmt.Errorf("error password")
        }
@@ -370,19 +380,24 @@
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    fmt.Println("Stats:",agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
    fmt.Printf("create agent sucess!!")
    fmt.Println("Stats:", agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
    fmt.Println("create agent sucess!!")
    return agent, nil
}
func (a *Agent) JoinByNodeIP(ip string) error {
    n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
    if err != nil || n == 0{
        a.Stop()
        fmt.Println("Stop node")
        return fmt.Errorf("Error Encrypt Key!")
func (a *Agent) JoinByNodeIP(ips []string) error {
    var nodes []string
    fmt.Println("len(ips):", len(ips))
    for _, ip := range ips {
        node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
        nodes = append(nodes, node)
    }
    n, err := a.Agent.Join(nodes, true)
    if err != nil || n == 0 {
        return fmt.Errorf("Error Agent.Join!")
    }
    return err
@@ -390,14 +405,14 @@
type Node struct {
    clusterID string
    NodeID string
    IP string
    isAlive int   //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
    NodeID    string
    IP        string
    isAlive   int //StatusNone:0, StatusAlive:1, StatusLeaving:2, StatusLeft:3, StatusFailed:4
}
func (a *Agent) GetNodes() (nodes []Node) {
    var node Node
    fmt.Println("a.conf.ClusterID:",a.conf.ClusterID)
    fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
    mbs := a.GroupMembers(a.conf.ClusterID)
    for _, mb := range mbs {
        node.NodeID = mb.Name
@@ -410,7 +425,3 @@
    return nodes
}
config.go
@@ -28,8 +28,8 @@
)
const (
    DefaultBindPort    = 5000//30190
    DefaultRPCPort     = 7373//30191
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
@@ -39,14 +39,17 @@
    tagKeyClusterID  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
    MaxQueryRespSize = 50 * 1024 *1024
    MaxQuerySize = 1024 *1024
    MaxUserEventSize = 1024
)
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
    agentConf.QuerySizeLimit = 50 * 1024 *1024
    agentConf.UserEventSizeLimit = 1024
    agentConf.QueryResponseSizeLimit = MaxQueryRespSize
    agentConf.QuerySizeLimit = MaxQuerySize
    agentConf.UserEventSizeLimit = MaxUserEventSize
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
dbself.go
@@ -10,45 +10,48 @@
    "sync"
)
var Dbconn *Conn
var sy sync.Mutex
func init() {
    GetConn()
}
const (
    PersonSqliteDBPath = "/opt/workspace/DataBases/sync.db"
)
var syncMut    sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func GetConn() error {
    var err error
    path, err := GetCurrentPath()
    if err != nil {
        return errors.New("get current path error")
func InitDbConn(dbPath string) error {
    if dbPath == ""    {
        dbPath = PersonSqliteDBPath
    }
    filepath := fmt.Sprintf("%stest.db", path)
    fmt.Println("self: ========>", filepath)
    db, err := New(filepath, "", false)
    fmt.Println("self: ========>", dbPath)
    db, err := New(dbPath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        return err
    }
    Dbconn, err = db.Connect()
    dbConn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        return err
    }
    SerfDbConn = dbConn
    return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
    path, err := GetCurrentPath()
    path, err := getCurrentPath()
    if err != nil {
        return "", errors.New("get current path error")
        fmt.Println("getCurrentPath error; ", err)
        return "", err
    }
    filepath := fmt.Sprintf("%stmptest.db", path)
    filepath := path + "tmp.db"
    fmt.Println("filepath:", filepath)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
@@ -62,7 +65,7 @@
    }
    defer tmpconn.Close()
    err = Dbconn.Backup(tmpconn)
    err = SerfDbConn.Backup(tmpconn)
    if err != nil {
        return "", err
    }
@@ -70,10 +73,10 @@
}
// do exet when get querystring.
func DoExecute(executestring []string) ([]*Result, error) {
    sy.Lock()
    defer sy.Unlock()
    allResults, err := Dbconn.Execute(executestring, false, false)
func DoExecute(sqlString []string) ([]*Result, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    allResults, err := SerfDbConn.Execute(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
@@ -81,8 +84,17 @@
    return allResults, nil
}
func Dumpdb() {
    var b strings.Builder
    if err := SerfDbConn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}
// get current path
func GetCurrentPath() (string, error) {
func getCurrentPath() (string, error) {
    file, err := exec.LookPath(os.Args[0])
    if err != nil {
        return "", err
@@ -99,13 +111,4 @@
        return "", errors.New(`error: Can't find "/" or "\".`)
    }
    return string(path[0 : i+1]), nil
}
func Dumpdb() {
    var b strings.Builder
    if err := Dbconn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}