基于serf的数据库同步模块库
chenshijun
2019-08-06 954aef5b16c944b0f27e03ccaafb1748b62f0d48
增加接口,初始化自动加入集群
3个文件已修改
209 ■■■■■ 已修改文件
agent.go 126 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 79 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -18,12 +18,12 @@
import (
    "context"
    "encoding/json"
    "errors"
    "fmt"
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "os"
    "strconv"
    //"os"
    "strings"
@@ -35,6 +35,10 @@
    "log"
)
const (
    QueryEventGetDB = "GetDatabase"
    QueryEventUpdateDBData = "UpdateDBData"
)
// Agent warps the serf agent
type Agent struct {
    *agent.Agent
@@ -106,41 +110,71 @@
        var tmpstringslice []string
        tmpstringslice = append(tmpstringslice, string(ev.Payload))
        fmt.Println(tmpstringslice)
        results, err := DoExecute(tmpstringslice)
        results, err := ExecuteWriteSql(tmpstringslice)
        for _, result := range results {
            fmt.Println(result, "results err: ", err)
        }
    case *serf.Query:
        //bak file and send resp
        filename, err := BakDbFile()
        if err != nil {
            fmt.Println("bak db file error!")
            return
        }
        fmt.Println(filename)
        filebuf, err := ioutil.ReadFile(filename)
        fmt.Println("filebuf: ", len(filebuf))
        if err != nil {
            fmt.Printf("file to []bytes error: %s\n", err)
            return
        }
        err = os.Remove(filename)
        if err != nil {
            fmt.Printf("remove file%s\n failed", filename)
            return
        }
        fmt.Println("query payload: ", len(ev.Payload))
        if query, ok := event.(*serf.Query); ok {
            if err := query.Respond(filebuf); err != nil {
                fmt.Printf("err: %s\n", err)
        if ev.Name == QueryEventGetDB{
            //bak file and send resp
            filename, err := BakDbFile()
            if err != nil {
                fmt.Println("bak db file error!")
                return
            }
            fmt.Println(filename)
            filebuf, err := ioutil.ReadFile(filename)
            fmt.Println("filebuf: ", len(filebuf))
            if err != nil {
                fmt.Printf("file to []bytes error: %s\n", err)
                return
            }
            err = os.Remove(filename)
            if err != nil {
                fmt.Printf("remove file%s\n failed", filename)
                return
            }
            fmt.Println("query payload: ", len(ev.Payload))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(filebuf); err != nil {
                    fmt.Printf("err: %s\n", err)
                    return
                }
            }
        } else if ev.Name == QueryEventUpdateDBData {
            fmt.Println(string(ev.Payload))
            var tmpstringslice []string
            tmpstringslice = append(tmpstringslice, string(ev.Payload))
            fmt.Println(tmpstringslice)
            rows, err := ExecuteQuerySql(tmpstringslice)
            if err != nil {
                fmt.Println("err: ", err)
                return
            }
            var rowsReturn []Rows
            for _,r := range rows {
                rowsReturn = append(rowsReturn, *r)
            }
            bytesReturn, err := json.Marshal(rowsReturn)
            fmt.Println("results: ", bytesReturn)
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(bytesReturn); err != nil {
                    fmt.Printf("err: %s\n", err)
                    return
                }
            }
            //var res []*Rows
            //json.Unmarshal(bytesReturn, &res)
        }
    default:
        fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
@@ -281,8 +315,6 @@
    return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent)GetDbFromCluster(dbPathWrite string) {
@@ -302,7 +334,7 @@
        FilterNodes: strings.Fields(specmembername),
    }
    resp, err := a.Query("getDatabase", []byte(""), &params)
    resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
@@ -315,13 +347,13 @@
                fmt.Println("x length is: ", len(r.Payload))
                // // byte to file.
                Dbconn.Close()
                Dbconn = nil
                SerfDbConn.Close()
                SerfDbConn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    fmt.Println("query byte to file error!", err)
                }
                err := GetConn()
                err := InitDbConn("")
                if err != nil {
                    fmt.Println("create db conn of test.db error: ", err)
                }
@@ -341,8 +373,24 @@
}
//Init serf Init
//web后台收到创建集群的请求,
func Init(clusterID string, password string, nodeID string) (*Agent, error) {
func Init(clusterID string, password string, nodeID string, ips []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)
    if err != nil {
        fmt.Printf("InitNode failed, error: %s", err)
        return agent, err
    }
    err = agent.JoinByNodeIP(ips)
    if err != nil {
        fmt.Printf("JoinByNodeIP failed, error: %s", err)
        return agent, err
    }
    return agent, err
}
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    //conf.ClusterID = clusterID
@@ -377,8 +425,14 @@
    return agent, nil
}
func (a *Agent) JoinByNodeIP(ip string) error {
    n, err := a.Agent.Join([]string{ip + ":" + strconv.Itoa(DefaultBindPort)}, true)
func (a *Agent) JoinByNodeIP(ips []string) error {
    var nodes []string
    for _, ip := range ips {
        node := fmt.Sprintf("%s:%d", ip, DefaultBindPort)
        nodes = append(nodes, node)
    }
    n, err := a.Agent.Join(nodes, true)
    if err != nil || n == 0{
        a.Stop()
        fmt.Println("Stop node")
config.go
@@ -28,8 +28,8 @@
)
const (
    DefaultBindPort    = 5000//30190
    DefaultRPCPort     = 7373//30191
    DefaultBindPort    = 30190
    DefaultRPCPort     = 30191
    DefaultClusterPort = 30192
    ModeSingle         = "single"
    ModeCluster        = "cluster"
dbself.go
@@ -10,45 +10,48 @@
    "sync"
)
var Dbconn *Conn
var sy sync.Mutex
func init() {
    GetConn()
}
const (
    PersonSqliteDBPath = "~/workspace/gitblit/dbserver/config/testdb.db"
)
var syncMut    sync.Mutex
var SerfDbConn *Conn
// get Conn of db for do execute.
func GetConn() error {
    var err error
    path, err := GetCurrentPath()
    if err != nil {
        return errors.New("get current path error")
func InitDbConn(dbPath string) error {
    if dbPath == ""    {
        dbPath = PersonSqliteDBPath
    }
    filepath := fmt.Sprintf("%stest.db", path)
    fmt.Println("self: ========>", filepath)
    db, err := New(filepath, "", false)
    fmt.Println("self: ========>", dbPath)
    db, err := New(dbPath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        return err
    }
    Dbconn, err = db.Connect()
    dbConn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        return err
    }
    SerfDbConn = dbConn
    return nil
}
//bak dbdata.
func BakDbFile() (string, error) {
    path, err := GetCurrentPath()
    path, err := getCurrentPath()
    if err != nil {
        return "", errors.New("get current path error")
        fmt.Println("getCurrentPath error; ", err)
        return "", err
    }
    filepath := fmt.Sprintf("%stmptest.db", path)
    filepath := path + "tmp.db"
    fmt.Println("filepath:", filepath)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
@@ -62,7 +65,7 @@
    }
    defer tmpconn.Close()
    err = Dbconn.Backup(tmpconn)
    err = SerfDbConn.Backup(tmpconn)
    if err != nil {
        return "", err
    }
@@ -70,10 +73,10 @@
}
// do exet when get querystring.
func DoExecute(executestring []string) ([]*Result, error) {
    sy.Lock()
    defer sy.Unlock()
    allResults, err := Dbconn.Execute(executestring, false, false)
func ExecuteWriteSql(sqlString []string) ([]*Result, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    allResults, err := SerfDbConn.Execute(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
@@ -81,8 +84,29 @@
    return allResults, nil
}
// do exet when get querystring.
func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
    syncMut.Lock()
    defer syncMut.Unlock()
    rows, err := SerfDbConn.Query(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        return nil, err
    }
    return rows, nil
}
func Dumpdb() {
    var b strings.Builder
    if err := SerfDbConn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}
// get current path
func GetCurrentPath() (string, error) {
func getCurrentPath() (string, error) {
    file, err := exec.LookPath(os.Args[0])
    if err != nil {
        return "", err
@@ -99,13 +123,4 @@
        return "", errors.New(`error: Can't find "/" or "\".`)
    }
    return string(path[0 : i+1]), nil
}
func Dumpdb() {
    var b strings.Builder
    if err := Dbconn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
}