From 0b3b1aecb4a2a26f8797ea62af2b5a0381962ad5 Mon Sep 17 00:00:00 2001
From: liuxiaolong <736321739@qq.com>
Date: 星期二, 08 十月 2019 20:50:01 +0800
Subject: [PATCH] query response by sendToTcp

---
 agent.go        |   94 ++++++++++++++++++++-----------
 userDelegate.go |   60 ++++++++++++++++++++
 2 files changed, 121 insertions(+), 33 deletions(-)

diff --git a/agent.go b/agent.go
index 0a2a2ff..3504419 100644
--- a/agent.go
+++ b/agent.go
@@ -26,8 +26,6 @@
 	"net"
 	"os"
 	"strconv"
-	"sync"
-
 	//"os"
 	"strings"
 	"time"
@@ -81,6 +79,9 @@
 		return nil, err
 	}
 	serfConf.MemberlistConfig.Keyring = keyring
+
+	serfConf.MemberlistConfig.Delegate = &UserDelegate{}
+
 	logger.Info("[INFO] agent: Restored keyring with %d keys from %s",
 		len(conf.EncryptKey), conf.EncryptKey)
 
@@ -189,29 +190,46 @@
 			//for _, r := range rows {
 			//	rowsReturn = append(rowsReturn, *r)
 			//}
-			var tableNames []string
-			err := json.Unmarshal(ev.Payload, &tableNames)
+			var fromP QueryTableDataParam
+			err := json.Unmarshal(ev.Payload, &fromP)
 			if err !=nil {
 				logger.Error("Query tableNames unmarshal err")
 				return
 			}
-			logger.Info("Query tableNames:",tableNames)
-			datas, err := ExecuteQueryByGorm(tableNames)
+			logger.Info("Query tableNames:",fromP.Tables)
+			datas, err := ExecuteQueryByGorm(fromP.Tables)
 			if err !=nil {
 				logger.Error("queryByGorm err")
 				return
 			}
 			bytesReturn, err := json.Marshal(datas)
 			logger.Info("results.len: ", len(bytesReturn))
-			if query, ok := event.(*serf.Query); ok {
-				if err := query.Respond(bytesReturn); err != nil {
-					logger.Error("err: %s\n", err)
-					return
+
+			var targetNode *memberlist.Node
+			nodes := a.Serf().Memberlist().Members()
+			if nodes != nil && len(nodes) > 0 {
+				for _,n :=range nodes {
+					if n.Name == fromP.From {
+						targetNode = n
+						break
+					}
 				}
 			}
+			if targetNode !=nil {
+				sendErr := a.Serf().Memberlist().SendToTCP(targetNode, bytesReturn)
+				if sendErr != nil {
+					logger.Debug("sendToTcp err:",sendErr)
+				}
+			} else {
+				logger.Debug("targetNode is nil")
+			}
 
-			//var res []*Rows
-			//json.Unmarshal(bytesReturn, &res)
+			//if query, ok := event.(*serf.Query); ok {
+			//	if err := query.Respond(bytesReturn); err != nil {
+			//		logger.Error("err: %s\n", err)
+			//		return
+			//	}
+			//}
 		}
 	case serf.MemberEvent:
 		if event.EventType() == serf.EventMemberLeave {
@@ -445,6 +463,11 @@
 	}()
 }
 
+type QueryTableDataParam struct {
+	Tables []string `json:"tables"`
+	From string `json:"from"`
+}
+
 //GetDbFromCluster get the newest database after join cluster
 //dbPathWrite the path where to write after got a database,
 func (a *Agent) GetTableDataFromCluster(tableNames []string) (*[]string,error) {
@@ -472,8 +495,12 @@
 		FilterNodes: strings.Fields(specmembername),
 	}
 
-	//SQL
-	tBytes, _ := json.Marshal(tableNames)
+	//get db tables
+	var fromP = QueryTableDataParam{
+		Tables: tableNames,
+		From: a.conf.NodeName,
+	}
+	tBytes, _ := json.Marshal(fromP)
 
 	resp, err := a.Query(QueryEventUpdateDBData, tBytes, &params)
 	if err == nil || !strings.Contains(err.Error(), "cannot contain") {
@@ -481,26 +508,27 @@
 	}
 	logger.Info("Query.resp.err:",err,"resp:",resp)
 
-	var wg sync.WaitGroup
-	wg.Add(1)
 	var dumpSqls []string
-	go func() {
-		defer wg.Done()
-		respCh := resp.ResponseCh()
-		for {
-			select {
-			case r := <-respCh:
-				logger.Info("Query response's len:", len(r.Payload))
-				err := json.Unmarshal(r.Payload, &dumpSqls)
-				if err ==nil {
-					logger.Error("dumpSql:",dumpSqls)
-					logger.Error("data dump success")
-				}
-				return
-			}
-		}
-	}()
-	wg.Wait()
+
+	//var wg sync.WaitGroup
+	//wg.Add(1)
+	//go func() {
+	//	defer wg.Done()
+	//	respCh := resp.ResponseCh()
+	//	for {
+	//		select {
+	//		case r := <-respCh:
+	//			logger.Info("Query response's len:", len(r.Payload))
+	//			err := json.Unmarshal(r.Payload, &dumpSqls)
+	//			if err ==nil {
+	//				logger.Error("dumpSql:",dumpSqls)
+	//				logger.Error("data dump success")
+	//			}
+	//			return
+	//		}
+	//	}
+	//}()
+	//wg.Wait()
 	return &dumpSqls,nil
 
 	//r, err = c.Query([]string{query}, false, false)
diff --git a/userDelegate.go b/userDelegate.go
new file mode 100644
index 0000000..d297f77
--- /dev/null
+++ b/userDelegate.go
@@ -0,0 +1,60 @@
+package syncdb
+
+import (
+	"basic.com/valib/logger.git"
+	"encoding/json"
+)
+
+type UserDelegate struct {
+
+}
+
+// NodeMeta is used to retrieve meta-data about the current node
+// when broadcasting an alive message. It's length is limited to
+// the given byte size. This metadata is available in the Node structure.
+func (d *UserDelegate) NodeMeta(limit int) []byte{
+	return []byte{}
+}
+
+// NotifyMsg is called when a user-data message is received.
+// Care should be taken that this method does not block, since doing
+// so would block the entire UDP packet receive loop. Additionally, the byte
+// slice may be modified after the call returns, so it should be copied if needed
+func (d *UserDelegate) NotifyMsg(msg []byte) {
+	recvMsg := msg
+	logger.Info("UserDelegate NotifyMsg.len:",len(recvMsg))
+	var dumpSqls []string
+	err := json.Unmarshal(recvMsg, &dumpSqls)
+
+	if err == nil {
+		logger.Debug("dumpSqls:", dumpSqls)
+	} else {
+		logger.Error("UserDelete unmarshal msg to dumpSqls err:",err)
+	}
+}
+
+// GetBroadcasts is called when user data messages can be broadcast.
+// It can return a list of buffers to send. Each buffer should assume an
+// overhead as provided with a limit on the total byte size allowed.
+// The total byte size of the resulting data to send must not exceed
+// the limit. Care should be taken that this method does not block,
+// since doing so would block the entire UDP packet receive loop.
+func (d *UserDelegate) GetBroadcasts(overhead, limit int) [][]byte {
+	return [][]byte{}
+}
+
+// LocalState is used for a TCP Push/Pull. This is sent to
+// the remote side in addition to the membership information. Any
+// data can be sent here. See MergeRemoteState as well. The `join`
+// boolean indicates this is for a join instead of a push/pull.
+func (d *UserDelegate) LocalState(join bool) []byte {
+	return []byte{}
+}
+
+// MergeRemoteState is invoked after a TCP Push/Pull. This is the
+// state received from the remote side and is the result of the
+// remote side's LocalState call. The 'join'
+// boolean indicates this is for a join instead of a push/pull.
+func (d *UserDelegate) MergeRemoteState(buf []byte, join bool) {
+
+}
\ No newline at end of file

--
Gitblit v1.8.0