From 2ab94056786f8a7bcf66cf1ed25002e74ed7f016 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 08 十一月 2023 17:36:19 +0800
Subject: [PATCH] sql同步消息添加id,处理消息重复的问题

---
 serf/sync.go |   58 ++++++++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 46 insertions(+), 12 deletions(-)

diff --git a/serf/sync.go b/serf/sync.go
index 1171a0a..30be0ee 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,11 +1,9 @@
 package serf
 
 import (
-	"apsClient/pkg/logx"
 	"context"
 	"encoding/json"
 	"fmt"
-	"github.com/mitchellh/mapstructure"
 	"os"
 	"os/signal"
 	"regexp"
@@ -13,12 +11,17 @@
 	"syscall"
 	"time"
 
+	"apsClient/pkg/logx"
+
 	"basic.com/pubsub/protomsg.git"
 	"basic.com/valib/bhomeclient.git"
 	"basic.com/valib/bhomedbapi.git"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/jinzhu/gorm"
+	"github.com/satori/go.uuid"
+	"github.com/mitchellh/mapstructure"
+	"github.com/muesli/cache2go"
 )
 
 var (
@@ -26,6 +29,8 @@
 	dependProcs = []string{
 		bhomeclient.Proc_System_Service,
 	}
+
+	sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
 )
 
 const (
@@ -44,6 +49,12 @@
 	Proc    string `json:"procName"` // 杩涚▼鍚�
 	Topic   string `json:"topic"`    // 涓婚
 	Payload []byte `json:"payload"`  // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SqlMsg struct {
+	Id      string
+	Sql     string
+	Version string
 }
 
 type SyncServer struct {
@@ -169,13 +180,20 @@
 	os.Exit(0)
 }
 
-func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
+	sqlMsg := SqlMsg{
+		Id:  uuid.NewV4().String(),
+		Sql: sql,
+	}
+
+	bMsg, _ := json.Marshal(sqlMsg)
+
 	var msg = ProcMessageEvent{
 		Owner:   ss.ServerId,
 		Target:  targetId,
 		Proc:    ss.ProcName,
 		Topic:   ss.syncSqlTopic,
-		Payload: payload,
+		Payload: bMsg,
 	}
 
 	b, err := json.Marshal(msg)
@@ -344,7 +362,7 @@
 				syncSql := strings.Join(sqlBuf, "")
 
 				//fmt.Println("鍚屾sql璇彞:", syncSql)
-				ss.pubSyncSqlMessage([]byte(syncSql), "")
+				ss.pubSyncSqlMessage(syncSql, "")
 
 				sqlBuf = append([]string{})
 				sendSize = 0
@@ -355,7 +373,7 @@
 					syncSql := strings.Join(sqlBuf, "")
 					//fmt.Println("鍚屾sql璇彞:", syncSql)
 
-					ss.pubSyncSqlMessage([]byte(syncSql), "")
+					ss.pubSyncSqlMessage(syncSql, "")
 
 					sqlBuf = append([]string{})
 				}
@@ -373,9 +391,25 @@
 	}
 }
 
-func (ss *SyncServer) handleClusterMessage(msg []byte) {
-	logx.Infof("clusterMessage:", string(msg))
-	sql := string(msg)
+func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
+	var msg SqlMsg
+	err := json.Unmarshal(clusterMsgData,&msg)
+	if err != nil {
+		logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
+		return
+	}
+
+	// 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩�
+	if sqlMsgSeqCache.Exists(msg.Id) {
+		logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql)
+		return
+	}
+
+	// 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈�
+	sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
+
+	logx.Infof("clusterMessage:%s", msg.Sql)
+	sql := msg.Sql
 
 	if len(sql) <= 0 {
 		return
@@ -420,12 +454,12 @@
 	logx.Infof("DumpTables sql:%v", sqls)
 	syncSql := strings.Join(sqls, ";")
 	if len(syncSql) < sizeLimit {
-		err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+		err = ss.pubSyncSqlMessage(syncSql, targetId)
 	} else {
 		shard := ""
 		for _, sql := range sqls {
 			if len(shard)+len(sql) > sizeLimit {
-				err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+				err = ss.pubSyncSqlMessage(shard, targetId)
 				shard = ""
 			}
 
@@ -433,7 +467,7 @@
 		}
 
 		if len(shard) > 0 {
-			err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+			err = ss.pubSyncSqlMessage(shard, targetId)
 		}
 	}
 

--
Gitblit v1.8.0