基于serf的数据库同步模块库
chenshijun
2019-08-05 7d1bbdc8a9a4854edb6d45877fb0d2b9ce12478b
重构之前的db操作的代码
3个文件已修改
133 ■■■■■ 已修改文件
agent.go 53 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 13 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 67 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -23,15 +23,11 @@
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "os"
    "strconv"
    //"os"
    "strings"
    "time"
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "log"
)
@@ -281,8 +277,6 @@
    return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
@@ -315,13 +309,13 @@
                fmt.Println("x length is: ", len(r.Payload))
                // // byte to file.
                Dbconn.Close()
                Dbconn = nil
                SerfDbConn.Close()
                SerfDbConn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    fmt.Println("query byte to file error!", err)
                }
                err := GetConn()
                err := InitDbConn("")
                if err != nil {
                    fmt.Println("create db conn of test.db error: ", err)
                }
@@ -341,8 +335,24 @@
}
//Init serf Init
//web后台收到创建集群的请求,
func Init(clusterID string, password string, nodeID string) (*Agent, error) {
func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)
    if err != nil {
        fmt.Printf("InitNode failed, error: %s", err)
        return agent, err
    }
    err = agent.JoinByNodeIP(ips)
    if err != nil {
        fmt.Printf("JoinByNodeIP failed, error: %s", err)
        return agent, err
    }
    return agent, err
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
@@ -372,17 +382,22 @@
    time.Sleep(time.Second)
    fmt.Println("Stats:",agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:",agent.Agent.Serf().EncryptionEnabled())
    fmt.Printf("create agent sucess!!")
    fmt.Println("create agent sucess!!")
    return agent, nil
}
func (a *Agent) JoinByNodeIP(ip string) error {
    n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
func (a *Agent) JoinByNodeIP(ips []string) error {
    var nodes []string
    fmt.Println("len(ips):", len(ips))
    for _, ip := range ips {
        node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
        nodes = append(nodes, node)
    }
    n, err := a.Agent.Join(nodes, true)
    if err != nil || n == 0{
        a.Stop()
        fmt.Println("Stop node")
        return fmt.Errorf("Error Encrypt Key!")
        return fmt.Errorf("Error Agent.Join!")
    }
    return err
@@ -410,7 +425,3 @@
    return nodes
}
config.go
@@ -28,8 +28,8 @@
)
const (
    DefaultBindPort    = 5000//30190
    DefaultRPCPort     = 7373//30191
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
@@ -39,14 +39,17 @@
    tagKeyClusterID  = "syncer-cluster-name"
    TagKeyClusterPort  = "syncer-cluster-port"
    TagKeyRPCPort      = "syncer-rpc-port"
    MaxQueryRespSize = 50 * 1024 *1024
    MaxQuerySize = 1024 *1024
    MaxUserEventSize = 1024
)
// DefaultConfig default config
func DefaultConfig() *Config {
    agentConf := agent.DefaultConfig()
    agentConf.QueryResponseSizeLimit = 50 * 1024 *1024
    agentConf.QuerySizeLimit = 50 * 1024 *1024
    agentConf.UserEventSizeLimit = 1024
    agentConf.QueryResponseSizeLimit = MaxQueryRespSize
    agentConf.QuerySizeLimit = MaxQuerySize
    agentConf.UserEventSizeLimit = MaxUserEventSize
    agentConf.BindAddr = fmt.Sprintf("0.0.0.0:%d", DefaultBindPort)
    agentConf.RPCAddr = fmt.Sprintf("0.0.0.0:%d", DefaultRPCPort)
    return &Config{
dbself.go
@@ -10,45 +10,48 @@
    "sync"
)
var Dbconn *Conn
var sy sync.Mutex
func init() {
    GetConn()
}
const (
    PersonSqliteDBPath = "/opt/workspace/DataBases/sync.db"
)
var syncMut    sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func GetConn() error {
    var err error
    path, err := GetCurrentPath()
    if err != nil {
        return errors.New("get current path error")
func InitDbConn(dbPath string) error {
    if dbPath == ""    {
        dbPath = PersonSqliteDBPath
    }
    filepath := fmt.Sprintf("%stest.db", path)
    fmt.Println("self: ========>", filepath)
    db, err := New(filepath, "", false)
    fmt.Println("self: ========>", dbPath)
    db, err := New(dbPath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        return err
    }
    Dbconn, err = db.Connect()
    dbConn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        return err
    }
    SerfDbConn = dbConn
    return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
    path, err := GetCurrentPath()
    path, err := getCurrentPath()
    if err != nil {
        return "", errors.New("get current path error")
        fmt.Println("getCurrentPath error; ", err)
        return "", err
    }
    filepath := fmt.Sprintf("%stmptest.db", path)
    filepath := path + "tmp.db"
    fmt.Println("filepath:", filepath)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
@@ -62,7 +65,7 @@
    }
    defer tmpconn.Close()
    err = Dbconn.Backup(tmpconn)
    err = SerfDbConn.Backup(tmpconn)
    if err != nil {
        return "", err
    }
@@ -70,10 +73,10 @@
}
// do exet when get querystring.
func DoExecute(executestring []string) ([]*Result, error) {
    sy.Lock()
    defer sy.Unlock()
    allResults, err := Dbconn.Execute(executestring, false, false)
func DoExecute(sqlString []string) ([]*Result, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    allResults, err := SerfDbConn.Execute(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
@@ -81,8 +84,17 @@
    return allResults, nil
}
func Dumpdb() {
    var b strings.Builder
    if err := SerfDbConn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}
// get current path
func GetCurrentPath() (string, error) {
func getCurrentPath() (string, error) {
    file, err := exec.LookPath(os.Args[0])
    if err != nil {
        return "", err
@@ -99,13 +111,4 @@
        return "", errors.New(`error: Can't find "/" or "\".`)
    }
    return string(path[0 : i+1]), nil
}
func Dumpdb() {
    var b strings.Builder
    if err := Dbconn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}