基于serf的数据库同步模块库
liuxiaolong
2019-08-07 e86b1dc6a1a9b307dce5a7ad4be28f77e4b3bcf6
add gorm
3个文件已修改
169 ■■■■■ 已修改文件
.gitignore 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go 92 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 74 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
.gitignore
@@ -21,3 +21,6 @@
*.exe
*.test
.idea
go.mod
go.sum
agent.go
@@ -159,21 +159,31 @@
                }
            }
        } else if ev.Name == QueryEventUpdateDBData {
            fmt.Println(string(ev.Payload))
            var tmpstringslice []string
            tmpstringslice = append(tmpstringslice, string(ev.Payload))
            fmt.Println(tmpstringslice)
            rows, err := ExecuteQuerySql(tmpstringslice)
            //fmt.Println(string(ev.Payload))
            //var tmpstringslice []string
            //tmpstringslice = append(tmpstringslice, string(ev.Payload))
            //fmt.Println(tmpstringslice)
            //rows, err := ExecuteQuerySql(tmpstringslice)
            //if err != nil {
            //    fmt.Println("err: ", err)
            //    return
            //}
            //var rowsReturn []Rows
            //for _, r := range rows {
            //    rowsReturn = append(rowsReturn, *r)
            //}
            var tableNames []string
            err := json.Unmarshal(ev.Payload, &tableNames)
            if err != nil {
                fmt.Println("err: ", err)
                fmt.Println("Query tableNames unmarshal err")
                return
            }
            var rowsReturn []Rows
            for _, r := range rows {
                rowsReturn = append(rowsReturn, *r)
            datas, err := ExecuteQueryByGorm(tableNames)
            if err !=nil {
                fmt.Println("queryByGorm err")
                return
            }
            bytesReturn, err := json.Marshal(rowsReturn)
            bytesReturn, err := json.Marshal(datas)
            fmt.Println("results: ", bytesReturn)
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(bytesReturn); err != nil {
@@ -405,6 +415,66 @@
    }()
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]Rows,error) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        if m.Addr.String() != a.conf.BindAddr {
            specmembername = m.Name
            break
        }
    }
    fmt.Println(specmembername)
    //query: get db file.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }
    //SQL
    tBytes, _ := json.Marshal(tableNames)
    resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
    }
    go func() {
        respCh := resp.ResponseCh()
        for {
            select {
            case r := <-respCh:
                fmt.Println("x length is: ", len(r.Payload))
                // // byte to file.
                var dumpSqls []string
                err := json.Unmarshal(r.Payload, &dumpSqls)
                if err ==nil {
                    if flag,_ := ExecuteSqlByGorm(dumpSqls);flag{
                        fmt.Println("data dump success")
                    }
                }
                return
            }
        }
    }()
    //r, err = c.Query([]string{query}, false, false)
    //if err != nil {
    //    return err
    //}
    //for _, x := range r[0].Values {
    //    y := fmt.Sprintf("%s;\n", x[0].(string))
    //    if _, err := w.Write([]byte(y)); err != nil {
    //        return err
    //    }
    //}
}
//SyncSql boardcast sql to cluster
func (a *Agent) SyncSql(sqlOp string) {
    // event : use to send command to operate db.
dbself.go
@@ -8,6 +8,7 @@
    "path/filepath"
    "strings"
    "sync"
    "github.com/jinzhu/gorm"
)
const (
@@ -83,6 +84,11 @@
    return allResults, nil
}
var localDb *gorm.DB
func InitLocalDb(db *gorm.DB) {
    localDb = db
}
// do exet when get querystring.
func ExecuteQuerySql(sqlString []string) ([]*Rows, error) {
    syncMut.Lock()
@@ -95,6 +101,74 @@
    return rows, nil
}
func ExecuteSqlByGorm(sqls []string) (bool,error) {
    if localDb != nil {
        var err error
        tx := localDb.Begin()
        defer func() {
            if err !=nil && tx !=nil {
                tx.Rollback()
            }
        }()
        for _,sql :=range sqls {
            result := tx.Exec(sql)
            if result.Error !=nil {
                return false,result.Error
            }
            if result.RowsAffected == 0{
                fmt.Println("ExecuteSqlByGorm fail")
                return false,errors.New("ExecuteSqlByGorm fail")
            }
        }
        tx.Commit()
        return true,nil
    }
    return false,errors.New("localDb is nil")
}
type TableDesc struct {
    Cid int `json:"cid"`
    Name string `json:"name"`
    Type string `json:"type"`
    Notnull bool `json:"notnull"`
    DFltValue interface{} `json:"dflt_value"`
    Pk int `json:"pk"`
}
func ExecuteQueryByGorm(tableNames []string) ([]string, error) {
    if tableNames !=nil {
        var arr []string
        for _,table :=range tableNames {
            var tDescArr []TableDesc
            tSql := fmt.Sprintf(`PRAGMA table_info("%s")`, table)
            err := localDb.Raw(tSql).Scan(&tDescArr).Error
            if err !=nil {
                return nil,errors.New("tableDesc err")
            }
            fmt.Println(table,"'Columns is:",tDescArr)
            if tDescArr == nil || len(tDescArr) == 0 {
                return nil,errors.New(table+" has no column")
            }
            var columnNames []string
            for _,col :=range tDescArr {
                columnNames = append(columnNames, fmt.Sprintf(`'||quote("%s")||'`, col.Name))
            }
            tSql = fmt.Sprintf(`SELECT 'INSERT INTO "%s" VALUES(%s)' FROM "%s";`,
                table,
                strings.Join(columnNames, ","),
                table)
            var dumpSqls []string
            err = localDb.Raw(tSql).Scan(&dumpSqls).Error
            if err !=nil {
                return nil,errors.New("dump err")
            }
            arr = append(arr, dumpSqls...)
        }
        return arr,nil
    }
    return nil,errors.New("tableNames is nil")
}
func Dumpdb() {
    var b strings.Builder