基于serf的数据库同步模块库
liuxiaolong
2019-10-11 9570ba882a13388a16d2980fd93da25fe27e7480
transport.go
@@ -2,6 +2,10 @@
import (
   "basic.com/valib/logger.git"
   "bufio"
   "bytes"
   "encoding/binary"
   "io"
   "net"
   "strconv"
)
@@ -16,7 +20,12 @@
   defer conn.Close()
   //发送
   _, err = conn.Write(sendBuf)
   sizeBuf := make([]byte,4)
   var buf bytes.Buffer
   binary.BigEndian.PutUint32(sizeBuf,uint32(len(sendBuf)))
   buf.Write(sizeBuf)
   buf.Write(sendBuf)
   _, err = conn.Write(buf.Bytes())
   if err != nil {
      logger.Debug("conn.Write err", err)
      return err
@@ -50,17 +59,25 @@
func readStream(conn net.Conn) {
   defer conn.Close()
   data := make([]byte,0)
   buf := make([]byte, 4096)
   for {
      n,err :=conn.Read(buf)
      if n == 0{
         break
   var data []byte
   var reader io.Reader = bufio.NewReader(conn)
   sizeBuf :=make([]byte,4)
   if _,err := reader.Read(sizeBuf[:]);err !=nil {
      logger.Debug("read tcpStream msg length err:",err)
   } else {
      var msgLen uint32
      binary.Read(bytes.NewBuffer(sizeBuf),binary.BigEndian,&msgLen)
      dataLen := int(msgLen)
      logger.Debug("read tcpStream msg lenth:",dataLen)
      if dataLen >0 {
         data =make([]byte, dataLen)
         n,err := io.ReadAtLeast(reader, data, dataLen)
         if err ==nil {
            logger.Debug("io.ReadAtLeast n:",n)
         } else {
            logger.Debug("io.readAtLeast err:",err)
         }
      }
      if err !=nil {
         return
      }
      data = append(data,buf...)
   }
   QueryTcpResponseChan <- data
}