基于serf的数据库同步模块库
liuxiaolong
2019-10-11 ed8c08492682ac04df90fe1b80298a73d5414194
test tcp read
1个文件已修改
39 ■■■■ 已修改文件
transport.go 39 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
transport.go
@@ -2,6 +2,10 @@
import (
    "basic.com/valib/logger.git"
    "bufio"
    "bytes"
    "encoding/binary"
    "io"
    "net"
    "strconv"
)
@@ -16,7 +20,12 @@
    defer conn.Close()
    //发送
    _, err = conn.Write(sendBuf)
    sizeBuf := make([]byte,4)
    var buf bytes.Buffer
    binary.BigEndian.PutUint32(sizeBuf,uint32(len(sendBuf)))
    buf.Write(sizeBuf)
    buf.Write(sendBuf)
    _, err = conn.Write(buf.Bytes())
    if err != nil {
        logger.Debug("conn.Write err", err)
        return err
@@ -50,17 +59,25 @@
func readStream(conn net.Conn) {
    defer conn.Close()
    data := make([]byte,0)
    buf := make([]byte, 4096)
    for {
        n,err :=conn.Read(buf)
        if n == 0{
            break
    var data []byte
    var reader io.Reader = bufio.NewReader(conn)
    sizeBuf :=make([]byte,4)
    if _,err := reader.Read(sizeBuf[:]);err !=nil {
        logger.Debug("read tcpStream msg length err:",err)
    } else {
        var msgLen uint32
        binary.Read(bytes.NewBuffer(sizeBuf),binary.BigEndian,&msgLen)
        dataLen := int(msgLen)
        logger.Debug("read tcpStream msg lenth:",dataLen)
        if dataLen >0 {
            data =make([]byte, dataLen)
            n,err := io.ReadAtLeast(reader, data, dataLen)
            if err ==nil {
                logger.Debug("io.ReadAtLeast n:",n)
            } else {
                logger.Debug("io.readAtLeast err:",err)
            }
        }
        if err !=nil {
            return
        }
        data = append(data,buf...)
    }
    QueryTcpResponseChan <- data
}