From 34e5eae1c368848300bfa4ea1ead3b5e7c2a8a64 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期四, 10 十月 2019 20:44:30 +0800
Subject: [PATCH] query use tcp transport

---
 agent.go     |   32 ++++------
 /dev/null    |   62 --------------------
 transport.go |   61 ++++++++++++++++++++
 config.go    |    2 
 4 files changed, 76 insertions(+), 81 deletions(-)

diff --git a/agent.go b/agent.go
index 974a29f..b64a425 100644
--- a/agent.go
+++ b/agent.go
@@ -82,8 +82,6 @@
 	}
 	serfConf.MemberlistConfig.Keyring = keyring
 
-	serfConf.MemberlistConfig.Delegate = &UserDelegate{}
-
 	logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
 		len(conf.EncryptKey), conf.EncryptKey)
 
@@ -216,11 +214,17 @@
 					}
 				}
 			}
+			logger.Debug("targetNode:",targetNode.Name)
 			if targetNode !=nil {
-				sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn)
+				addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
+				sendErr := rawSendTcpMsg(addr, bytesReturn)
+
 				if sendErr != nil {
 					logger.Debug("sendToTcp err:",sendErr)
+				} else {
+					logger.Debug("sendToTcp success")
 				}
+
 			} else {
 				logger.Debug("targetNode is nil")
 			}
@@ -263,6 +267,7 @@
 	//a.DeregisterEventHandler(a)
 	//close(a.readyCh)
 }
+
 
 func (a *Agent) BroadcastMemberlist(delay time.Duration) {
 	//serf := a.serf
@@ -516,28 +521,17 @@
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		//respCh := resp.ResponseCh()
 		for {
 			select {
-			//case r := <-respCh:
-			//	logger.Info("Query response's len:", len(r.Payload))
-			//	err := json.Unmarshal(r.Payload, &dumpSqls)
-			//	if err ==nil {
-			//		logger.Error("dumpSql:",dumpSqls)
-			//		logger.Error("data dump success")
-			//	}
-			//	return
-			case msg := <-QueryTcpResponseChan:
-				logger.Debug("QueryTcpResponseChan receive msg len:",len(msg))
+			case msg := <- QueryTcpResponseChan:
+				logger.Info("Query response's len:", len(msg))
 				err := json.Unmarshal(msg, &dumpSqls)
-				if err ==nil {
-					logger.Error("dumpSql success:",dumpSqls)
-				} else {
-					logger.Error("data dump err:",err)
+				if err == nil {
+					logger.Error("dumpSql:", dumpSqls)
+					logger.Error("data dump success")
 				}
 				return
 			}
-
 		}
 	}()
 	wg.Wait()
diff --git a/config.go b/config.go
index 55713a3..35f1793 100644
--- a/config.go
+++ b/config.go
@@ -47,6 +47,8 @@
 	MaxUserEventSize   = 5 * 1024
 	ReplayOnJoinDefault = false
 	SnapshotPathDefault = "/opt/vasystem/serfSnapShot"
+
+	TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛
 )
 
 // DefaultConfig default config
diff --git a/transport.go b/transport.go
new file mode 100644
index 0000000..3b2c634
--- /dev/null
+++ b/transport.go
@@ -0,0 +1,61 @@
+package syncdb
+
+import (
+	"basic.com/valib/logger.git"
+	"net"
+	"strconv"
+)
+
+func rawSendTcpMsg(addr string, sendBuf []byte) error {
+	conn, err := net.Dial("tcp", addr)
+	if err != nil {
+		logger.Debug("net.Dialt err", err)
+		return err
+	}
+
+	defer conn.Close()
+
+	//鍙戦��
+	_, err = conn.Write(sendBuf)
+	if err != nil {
+		logger.Debug("conn.Write err", err)
+		return err
+	} else {
+		logger.Debug("raw send success")
+		return nil
+	}
+}
+
+func RawReceiveTcpMsg() {
+	var tcpAddr *net.TCPAddr
+	tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:"+strconv.Itoa(TcpTransportPort))
+
+	listener,_ := net.ListenTCP("tcp",tcpAddr)
+	defer listener.Close()
+
+	for{
+		conn,err := listener.AcceptTCP()
+		if err!=nil {
+			logger.Debug("listener.Accept err:", err)
+			continue
+		}
+		logger.Debug("A transport client connected :" +conn.RemoteAddr().String())
+		go readStream(conn)
+	}
+}
+
+func readStream(conn *net.TCPConn) {
+	data := make([]byte,0)
+	buf := make([]byte, 4096)
+	for {
+		n,err :=conn.Read(buf)
+		if n == 0{
+			break
+		}
+		if err !=nil {
+			return
+		}
+		data = append(data,buf...)
+	}
+	QueryTcpResponseChan <- data
+}
\ No newline at end of file
diff --git a/userDelegate.go b/userDelegate.go
deleted file mode 100644
index 00c832d..0000000
--- a/userDelegate.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package syncdb
-
-import (
-	"basic.com/valib/logger.git"
-	"encoding/json"
-)
-
-type UserDelegate struct {
-
-}
-
-// NodeMeta is used to retrieve meta-data about the current node
-// when broadcasting an alive message. It's length is limited to
-// the given byte size. This metadata is available in the Node structure.
-func (d *UserDelegate) NodeMeta(limit int) []byte{
-	return []byte{}
-}
-
-// NotifyMsg is called when a user-data message is received.
-// Care should be taken that this method does not block, since doing
-// so would block the entire UDP packet receive loop. Additionally, the byte
-// slice may be modified after the call returns, so it should be copied if needed
-func (d *UserDelegate) NotifyMsg(msg []byte) {
-	recvMsg := msg
-	logger.Info("UserDelegate NotifyMsg.len:",len(recvMsg))
-	QueryTcpResponseChan <- msg
-
-	var dumpSqls []string
-	err := json.Unmarshal(recvMsg, &dumpSqls)
-
-	if err == nil {
-		logger.Debug("dumpSqls:", dumpSqls)
-	} else {
-		logger.Error("UserDelegate unmarshal msg to dumpSqls err:",err)
-	}
-}
-
-// GetBroadcasts is called when user data messages can be broadcast.
-// It can return a list of buffers to send. Each buffer should assume an
-// overhead as provided with a limit on the total byte size allowed.
-// The total byte size of the resulting data to send must not exceed
-// the limit. Care should be taken that this method does not block,
-// since doing so would block the entire UDP packet receive loop.
-func (d *UserDelegate) GetBroadcasts(overhead, limit int) [][]byte {
-	return [][]byte{}
-}
-
-// LocalState is used for a TCP Push/Pull. This is sent to
-// the remote side in addition to the membership information. Any
-// data can be sent here. See MergeRemoteState as well. The `join`
-// boolean indicates this is for a join instead of a push/pull.
-func (d *UserDelegate) LocalState(join bool) []byte {
-	return []byte{}
-}
-
-// MergeRemoteState is invoked after a TCP Push/Pull. This is the
-// state received from the remote side and is the result of the
-// remote side's LocalState call. The 'join'
-// boolean indicates this is for a join instead of a push/pull.
-func (d *UserDelegate) MergeRemoteState(buf []byte, join bool) {
-
-}
\ No newline at end of file

--
Gitblit v1.8.0