基于serf的数据库同步模块库
liuxiaolong
2019-10-10 34e5eae1c368848300bfa4ea1ead3b5e7c2a8a64
query use tcp transport
2个文件已修改
1个文件已添加
1个文件已删除
153 ■■■■ 已修改文件
agent.go 28 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
config.go 2 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
transport.go 61 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
userDelegate.go 62 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
agent.go
@@ -82,8 +82,6 @@
    }
    serfConf.MemberlistConfig.Keyring = keyring
    serfConf.MemberlistConfig.Delegate = &UserDelegate{}
    logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
        len(conf.EncryptKey), conf.EncryptKey)
@@ -216,11 +214,17 @@
                    }
                }
            }
            logger.Debug("targetNode:",targetNode.Name)
            if targetNode !=nil {
                sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn)
                addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
                sendErr := rawSendTcpMsg(addr, bytesReturn)
                if sendErr != nil {
                    logger.Debug("sendToTcp err:",sendErr)
                } else {
                    logger.Debug("sendToTcp success")
                }
            } else {
                logger.Debug("targetNode is nil")
            }
@@ -263,6 +267,7 @@
    //a.DeregisterEventHandler(a)
    //close(a.readyCh)
}
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
    //serf := a.serf
@@ -516,28 +521,17 @@
    wg.Add(1)
    go func() {
        defer wg.Done()
        //respCh := resp.ResponseCh()
        for {
            select {
            //case r := <-respCh:
            //    logger.Info("Query response's len:", len(r.Payload))
            //    err := json.Unmarshal(r.Payload, &dumpSqls)
            //    if err ==nil {
            //        logger.Error("dumpSql:",dumpSqls)
            //        logger.Error("data dump success")
            //    }
            //    return
            case msg := <-QueryTcpResponseChan:
                logger.Debug("QueryTcpResponseChan receive msg len:",len(msg))
                logger.Info("Query response's len:", len(msg))
                err := json.Unmarshal(msg, &dumpSqls)
                if err ==nil {
                    logger.Error("dumpSql success:",dumpSqls)
                } else {
                    logger.Error("data dump err:",err)
                    logger.Error("dumpSql:", dumpSqls)
                    logger.Error("data dump success")
                }
                return
            }
        }
    }()
    wg.Wait()
config.go
@@ -47,6 +47,8 @@
    MaxUserEventSize   = 5 * 1024
    ReplayOnJoinDefault = false
    SnapshotPathDefault = "/opt/vasystem/serfSnapShot"
    TcpTransportPort = 30194 //tcp传输大数据量接口
)
// DefaultConfig default config
transport.go
New file
@@ -0,0 +1,61 @@
package syncdb
import (
    "basic.com/valib/logger.git"
    "net"
    "strconv"
)
func rawSendTcpMsg(addr string, sendBuf []byte) error {
    conn, err := net.Dial("tcp", addr)
    if err != nil {
        logger.Debug("net.Dialt err", err)
        return err
    }
    defer conn.Close()
    //发送
    _, err = conn.Write(sendBuf)
    if err != nil {
        logger.Debug("conn.Write err", err)
        return err
    } else {
        logger.Debug("raw send success")
        return nil
    }
}
func RawReceiveTcpMsg() {
    var tcpAddr *net.TCPAddr
    tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:"+strconv.Itoa(TcpTransportPort))
    listener,_ := net.ListenTCP("tcp",tcpAddr)
    defer listener.Close()
    for{
        conn,err := listener.AcceptTCP()
        if err!=nil {
            logger.Debug("listener.Accept err:", err)
            continue
        }
        logger.Debug("A transport client connected :" +conn.RemoteAddr().String())
        go readStream(conn)
    }
}
func readStream(conn *net.TCPConn) {
    data := make([]byte,0)
    buf := make([]byte, 4096)
    for {
        n,err :=conn.Read(buf)
        if n == 0{
            break
        }
        if err !=nil {
            return
        }
        data = append(data,buf...)
    }
    QueryTcpResponseChan <- data
}
userDelegate.go
File was deleted