基于serf的数据库同步模块库
chenshijun
2019-09-27 803756f1de3f52027dbe7cf191c8d4a043a37662
将fmt改成logger,需要外面初始化logger
3个文件已修改
160 ■■■■ 已修改文件
agent.go 116 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 29 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
searcher.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -35,7 +35,7 @@
    "github.com/hashicorp/serf/cmd/serf/command/agent"
    "github.com/hashicorp/serf/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "log"
    "basic.com/valib/logger.git"
)
const (
@@ -69,7 +69,7 @@
    }
    // create serf agent with serf config
    fmt.Println("conf.Config.EncryptKey:", conf.EncryptKey)
    logger.Info("conf.Config.EncryptKey:", conf.EncryptKey)
    serfAgent, err := agent.Create(conf.Config, serfConf, nil)
    if err != nil {
        return nil, err
@@ -77,11 +77,11 @@
    // Create the keyring
    keyring, err := memberlist.NewKeyring(nil, []byte(conf.EncryptKey))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        logger.Error("Failed to restore keyring: %s", err)
        return nil, err
    }
    serfConf.MemberlistConfig.Keyring = keyring
    fmt.Printf("[INFO] agent: Restored keyring with %d keys from %s",
    logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
    return &Agent{
@@ -96,7 +96,7 @@
func (a *Agent) Start(ctx context.Context) {
    err := a.Agent.Start()
    if err != nil {
        log.Println(err, "start serf agent failed")
        logger.Error(err, "start serf agent failed")
        a.errorCh <- err
        return
    }
@@ -104,7 +104,7 @@
    err = a.retryJoin(ctx)
    if err != nil {
        log.Println(err, "start serf agent failed")
        logger.Error(err, "start serf agent failed")
        if err != ctx.Err() && a.errorCh != nil {
            a.errorCh <- err
        }
@@ -127,13 +127,13 @@
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
            if err !=nil {
                fmt.Println("sqlUe unmarshal err:",err)
                logger.Error("sqlUe unmarshal err:",err)
                return
            }
            if sqlUe.Owner != a.conf.NodeName {
                //results, err := ExecuteWriteSql(sqlArr)
                flag, _ := ExecuteSqlByGorm(sqlUe.Sql)
                fmt.Println("userEvent exec ",sqlUe.Sql,",Result:",flag)
                logger.Info("userEvent exec ",sqlUe.Sql,",Result:",flag)
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            SyncDbTablePersonCacheChan <- ev.Payload
@@ -146,39 +146,39 @@
            //bak file and send resp
            filename, err := BakDbFile()
            if err != nil {
                fmt.Println("bak db file error!")
                logger.Error("bak db file error!")
                return
            }
            fmt.Println(filename)
            logger.Info(filename)
            filebuf, err := ioutil.ReadFile(filename)
            fmt.Println("filebuf: ", len(filebuf))
            logger.Info("filebuf: ", len(filebuf))
            if err != nil {
                fmt.Printf("file to []bytes error: %s\n", err)
                logger.Error("file to []bytes error: %s\n", err)
                return
            }
            err = os.Remove(filename)
            if err != nil {
                fmt.Printf("remove file%s\n failed", filename)
                logger.Error("remove file%s\n failed", filename)
                return
            }
            fmt.Println("query payload: ", len(ev.Payload))
            logger.Info("query payload: ", len(ev.Payload))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(filebuf); err != nil {
                    fmt.Printf("err: %s\n", err)
                    logger.Error("err: %s\n", err)
                    return
                }
            }
        } else if ev.Name == QueryEventUpdateDBData {
            //fmt.Println(string(ev.Payload))
            //logger.Info(string(ev.Payload))
            //var tmpstringslice []string
            //tmpstringslice = append(tmpstringslice, string(ev.Payload))
            //fmt.Println(tmpstringslice)
            //logger.Info(tmpstringslice)
            //rows, err := ExecuteQuerySql(tmpstringslice)
            //if err != nil {
            //    fmt.Println("err: ", err)
            //    logger.Error("err: ", err)
            //    return
            //}
            //var rowsReturn []Rows
@@ -188,20 +188,20 @@
            var tableNames []string
            err := json.Unmarshal(ev.Payload, &tableNames)
            if err !=nil {
                fmt.Println("Query tableNames unmarshal err")
                logger.Error("Query tableNames unmarshal err")
                return
            }
            fmt.Println("Query tableNames:",tableNames)
            logger.Info("Query tableNames:",tableNames)
            datas, err := ExecuteQueryByGorm(tableNames)
            if err !=nil {
                fmt.Println("queryByGorm err")
                logger.Error("queryByGorm err")
                return
            }
            bytesReturn, err := json.Marshal(datas)
            fmt.Println("results.len: ", len(bytesReturn))
            logger.Info("results.len: ", len(bytesReturn))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(bytesReturn); err != nil {
                    fmt.Printf("err: %s\n", err)
                    logger.Error("err: %s\n", err)
                    return
                }
            }
@@ -216,24 +216,24 @@
                leaveSql := "delete from cluster_node where node_id='"+leaveMember.Name+"'"
                ExecuteSqlByGorm([]string{ leaveSql })
                fmt.Println("EventMemberLeave,current Members:",ev.Members)
                logger.Info("EventMemberLeave,current Members:",ev.Members)
            }
            return
        }
    default:
        fmt.Printf("Unknown event type: %s\n", ev.EventType().String())
        logger.Warn("Unknown event type: %s\n", ev.EventType().String())
    }
    //if event.EventType() != serf.EventMemberJoin {
    //    fmt.Printf("event.EventType() != serf.EventMemberJoin")
    //    logger.Info("event.EventType() != serf.EventMemberJoin")
    //    return
    //}
    //
    //if a.conf.Mode == ModeCluster {
    //    if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
    //        fmt.Printf("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
    //        logger.Error("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
    //        return
    //    }
    //}
@@ -246,7 +246,7 @@
    serf := a.Agent.Serf()
    mb := serf.LocalMember()
    mblist := serf.Memberlist()
    fmt.Println("mb:", mb)
    logger.Info("mb:", mb)
    // copy local node
    localNode := *mblist.LocalNode()
@@ -267,7 +267,7 @@
    //localNode.Addr = net.IPv4(255,255,255,255)
    localNode.Port = BroadcastPort
    for {
        // fmt.Printf("localNode: %v %v\n", nodeName, nodeAddress)
        // logger.Info("localNode: %v %v\n", nodeName, nodeAddress)
        mblist.SendBestEffort(&localNode, []byte(message))
        time.Sleep(delay)
    }
@@ -308,7 +308,7 @@
    serfAgent := a.Agent.Serf()
    if serfAgent != nil {
        for _, member := range serfAgent.Members() {
            log.Printf("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
            logger.Info("member = %s, clusterID = %s", member.Name, member.Tags[tagKeyClusterID])
            if member.Tags[tagKeyClusterID] == clusterID {
                members = append(members, member)
            }
@@ -353,7 +353,7 @@
func (a *Agent) retryJoin(ctx context.Context) (err error) {
    if len(a.conf.RetryJoin) == 0 {
        log.Printf("retry join mumber %d", len(a.conf.RetryJoin))
        logger.Error("retry join mumber %d", len(a.conf.RetryJoin))
        return nil
    }
@@ -361,13 +361,13 @@
    attempt := 0
    ticker := time.NewTicker(a.conf.RetryInterval)
    for {
        log.Printf("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
        logger.Info("serf: Joining cluster...(replay: %v)", a.conf.ReplayOnJoin)
        var n int
        // Try to join the specified serf nodes
        n, err = a.Join(a.conf.RetryJoin, a.conf.ReplayOnJoin)
        if err == nil {
            log.Printf("serf: Join completed. Synced with %d initial agents", n)
            logger.Error("serf: Join completed. Synced with %d initial agents", n)
            break
        }
        attempt++
@@ -377,7 +377,7 @@
        // else agent will try to join other nodes until successful always
        if a.conf.RetryMaxAttempts > 0 && attempt > a.conf.RetryMaxAttempts {
            err = errors.New("serf: maximum retry join attempts made, exiting")
            log.Println(err, err.Error())
            logger.Error(err, err.Error())
            break
        }
        select {
@@ -405,7 +405,7 @@
            break
        }
    }
    fmt.Println(specmembername)
    logger.Info(specmembername)
    //query: get db file.
    params := serf.QueryParam{
@@ -414,7 +414,7 @@
    resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
        logger.Error("err: ", err)
    }
    go func() {
@@ -422,18 +422,18 @@
        for {
            select {
            case r := <-respCh:
                fmt.Println("x length is: ", len(r.Payload))
                logger.Info("x length is: ", len(r.Payload))
                // // byte to file.
                SerfDbConn.Close()
                SerfDbConn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    fmt.Println("query byte to file error!", err)
                    logger.Error("query byte to file error!", err)
                }
                err := InitDbConn("")
                if err != nil {
                    fmt.Println("create db conn of test.db error: ", err)
                    logger.Error("create db conn of test.db error: ", err)
                }
                return
            }
@@ -448,7 +448,7 @@
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        fmt.Println("m",m)
        logger.Info("m",m)
        if m.Name != a.conf.NodeName { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
            if strings.HasPrefix(a.conf.NodeName, "DSVAD"){
                if strings.HasPrefix(m.Name, "DSVAD") {
@@ -461,7 +461,7 @@
            }
        }
    }
    fmt.Println("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
    logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
    //query: get db file.
    params := serf.QueryParam{
@@ -473,9 +473,9 @@
    resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
        logger.Error("err: ", err)
    }
    fmt.Println("Query.resp.err:",err,"resp:",resp)
    logger.Info("Query.resp.err:",err,"resp:",resp)
    var wg sync.WaitGroup
    wg.Add(1)
@@ -486,11 +486,11 @@
        for {
            select {
            case r := <-respCh:
                fmt.Println("Query response's len:", len(r.Payload))
                logger.Info("Query response's len:", len(r.Payload))
                err := json.Unmarshal(r.Payload, &dumpSqls)
                if err ==nil {
                    fmt.Println("dumpSql:",dumpSqls)
                    fmt.Println("data dump success")
                    logger.Error("dumpSql:",dumpSqls)
                    logger.Error("data dump success")
                }
                return
            }
@@ -504,7 +504,7 @@
    //    return err
    //}
    //for _, x := range r[0].Values {
    //    y := fmt.Sprintf("%s;\n", x[0].(string))
    //    y := logger.Info("%s;\n", x[0].(string))
    //    if _, err := w.Write([]byte(y)); err != nil {
    //        return err
    //    }
@@ -526,12 +526,12 @@
    }
    ueB, err := json.Marshal(sqlUe)
    if err !=nil {
        fmt.Println("sqlUE marshal err:",err)
        logger.Error("sqlUE marshal err:",err)
        return
    }
    err = a.UserEvent(UserEventSyncSql, ueB, false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        fmt.Println("err: ", err)
        logger.Error("err: ", err)
    }
}
@@ -539,7 +539,7 @@
func (a *Agent) SyncDbTablePersonCache(b []byte) {
    err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
    if err !=nil{
        fmt.Println("UserEventSyncDbTablePersonCache err:",err)
        logger.Error("UserEventSyncDbTablePersonCache err:",err)
    }
}
@@ -547,13 +547,13 @@
func Init(clusterID string, password string, nodeID string, addrs []string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID)
    if err != nil {
        fmt.Printf("InitNode failed, error: %s", err)
        logger.Error("InitNode failed, error: %s", err)
        return agent, err
    }
    err = agent.JoinByNodeAddrs(addrs)
    if err != nil {
        fmt.Printf("JoinByNodeIP failed, error: %s", err)
        logger.Error("JoinByNodeIP failed, error: %s", err)
        return agent, err
    }
@@ -563,7 +563,7 @@
//InitNode web后台收到创建集群的请求,
func InitNode(clusterID string, password string, nodeID string) (*Agent, error) {
    conf := DefaultConfig()
    fmt.Println("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    conf.ClusterID = clusterID
    conf.NodeName = nodeID
    if password == "" {
@@ -579,7 +579,7 @@
    }
    agent, err := Create(conf)
    if err != nil {
        fmt.Printf("create agent failed, error: %s", err)
        logger.Error("create agent failed, error: %s", err)
        return agent, err
    }
@@ -589,9 +589,9 @@
        agent.ShutdownCh()
    }()
    time.Sleep(time.Second)
    fmt.Println("Stats:", agent.Agent.Serf().Stats())
    fmt.Println("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
    fmt.Println("create agent sucess!!")
    logger.Info("Stats:", agent.Agent.Serf().Stats())
    logger.Info("EncryptionEnabled:", agent.Agent.Serf().EncryptionEnabled())
    logger.Info("create agent sucess!!")
    return agent, nil
}
@@ -632,7 +632,7 @@
func (a *Agent) GetNodes() (nodes []NodeInfo) {
    var node NodeInfo
    fmt.Println("a.conf.ClusterID:", a.conf.ClusterID)
    logger.Info("a.conf.ClusterID:", a.conf.ClusterID)
    mbs := a.GroupMembers(a.conf.ClusterID)
    for _, mb := range mbs {
        node.NodeID = mb.Name
dbself.go
@@ -9,6 +9,7 @@
    "strings"
    "sync"
    "github.com/jinzhu/gorm"
    "basic.com/valib/logger.git"
)
const (
@@ -27,15 +28,15 @@
        dbPath = PersonSqliteDBPath
    }
    fmt.Println("self: ========>", dbPath)
    logger.Info("self: ========>", dbPath)
    db, err := New(dbPath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        logger.Error("new db database: ", err)
        return err
    }
    dbConn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        logger.Error("new db conn error; ", err)
        return err
    }
@@ -48,21 +49,21 @@
    path, err := getCurrentPath()
    if err != nil {
        fmt.Println("getCurrentPath error; ", err)
        logger.Error("getCurrentPath error; ", err)
        return "", err
    }
    filepath := path + "tmp.db"
    fmt.Println("filepath:", filepath)
    logger.Info("filepath:", filepath)
    db, err := New(filepath, "", false)
    if err != nil {
        fmt.Println("new db database: ", err)
        logger.Error("new db database: ", err)
        return "", err
    }
    tmpconn, err := db.Connect()
    if err != nil {
        fmt.Println("new db conn error; ", err)
        logger.Error("new db conn error; ", err)
        return "", err
    }
    defer tmpconn.Close()
@@ -80,7 +81,7 @@
    defer syncMut.Unlock()
    allResults, err := SerfDbConn.Execute(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        logger.Error("execute error!", err)
        return nil, err
    }
    return allResults, nil
@@ -97,7 +98,7 @@
    defer syncMut.Unlock()
    rows, err := SerfDbConn.Query(sqlString, false, false)
    if err != nil {
        fmt.Println("execute error!", err)
        logger.Error("execute error!", err)
        return nil, err
    }
    return rows, nil
@@ -121,7 +122,7 @@
                return false,result.Error
            }
            if result.RowsAffected == 0{
                fmt.Println("ExecuteSqlByGorm fail")
                logger.Error("ExecuteSqlByGorm fail")
                return false,errors.New("ExecuteSqlByGorm fail")
            }
        }
@@ -158,7 +159,7 @@
            if err !=nil {
                return nil,errors.New("tableDesc err")
            }
            fmt.Println(table,"'Columns is:",tDescArr)
            logger.Info(table,"'Columns is:",tDescArr)
            if tDescArr == nil || len(tDescArr) == 0 {
                return nil,errors.New(table+" has no column")
            }
@@ -183,7 +184,7 @@
                    table)
            }
            fmt.Println("tSql:",tSql)
            logger.Info("tSql:",tSql)
            err = localDb.Raw(tSql).Scan(&dumpSql).Error
            if err !=nil {
@@ -206,9 +207,9 @@
    var b strings.Builder
    if err := SerfDbConn.Dump(&b); err != nil {
        fmt.Println("dump file ", err.Error())
        logger.Error("dump file ", err.Error())
    }
    fmt.Printf("%T\n", b)
    logger.Info("%T\n", b)
}
// get current path
searcher.go
@@ -2,7 +2,7 @@
import (
    "encoding/json"
    "fmt"
    "basic.com/valib/logger.git"
    "time"
    "github.com/hashicorp/memberlist"
@@ -57,7 +57,7 @@
    if err:= json.Unmarshal(b, &n);err ==nil {
        members[n.NodeID] = n
    } else {
        fmt.Println("NotifyMsg msg unmarshal err")
        logger.Error("NotifyMsg msg unmarshal err")
    }
}
@@ -70,11 +70,11 @@
    node := nodeInfo{}
    if err := json.Unmarshal(b, &node); err != nil {
        fmt.Println("Umarshal failed:", err)
        logger.Error("Umarshal failed:", err)
        return
    }
    fmt.Println(node)
    logger.Info(node)
}
func CreateSearchNode(key string) (*memberlist.Memberlist, error) {
@@ -87,7 +87,7 @@
    keyring, err := memberlist.NewKeyring(nil, []byte(key))
    if err != nil {
        fmt.Printf("Failed to restore keyring: %s", err)
        logger.Error("Failed to restore keyring: %s", err)
        return nil, err
    }
    conf.Keyring = keyring
@@ -98,18 +98,15 @@
func CreateSearchNodeWhitClose(key string, delay time.Duration) map[string]NodeInfo {
    m, err := CreateSearchNode(key)
    if err == nil {
        // fmt.Printf("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
        //logger.Info("Local member %s:%d\n", m.LocalNode().Addr, m.LocalNode().Port)
        time.Sleep(delay)
        m.Shutdown()
        fmt.Println("CreateSearchNodeWhitClose after ",delay,",shutdown success")
    }
    return members
}
func CloseSearchNode(m *memberlist.Memberlist) error {
    fmt.Println("CloseSearchNode")
    defer fmt.Println("ShutDown done")
    return m.Shutdown()
}