From 8324f872ef3a4d0c978a9b1d062800c6a1701c12 Mon Sep 17 00:00:00 2001
From: zhangqian <zhangqian@123.com>
Date: 星期五, 01 十二月 2023 09:58:17 +0800
Subject: [PATCH] fix

---
 serf/sync.go |   90 +++++++++++++++++++++++++++++++++++++-------
 1 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/serf/sync.go b/serf/sync.go
index d8ef4c7..30be0ee 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,7 +1,6 @@
 package serf
 
 import (
-	"apsClient/pkg/logx"
 	"context"
 	"encoding/json"
 	"fmt"
@@ -12,12 +11,17 @@
 	"syscall"
 	"time"
 
+	"apsClient/pkg/logx"
+
 	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/bhomeclient.git"
 	"basic.com/valib/bhomedbapi.git"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/jinzhu/gorm"
+	"github.com/satori/go.uuid"
+	"github.com/mitchellh/mapstructure"
+	"github.com/muesli/cache2go"
 )
 
 var (
@@ -25,6 +29,8 @@
 	dependProcs = []string{
 		bhomeclient.Proc_System_Service,
 	}
+
+	sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
 )
 
 const (
@@ -43,6 +49,12 @@
 	Proc    string `json:"procName"` // 杩涚▼鍚�
 	Topic   string `json:"topic"`    // 涓婚
 	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SqlMsg struct {
+	Id      string
+	Sql     string
+	Version string
 }
 
 type SyncServer struct {
@@ -168,13 +180,20 @@
 	os.Exit(0)
 }
 
-func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
+	sqlMsg := SqlMsg{
+		Id:  uuid.NewV4().String(),
+		Sql: sql,
+	}
+
+	bMsg, _ := json.Marshal(sqlMsg)
+
 	var msg = ProcMessageEvent{
 		Owner:   ss.ServerId,
 		Target:  targetId,
 		Proc:    ss.ProcName,
 		Topic:   ss.syncSqlTopic,
-		Payload: payload,
+		Payload: bMsg,
 	}
 
 	b, err := json.Marshal(msg)
@@ -311,7 +330,7 @@
 }
 
 // 鏌ヨ闆嗙兢鐘舵��, 杩斿洖 master, slave, leave
-func (ss *SyncServer) QueryClusterStat() string {
+func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
 	clusterStatTopic := "/data/api-v/cluster/status"
 	req := bhomeclient.Request{
 		Path:   clusterStatTopic,
@@ -322,14 +341,14 @@
 	if err != nil {
 		fmt.Println("RequestTopic error", err.Error())
 
-		return ""
+		return reply
 	}
 
 	ss.ClusterStatus = reply.Msg
 
 	logx.Debugf("褰撳墠闆嗙兢鐘舵��: %s", ss.ClusterStatus)
 
-	return reply.Msg
+	return reply
 }
 
 func (ss *SyncServer) handleDbLoggerPrint() {
@@ -343,7 +362,7 @@
 				syncSql := strings.Join(sqlBuf, "")
 
 				//fmt.Println("鍚屾sql璇彞:", syncSql)
-				ss.pubSyncSqlMessage([]byte(syncSql), "")
+				ss.pubSyncSqlMessage(syncSql, "")
 
 				sqlBuf = append([]string{})
 				sendSize = 0
@@ -354,7 +373,7 @@
 					syncSql := strings.Join(sqlBuf, "")
 					//fmt.Println("鍚屾sql璇彞:", syncSql)
 
-					ss.pubSyncSqlMessage([]byte(syncSql), "")
+					ss.pubSyncSqlMessage(syncSql, "")
 
 					sqlBuf = append([]string{})
 				}
@@ -372,9 +391,25 @@
 	}
 }
 
-func (ss *SyncServer) handleClusterMessage(msg []byte) {
-	logx.Infof("clusterMessage:", string(msg))
-	sql := string(msg)
+func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
+	var msg SqlMsg
+	err := json.Unmarshal(clusterMsgData,&msg)
+	if err != nil {
+		logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
+		return
+	}
+
+	// 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩�
+	if sqlMsgSeqCache.Exists(msg.Id) {
+		logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql)
+		return
+	}
+
+	// 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈�
+	sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
+
+	logx.Infof("clusterMessage:%s", msg.Sql)
+	sql := msg.Sql
 
 	if len(sql) <= 0 {
 		return
@@ -419,12 +454,12 @@
 	logx.Infof("DumpTables sql:%v", sqls)
 	syncSql := strings.Join(sqls, ";")
 	if len(syncSql) < sizeLimit {
-		err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+		err = ss.pubSyncSqlMessage(syncSql, targetId)
 	} else {
 		shard := ""
 		for _, sql := range sqls {
 			if len(shard)+len(sql) > sizeLimit {
-				err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+				err = ss.pubSyncSqlMessage(shard, targetId)
 				shard = ""
 			}
 
@@ -432,14 +467,14 @@
 		}
 
 		if len(shard) > 0 {
-			err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+			err = ss.pubSyncSqlMessage(shard, targetId)
 		}
 	}
 
 	return err
 }
 
-func (ss *SyncServer)Print(values ...interface{}) {
+func (ss *SyncServer) Print(values ...interface{}) {
 	var (
 		level = values[0]
 	)
@@ -519,3 +554,28 @@
 
 	return ""
 }
+
+type NodeInfo struct {
+	NodeID     string `json:"node_id,omitempty"`
+	NodeIp     string `json:"node_ip,omitempty"`
+	NodeName   string `json:"node_name,omitempty"`
+	ClusterID  string `json:"cluster_id"`
+	CreateTime string `json:"create_time"`
+	DeviceType string `json:"device_type"`
+	DriftState string `json:"drift_state"`
+	Online     bool   `json:"online"`
+}
+
+func QueryClusterStatusAndNodeQuantity() (string, int) {
+	reply := agent.QueryClusterStat()
+	if reply == nil {
+		return "", 0
+	}
+	var nodes []NodeInfo
+	err := mapstructure.Decode(reply.Data, &nodes)
+	if err != nil {
+		logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
+		return reply.Msg, 0
+	}
+	return reply.Msg, len(nodes)
+}

--
Gitblit v1.8.0