基于serf的数据库同步模块库
liuxiaolong
2021-05-31 d70649cfb61e64fabce40199ad1d53d6a4973f0e
no business
2个文件已修改
4个文件已删除
2139 ■■■■■ 已修改文件
agent.go 423 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db.go 514 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
db_test.go 880 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
dbself.go 237 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
transport.go 83 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -22,29 +22,15 @@
    "errors"
    "fmt"
    "github.com/hashicorp/memberlist"
    "io/ioutil"
    "net"
    "os"
    "strconv"
    "sync"
    //"os"
    "strings"
    "time"
    "basic.com/valib/serf.git/serf"
    "basic.com/valib/serf.git/cmd/serf/command/agent"
    "basic.com/valib/serf.git/serf"
    //"github.com/apache/servicecomb-service-center/pkg/log"
    "basic.com/valib/logger.git"
    "github.com/satori/go.uuid"
)
// Names of the serf queries handled by Agent.HandleEvent.
const (
    // QueryEventGetDB names the query that is answered with the bytes of a
    // database backup file.
    QueryEventGetDB = "GetDatabase"
    // QueryEventUpdateDBData names the query that requests the data of a
    // list of tables.
    QueryEventUpdateDBData = "UpdateDBData"
)

// Names of the serf user events handled by Agent.HandleEvent.
const (
    // UserEventSyncSql names the user event that carries SQL statements to
    // be executed on the other nodes.
    UserEventSyncSql = "SyncSql"
    // UserEventSyncDbTablePersonCache names the user event whose payload is
    // forwarded to SyncDbTablePersonCacheChan.
    UserEventSyncDbTablePersonCache = "SyncCache"
    // UserEventSyncVirtualIp names the user event that announces a change of
    // the virtual (floating) IP.
    UserEventSyncVirtualIp = "SyncVirtualIp"
)
// Agent wraps the serf agent
@@ -53,7 +39,11 @@
    conf    *Config
    readyCh chan struct{}
    errorCh chan error
    handleEv     HandleEventFunc
}
// HandleEventFunc is a user-defined handler for serf events.
type HandleEventFunc func(event serf.Event)
type NodeInfo struct {
    ClusterID   string `json:"clusterID"`
@@ -95,6 +85,12 @@
    }, nil
}
func (a *Agent) RegisterHandleEventFunc(f HandleEventFunc) {
    if f != nil {
        a.handleEv = f
    }
}
// Start agent
func (a *Agent) Start(ctx context.Context) {
    err := a.Agent.Start()
@@ -116,234 +112,14 @@
    go a.BroadcastMemberlist(BroadcastInterval * time.Second)
}
var (
    // SyncDbTablePersonCacheChan receives the payload of every
    // UserEventSyncDbTablePersonCache user event seen by Agent.HandleEvent.
    // It buffers up to 512 payloads.
    SyncDbTablePersonCacheChan = make(chan []byte, 512)

    // SyncVirtualIpChan receives the payload of every UserEventSyncVirtualIp
    // user event seen by Agent.HandleEvent. It buffers up to 512 payloads.
    SyncVirtualIpChan = make(chan []byte, 512)
)
// HandleEvent dispatches a serf event according to its concrete type.
//
//   - serf.UserEvent: a UserEventSyncSql payload is decoded into a
//     SqlUserEvent and, when it was sent by another node, its statements are
//     executed locally and the outcome is recorded in sql_sync_his;
//     UserEventSyncDbTablePersonCache and UserEventSyncVirtualIp payloads are
//     forwarded to SyncDbTablePersonCacheChan and SyncVirtualIpChan.
//   - *serf.Query: QueryEventGetDB is answered with the bytes of a database
//     backup file; for QueryEventUpdateDBData the requested table data is
//     sent to the requesting node over TCP.
//   - serf.MemberEvent: a leave or join that concerns exactly one member
//     updates isDelete in cluster_node and is recorded in sql_sync_his.
//
// Any other event type is logged as unknown. The wait for the expected number
// of group members now exists only as commented-out code at the end of the
// function.
func (a *Agent) HandleEvent(event serf.Event) {
    switch ev := event.(type) {
    case serf.UserEvent:
        if ev.Name == UserEventSyncSql {
            logger.Info("receive a UserEventSyncSql event")
            var sqlUe SqlUserEvent
            err := json.Unmarshal(ev.Payload, &sqlUe)
            if err !=nil {
                logger.Error("sqlUe unmarshal err:",err)
                return
    // NOTE(review): the next two lines sit after the return above and open a
    // new if block inside the error branch; they look like lines merged in
    // from another revision. Confirm where the user-defined handler is meant
    // to be invoked.
    if a.handleEv != nil {
        a.handleEv(event)
            }
            logger.Info("ev.LTime:", ev.LTime ,"owner:", sqlUe.Owner, "sql:", sqlUe.Sql)
            // Statements this node broadcast itself are not executed again.
            if sqlUe.Owner != a.conf.NodeName {
                go func() {
                    flag, e := ExecuteSqlByGorm(sqlUe.Sql)
                    logger.Info("ev.LTime:",ev.LTime,"userEvent exec ",sqlUe.Sql,",Result:",flag,", err:",e)
                    // Build a sql_sync_his row describing the execution.
                    logLT := strconv.Itoa(int(ev.LTime))
                    logT := time.Now().Format("2006-01-02 15:04:05")
                    // Single quotes are doubled so the statements can be embedded in a SQL string literal.
                    logSql := strings.ReplaceAll(strings.Join(sqlUe.Sql, ";"), "'", "''")
                    logResult := "0"
                    if flag {
                        logResult = "1"
                    }
                    logErr := ""
                    if e != nil {
                        logErr = e.Error()
                    }
                    // NOTE(review): logErr and sqlUe.Owner are concatenated without quote escaping — confirm they cannot contain a single quote.
                    ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+sqlUe.Owner+"',"+logResult+",'"+logErr+"')"})
                }()
            }
        } else if ev.Name == UserEventSyncDbTablePersonCache {
            logger.Info("LTime:",ev.LTime,",ev.Payload.len:",len(ev.Payload))
            // Blocks once 512 payloads are waiting in the channel.
            SyncDbTablePersonCacheChan <- ev.Payload
        } else if ev.Name == UserEventSyncVirtualIp {
            logger.Info("LTime:", ev.LTime, " Recevie virtualIp change")
            // Blocks once 512 payloads are waiting in the channel.
            SyncVirtualIpChan <- ev.Payload
        }
    case *serf.Query:
        if ev.Name == QueryEventGetDB {
            // Back up the database file and send its contents as the query response.
            filename, err := BakDbFile()
            if err != nil {
                logger.Error("bak db file error!")
                return
            }
            logger.Info(filename)
            filebuf, err := ioutil.ReadFile(filename)
            logger.Info("filebuf: ", len(filebuf))
            if err != nil {
                logger.Error("file to []bytes error: %s\n", err)
                return
            }
            // The backup file is only needed in memory from here on.
            err = os.Remove(filename)
            if err != nil {
                logger.Error("remove file%s\n failed", filename)
                return
            }
            logger.Info("query payload: ", len(ev.Payload))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(filebuf); err != nil {
                    logger.Error("err: %s\n", err)
                    return
                }
            }
        } else if ev.Name == QueryEventUpdateDBData {
            //logger.Info(string(ev.Payload))
            //var tmpstringslice []string
            //tmpstringslice = append(tmpstringslice, string(ev.Payload))
            //logger.Info(tmpstringslice)
            //rows, err := ExecuteQuerySql(tmpstringslice)
            //if err != nil {
            //    logger.Error("err: ", err)
            //    return
            //}
            //var rowsReturn []Rows
            //for _, r := range rows {
            //    rowsReturn = append(rowsReturn, *r)
            //}
            logger.Info("receive QueryEventUpdateDBData, current node:", a.conf.NodeName)
            // The payload names the requested tables and the requesting node.
            var fromP QueryTableDataParam
            err := json.Unmarshal(ev.Payload, &fromP)
            if err !=nil {
                logger.Error("Query tableNames unmarshal err")
                if query, ok := event.(*serf.Query); ok {
                    if err := query.Respond([]byte("request unmarshal err")); err != nil {
                        logger.Error("query.Respond err: %s\n", err)
                        return
                    }
                }
                return
            }
            logger.Info("Query tableNames:",fromP.Tables)
            datas, err := ExecuteQueryByGorm(fromP.Tables)
            if err !=nil {
                logger.Error("queryByGorm err:", err)
                if query, ok := event.(*serf.Query); ok {
                    if err := query.Respond([]byte("queryByGorm err")); err != nil {
                        logger.Error("query.Respond err: %s\n", err)
                        return
                    }
                }
                return
            }
            // NOTE(review): the error returned by json.Marshal is assigned but never checked.
            bytesReturn, err := json.Marshal(datas)
            logger.Info("results.len: ", len(bytesReturn))
            // Look up the memberlist node whose name matches the requester.
            var targetNode *memberlist.Node
            nodes := a.Serf().Memberlist().Members()
            if nodes != nil && len(nodes) > 0 {
                for _,n :=range nodes {
                    if n.Name == fromP.From {
                        targetNode = n
                        break
                    }
                }
            }
            // NOTE(review): targetNode is dereferenced here before the nil check below; this panics when no member matches fromP.From.
            logger.Debug("targetNode:",targetNode.Name)
            if targetNode !=nil {
                go func() {
                    // The data is sent to the requester on TcpTransportPort; the serf query itself is not answered (see the commented-out Respond below).
                    addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
                    sendErr := rawSendTcpMsg(addr, bytesReturn)
                    // Build a sql_sync_his row describing the transfer.
                    logLT := strconv.Itoa(int(ev.LTime))
                    logT := time.Now().Format("2006-01-02 15:04:05")
                    logSql := strings.ReplaceAll("QueryEventUpdateDBData from "+targetNode.Name,"'","''")
                    logResult := "0"
                    logErr := ""
                    if sendErr ==nil {
                        logResult = "1"
                        logger.Debug("sendToTcp success")
                    } else {
                        logErr = sendErr.Error()
                        logger.Debug("sendToTcp err:",sendErr)
                    }
                    ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+targetNode.Name+"',"+logResult+",'"+logErr+"')"})
                }()
            } else {
                logger.Debug("targetNode is nil")
            }
            //if query, ok := event.(*serf.Query); ok {
            //    if err := query.Respond(bytesReturn); err != nil {
            //        logger.Error("err: %s\n", err)
            //        return
            //    }
            //}
        }
    case serf.MemberEvent:
        if event.EventType() == serf.EventMemberLeave {
            // Only events that report exactly one member are processed.
            if ev.Members !=nil && len(ev.Members) ==1 {
                leaveMember := ev.Members[0]
                // Mark the departed node as deleted in cluster_node.
                leaveSql := "update cluster_node set isDelete=1 where node_id='"+leaveMember.Name+"'"
                flag,e := ExecuteSqlByGorm([]string{ leaveSql })
                logger.Info("EventMemberLeave,current Members:",ev.Members)
                // The history row for a member event is written with an empty lTime.
                logLT := ""
                logT := time.Now().Format("2006-01-02 15:04:05")
                logSql := strings.ReplaceAll(leaveSql, "'","''")
                logResult := "0"
                if flag {
                    logResult = "1"
                }
                logErr := ""
                if e != nil {
                    logErr = e.Error()
                }
                ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            }
            return
        } else if event.EventType() == serf.EventMemberJoin {
            // Only events that report exactly one member are processed.
            if ev.Members !=nil && len(ev.Members) ==1 {
                // Despite its name, leaveMember is the member that joined.
                leaveMember := ev.Members[0]
                // Clear the deleted mark of the joined node in cluster_node.
                joinSql := "update cluster_node set isDelete=0 where node_id='"+leaveMember.Name+"'"
                flag,e := ExecuteSqlByGorm([]string{joinSql})
                logger.Info("EventMemberJoin,current Members:",ev.Members)
                // The history row for a member event is written with an empty lTime.
                logLT := ""
                logT := time.Now().Format("2006-01-02 15:04:05")
                logSql := strings.ReplaceAll(joinSql, "'", "''")
                logResult := "0"
                if flag {
                    logResult = "1"
                }
                logErr := ""
                if e != nil {
                    logErr = e.Error()
                }
                ExecuteSqlByGorm([]string{"insert into sql_sync_his(`id`,`lTime`,`createTime`,`sql`,`from`,`result`,`err`) values ('"+uuid.NewV4().String()+"','"+ logLT +"','"+logT+"','"+logSql+"','"+leaveMember.Name+"',"+logResult+",'"+logErr+"')"})
            }
            return
        }
    default:
        logger.Warn("Unknown event type: %s\n", ev.EventType().String())
    }
    // Disabled: wait until the cluster group has reached groupExpect members,
    // then deregister this handler and close readyCh.
    //if event.EventType() != serf.EventMemberJoin {
    //    logger.Info("event.EventType() != serf.EventMemberJoin")
    //    return
    //}
    //
    //if a.conf.Mode == ModeCluster {
    //    if len(a.GroupMembers(a.conf.ClusterID)) < groupExpect {
    //        logger.Error("len(a.GroupMembers(a.conf.ClusterID)) < groupExpect")
    //        return
    //    }
    //}
    //a.DeregisterEventHandler(a)
    //close(a.readyCh)
}
@@ -499,176 +275,9 @@
    return
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetDbFromCluster(dbPathWrite string) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        if m.Addr.String() != a.conf.BindAddr {
            specmembername = m.Name
            break
        }
    }
    logger.Info(specmembername)
    //query: get db file.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }
    resp, err := a.Query(QueryEventGetDB, []byte(""), &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        logger.Error("err: ", err)
    }
    go func() {
        respCh := resp.ResponseCh()
        for {
            select {
            case r := <-respCh:
                logger.Info("x length is: ", len(r.Payload))
                // // byte to file.
                SerfDbConn.Close()
                SerfDbConn = nil
                err = ioutil.WriteFile(dbPathWrite, r.Payload, 0644)
                if err != nil {
                    logger.Error("query byte to file error!", err)
                }
                err := InitDbConn("")
                if err != nil {
                    logger.Error("create db conn of test.db error: ", err)
                }
                return
            }
        }
    }()
}
// QueryTableDataParam is the JSON payload of a QueryEventUpdateDBData query.
type QueryTableDataParam struct {
    // Tables lists the names of the tables whose data is requested.
    Tables []string `json:"tables"`
    // From is the node name of the requester; the responder looks up the
    // member with this name and sends the data to it.
    From string `json:"from"`
}
// QueryTcpResponseChan is an unbuffered channel from which
// GetTableDataFromCluster reads the raw bytes of a table-data response.
// NOTE(review): the sender is not visible in this file — presumably the TCP
// transport; confirm.
var QueryTcpResponseChan = make(chan []byte)
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetTableDataFromCluster(tableNames []string, timeout time.Duration) (*[]string,error) {
    //members: get name of first member
    mbs := a.GroupMembers(a.conf.ClusterID)
    var specmembername string
    for _, m := range mbs {
        logger.Info("m",m)
        if m.Name != a.conf.NodeName { //前缀:DSVAD:分析服务器 DSPAD:进出入pad
            if strings.HasPrefix(a.conf.NodeName, "DSVAD"){
                if strings.HasPrefix(m.Name, "DSVAD") {
                    specmembername = m.Name
                    break
                }
            }else{
                specmembername = m.Name
                break
            }
        }
    }
    logger.Info("mbs:",mbs,"a.conf.BindAddr:",a.conf.BindAddr,"specmembername:",specmembername)
    if specmembername == "" {//如果未找到目标节点,说明当前集群内除了本节点,没有其他可用节点
        return nil,errors.New("specmembername not found")
    }
    //query: get db file.
    params := serf.QueryParam{
        FilterNodes: strings.Fields(specmembername),
    }
    //get db tables
    var fromP = QueryTableDataParam{
        Tables: tableNames,
        From: a.conf.NodeName,
    }
    tBytes, _ := json.Marshal(fromP)
    resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        logger.Error("err: ", err)
    }
    logger.Info("Query.resp.err:",err,"resp:",resp)
    var dumpSqls []string
    var wg sync.WaitGroup
    wg.Add(1)
    ticker := time.NewTicker(timeout)
    go func(tk *time.Ticker) {
        defer tk.Stop()
        defer wg.Done()
        for {
            select {
            case <-tk.C:
                return
            case msg := <- QueryTcpResponseChan:
                logger.Info("Query response's len:", len(msg))
                err := json.Unmarshal(msg, &dumpSqls)
                if err == nil {
                    logger.Error("dumpSql:", dumpSqls)
                    logger.Error("data dump success")
                }
                return
            }
        }
    }(ticker)
    wg.Wait()
    return &dumpSqls,nil
    //r, err = c.Query([]string{query}, false, false)
    //if err != nil {
    //    return err
    //}
    //for _, x := range r[0].Values {
    //    y := logger.Info("%s;\n", x[0].(string))
    //    if _, err := w.Write([]byte(y)); err != nil {
    //        return err
    //    }
    //}
}
// SqlUserEvent is the JSON payload of a UserEventSyncSql user event.
type SqlUserEvent struct {
    // Owner is the name of the node that broadcast the event; receivers
    // compare it with their own node name to skip their own events.
    Owner string `json:"owner"`
    // Sql holds the SQL statements to execute.
    Sql []string `json:"sql"`
}
//SyncSql boardcast sql to cluster
func (a *Agent) SyncSql(sqlOp []string) {
    // event : use to send command to operate db.
    var sqlUe = SqlUserEvent{
        Owner: a.conf.NodeName,
        Sql: sqlOp,
    }
    ueB, err := json.Marshal(sqlUe)
    if err !=nil {
        logger.Error("sqlUE marshal err:",err)
        return
    }
    err = a.UserEvent(UserEventSyncSql, ueB, false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        logger.Error("err: ", err)
    }
}
//更新同步库的比对缓存
func (a *Agent) SyncDbTablePersonCache(b []byte) {
    err := a.UserEvent(UserEventSyncDbTablePersonCache, b, false)
    if err !=nil{
        logger.Error("UserEventSyncDbTablePersonCache err:",err)
    }
}
//Init serf Init
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID, snapshotPath)
func Init(clusterID string, password string, nodeID string, addrs []string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
    agent, err := InitNode(clusterID, password, nodeID, snapshotPath, hef)
    if err != nil {
        logger.Error("InitNode failed, error: %s", err)
        return agent, err
@@ -684,7 +293,7 @@
}
// InitNode is invoked when the web backend receives a request to create a cluster.
func InitNode(clusterID string, password string, nodeID string, snapshotPath string) (*Agent, error) {
func InitNode(clusterID string, password string, nodeID string, snapshotPath string, hef HandleEventFunc) (*Agent, error) {
    conf := DefaultConfig()
    logger.Info("clusterID:", clusterID, "password:", password, "nodeID:", nodeID)
    conf.ClusterID = clusterID
config.go
@@ -50,8 +50,6 @@
    ReplayOnJoinDefault = false
    SnapshotPathDefault = "./serfSnapShot"
    MaxEventBufferCount = 2048
    TcpTransportPort = 30194 //tcp传输大数据量接口
)
// DefaultConfig default config
db.go
File was deleted
db_test.go
File was deleted
dbself.go
File was deleted
transport.go
File was deleted