From 34e5eae1c368848300bfa4ea1ead3b5e7c2a8a64 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期四, 10 十月 2019 20:44:30 +0800
Subject: [PATCH] query use tcp transport
---
agent.go | 32 ++++------
/dev/null | 62 --------------------
transport.go | 61 ++++++++++++++++++++
config.go | 2
4 files changed, 76 insertions(+), 81 deletions(-)
diff --git a/agent.go b/agent.go
index 974a29f..b64a425 100644
--- a/agent.go
+++ b/agent.go
@@ -82,8 +82,6 @@
}
serfConf.MemberlistConfig.Keyring = keyring
- serfConf.MemberlistConfig.Delegate = &UserDelegate{}
-
logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
len(conf.EncryptKey), conf.EncryptKey)
@@ -216,11 +214,17 @@
}
}
}
+ logger.Debug("targetNode:",targetNode.Name)
if targetNode !=nil {
- sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn)
+ addr := targetNode.Addr.String() + ":" + strconv.Itoa(TcpTransportPort)
+ sendErr := rawSendTcpMsg(addr, bytesReturn)
+
if sendErr != nil {
logger.Debug("sendToTcp err:",sendErr)
+ } else {
+ logger.Debug("sendToTcp success")
}
+
} else {
logger.Debug("targetNode is nil")
}
@@ -263,6 +267,7 @@
//a.DeregisterEventHandler(a)
//close(a.readyCh)
}
+
func (a *Agent) BroadcastMemberlist(delay time.Duration) {
//serf := a.serf
@@ -516,28 +521,17 @@
wg.Add(1)
go func() {
defer wg.Done()
- //respCh := resp.ResponseCh()
for {
select {
- //case r := <-respCh:
- // logger.Info("Query response's len:", len(r.Payload))
- // err := json.Unmarshal(r.Payload, &dumpSqls)
- // if err ==nil {
- // logger.Error("dumpSql:",dumpSqls)
- // logger.Error("data dump success")
- // }
- // return
- case msg := <-QueryTcpResponseChan:
- logger.Debug("QueryTcpResponseChan receive msg len:",len(msg))
+ case msg := <- QueryTcpResponseChan:
+ logger.Info("Query response's len:", len(msg))
err := json.Unmarshal(msg, &dumpSqls)
- if err ==nil {
- logger.Error("dumpSql success:",dumpSqls)
- } else {
- logger.Error("data dump err:",err)
+ if err == nil {
+ logger.Error("dumpSql:", dumpSqls)
+ logger.Error("data dump success")
}
return
}
-
}
}()
wg.Wait()
diff --git a/config.go b/config.go
index 55713a3..35f1793 100644
--- a/config.go
+++ b/config.go
@@ -47,6 +47,8 @@
MaxUserEventSize = 5 * 1024
ReplayOnJoinDefault = false
SnapshotPathDefault = "/opt/vasystem/serfSnapShot"
+
+ TcpTransportPort = 30194 //tcp浼犺緭澶ф暟鎹噺鎺ュ彛
)
// DefaultConfig default config
diff --git a/transport.go b/transport.go
new file mode 100644
index 0000000..3b2c634
--- /dev/null
+++ b/transport.go
@@ -0,0 +1,61 @@
+package syncdb
+
+import (
+ "basic.com/valib/logger.git"
+ "net"
+ "strconv"
+)
+
+func rawSendTcpMsg(addr string, sendBuf []byte) error {
+ conn, err := net.Dial("tcp", addr)
+ if err != nil {
+ logger.Debug("net.Dialt err", err)
+ return err
+ }
+
+ defer conn.Close()
+
+ //鍙戦��
+ _, err = conn.Write(sendBuf)
+ if err != nil {
+ logger.Debug("conn.Write err", err)
+ return err
+ } else {
+ logger.Debug("raw send success")
+ return nil
+ }
+}
+
+func RawReceiveTcpMsg() {
+ var tcpAddr *net.TCPAddr
+ tcpAddr,_ = net.ResolveTCPAddr("tcp","127.0.0.1:"+strconv.Itoa(TcpTransportPort))
+
+ listener,_ := net.ListenTCP("tcp",tcpAddr)
+ defer listener.Close()
+
+ for{
+ conn,err := listener.AcceptTCP()
+ if err!=nil {
+ logger.Debug("listener.Accept err:", err)
+ continue
+ }
+ logger.Debug("A transport client connected :" +conn.RemoteAddr().String())
+ go readStream(conn)
+ }
+}
+
+func readStream(conn *net.TCPConn) {
+ data := make([]byte,0)
+ buf := make([]byte, 4096)
+ for {
+ n,err :=conn.Read(buf)
+ if n == 0{
+ break
+ }
+ if err !=nil {
+ return
+ }
+ data = append(data,buf...)
+ }
+ QueryTcpResponseChan <- data
+}
\ No newline at end of file
diff --git a/userDelegate.go b/userDelegate.go
deleted file mode 100644
index 00c832d..0000000
--- a/userDelegate.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package syncdb
-
-import (
- "basic.com/valib/logger.git"
- "encoding/json"
-)
-
-type UserDelegate struct {
-
-}
-
-// NodeMeta is used to retrieve meta-data about the current node
-// when broadcasting an alive message. It's length is limited to
-// the given byte size. This metadata is available in the Node structure.
-func (d *UserDelegate) NodeMeta(limit int) []byte{
- return []byte{}
-}
-
-// NotifyMsg is called when a user-data message is received.
-// Care should be taken that this method does not block, since doing
-// so would block the entire UDP packet receive loop. Additionally, the byte
-// slice may be modified after the call returns, so it should be copied if needed
-func (d *UserDelegate) NotifyMsg(msg []byte) {
- recvMsg := msg
- logger.Info("UserDelegate NotifyMsg.len:",len(recvMsg))
- QueryTcpResponseChan <- msg
-
- var dumpSqls []string
- err := json.Unmarshal(recvMsg, &dumpSqls)
-
- if err == nil {
- logger.Debug("dumpSqls:", dumpSqls)
- } else {
- logger.Error("UserDelegate unmarshal msg to dumpSqls err:",err)
- }
-}
-
-// GetBroadcasts is called when user data messages can be broadcast.
-// It can return a list of buffers to send. Each buffer should assume an
-// overhead as provided with a limit on the total byte size allowed.
-// The total byte size of the resulting data to send must not exceed
-// the limit. Care should be taken that this method does not block,
-// since doing so would block the entire UDP packet receive loop.
-func (d *UserDelegate) GetBroadcasts(overhead, limit int) [][]byte {
- return [][]byte{}
-}
-
-// LocalState is used for a TCP Push/Pull. This is sent to
-// the remote side in addition to the membership information. Any
-// data can be sent here. See MergeRemoteState as well. The `join`
-// boolean indicates this is for a join instead of a push/pull.
-func (d *UserDelegate) LocalState(join bool) []byte {
- return []byte{}
-}
-
-// MergeRemoteState is invoked after a TCP Push/Pull. This is the
-// state received from the remote side and is the result of the
-// remote side's LocalState call. The 'join'
-// boolean indicates this is for a join instead of a push/pull.
-func (d *UserDelegate) MergeRemoteState(buf []byte, join bool) {
-
-}
\ No newline at end of file
--
Gitblit v1.8.0