基于serf的数据库同步模块库
liuxiaolong
2019-10-08 0b3b1aecb4a2a26f8797ea62af2b5a0381962ad5
query response by sendToTcp
1个文件已修改
1个文件已添加
154 ■■■■ 已修改文件
agent.go 94 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
userDelegate.go 60 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -26,8 +26,6 @@
    "net"
    "os"
    "strconv"
    "sync"
    //"os"
    "strings"
    "time"
@@ -81,6 +79,9 @@
        return nil, err
    }
    serfConf.MemberlistConfig.Keyring = keyring
    serfConf.MemberlistConfig.Delegate = &UserDelegate{}
    logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
@@ -189,29 +190,46 @@
            //for _, r := range rows {
            //    rowsReturn = append(rowsReturn, *r)
            //}
            var tableNames []string
            err := json.Unmarshal(ev.Payload, &tableNames)
            var fromP QueryTableDataParam
            err := json.Unmarshal(ev.Payload, &fromP)
            if err !=nil {
                logger.Error("Query tableNames unmarshal err")
                return
            }
            logger.Info("Query tableNames:",tableNames)
            datas, err := ExecuteQueryByGorm(tableNames)
            logger.Info("Query tableNames:",fromP.Tables)
            datas, err := ExecuteQueryByGorm(fromP.Tables)
            if err !=nil {
                logger.Error("queryByGorm err")
                return
            }
            bytesReturn, err := json.Marshal(datas)
            logger.Info("results.len: ", len(bytesReturn))
            if query, ok := event.(*serf.Query); ok {
                if err := query.Respond(bytesReturn); err != nil {
                    logger.Error("err: %s\n", err)
                    return
            var targetNode *memberlist.Node
            nodes := a.Serf().Memberlist().Members()
            if nodes != nil && len(nodes) > 0 {
                for _,n :=range nodes {
                    if n.Name == fromP.From {
                        targetNode = n
                        break
                    }
                }
            }
            if targetNode !=nil {
                sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn)
                if sendErr != nil {
                    logger.Debug("sendToTcp err:",sendErr)
                }
            } else {
                logger.Debug("targetNode is nil")
            }
            //var res []*Rows
            //json.Unmarshal(bytesReturn, &res)
            //if query, ok := event.(*serf.Query); ok {
            //    if err := query.Respond(bytesReturn); err != nil {
            //        logger.Error("err: %s\n", err)
            //        return
            //    }
            //}
        }
    case serf.MemberEvent:
        if event.EventType() == serf.EventMemberLeave {
@@ -445,6 +463,11 @@
    }()
}
type QueryTableDataParam struct {
    Tables []string `json:"tables"`
    From string `json:"from"`
}
//GetDbFromCluster get the newest database after join cluster
//dbPathWrite the path where to write after got a database,
func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string,error) {
@@ -472,8 +495,12 @@
        FilterNodes: strings.Fields(specmembername),
    }
    //SQL
    tBytes, _ := json.Marshal(tableNames)
    //get db tables
    var fromP = QueryTableDataParam{
        Tables: tableNames,
        From: a.conf.NodeName,
    }
    tBytes, _ := json.Marshal(fromP)
    resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
@@ -481,26 +508,27 @@
    }
    logger.Info("Query.resp.err:",err,"resp:",resp)
    var wg sync.WaitGroup
    wg.Add(1)
    var dumpSqls []string
    go func() {
        defer wg.Done()
        respCh := resp.ResponseCh()
        for {
            select {
            case r := <-respCh:
                logger.Info("Query response's len:", len(r.Payload))
                err := json.Unmarshal(r.Payload, &dumpSqls)
                if err ==nil {
                    logger.Error("dumpSql:",dumpSqls)
                    logger.Error("data dump success")
                }
                return
            }
        }
    }()
    wg.Wait()
    //var wg sync.WaitGroup
    //wg.Add(1)
    //go func() {
    //    defer wg.Done()
    //    respCh := resp.ResponseCh()
    //    for {
    //        select {
    //        case r := <-respCh:
    //            logger.Info("Query response's len:", len(r.Payload))
    //            err := json.Unmarshal(r.Payload, &dumpSqls)
    //            if err ==nil {
    //                logger.Error("dumpSql:",dumpSqls)
    //                logger.Error("data dump success")
    //            }
    //            return
    //        }
    //    }
    //}()
    //wg.Wait()
    return &dumpSqls,nil
    //r, err = c.Query([]string{query}, false, false)
userDelegate.go
New file
@@ -0,0 +1,60 @@
package syncdb
import (
    "basic.com/valib/logger.git"
    "encoding/json"
)
type UserDelegate struct {
}
// NodeMeta is used to retrieve meta-data about the current node
// when broadcasting an alive message. It's length is limited to
// the given byte size. This metadata is available in the Node structure.
func (d *UserDelegate) NodeMeta(limit int) []byte{
    return []byte{}
}
// NotifyMsg is called when a user-data message is received.
// Care should be taken that this method does not block, since doing
// so would block the entire UDP packet receive loop. Additionally, the byte
// slice may be modified after the call returns, so it should be copied if needed
func (d *UserDelegate) NotifyMsg(msg []byte) {
    recvMsg := msg
    logger.Info("UserDelegate NotifyMsg.len:",len(recvMsg))
    var dumpSqls []string
    err := json.Unmarshal(recvMsg, &dumpSqls)
    if err == nil {
        logger.Debug("dumpSqls:", dumpSqls)
    } else {
        logger.Error("UserDelete unmarshal msg to dumpSqls err:",err)
    }
}
// GetBroadcasts is called when user data messages can be broadcast.
// It can return a list of buffers to send. Each buffer should assume an
// overhead as provided with a limit on the total byte size allowed.
// The total byte size of the resulting data to send must not exceed
// the limit. Care should be taken that this method does not block,
// since doing so would block the entire UDP packet receive loop.
func (d *UserDelegate) GetBroadcasts(overhead, limit int) [][]byte {
    return [][]byte{}
}
// LocalState is used for a TCP Push/Pull. This is sent to
// the remote side in addition to the membership information. Any
// data can be sent here. See MergeRemoteState as well. The `join`
// boolean indicates this is for a join instead of a push/pull.
func (d *UserDelegate) LocalState(join bool) []byte {
    return []byte{}
}
// MergeRemoteState is invoked after a TCP Push/Pull. This is the
// state received from the remote side and is the result of the
// remote side's LocalState call. The 'join'
// boolean indicates this is for a join instead of a push/pull.
func (d *UserDelegate) MergeRemoteState(buf []byte, join bool) {
}