From 2ab94056786f8a7bcf66cf1ed25002e74ed7f016 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期三, 08 十一月 2023 17:36:19 +0800
Subject: [PATCH] sql同步消息添加id,处理消息重复的问题
---
serf/sync.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 46 insertions(+), 12 deletions(-)
diff --git a/serf/sync.go b/serf/sync.go
index 1171a0a..30be0ee 100644
--- a/serf/sync.go
+++ b/serf/sync.go
@@ -1,11 +1,9 @@
package serf
import (
- "apsClient/pkg/logx"
"context"
"encoding/json"
"fmt"
- "github.com/mitchellh/mapstructure"
"os"
"os/signal"
"regexp"
@@ -13,12 +11,17 @@
"syscall"
"time"
+ "apsClient/pkg/logx"
+
"basic.com/pubsub/protomsg.git"
"basic.com/valib/bhomeclient.git"
"basic.com/valib/bhomedbapi.git"
"github.com/gogo/protobuf/proto"
"github.com/jinzhu/gorm"
+ "github.com/satori/go.uuid"
+ "github.com/mitchellh/mapstructure"
+ "github.com/muesli/cache2go"
)
var (
@@ -26,6 +29,8 @@
dependProcs = []string{
bhomeclient.Proc_System_Service,
}
+
+ sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
)
const (
@@ -44,6 +49,12 @@
Proc string `json:"procName"` // 杩涚▼鍚�
Topic string `json:"topic"` // 涓婚
Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽
+}
+
+type SqlMsg struct {
+ Id string
+ Sql string
+ Version string
}
type SyncServer struct {
@@ -169,13 +180,20 @@
os.Exit(0)
}
-func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
+func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
+ sqlMsg := SqlMsg{
+ Id: uuid.NewV4().String(),
+ Sql: sql,
+ }
+
+ bMsg, _ := json.Marshal(sqlMsg)
+
var msg = ProcMessageEvent{
Owner: ss.ServerId,
Target: targetId,
Proc: ss.ProcName,
Topic: ss.syncSqlTopic,
- Payload: payload,
+ Payload: bMsg,
}
b, err := json.Marshal(msg)
@@ -344,7 +362,7 @@
syncSql := strings.Join(sqlBuf, "")
//fmt.Println("鍚屾sql璇彞:", syncSql)
- ss.pubSyncSqlMessage([]byte(syncSql), "")
+ ss.pubSyncSqlMessage(syncSql, "")
sqlBuf = append([]string{})
sendSize = 0
@@ -355,7 +373,7 @@
syncSql := strings.Join(sqlBuf, "")
//fmt.Println("鍚屾sql璇彞:", syncSql)
- ss.pubSyncSqlMessage([]byte(syncSql), "")
+ ss.pubSyncSqlMessage(syncSql, "")
sqlBuf = append([]string{})
}
@@ -373,9 +391,25 @@
}
}
-func (ss *SyncServer) handleClusterMessage(msg []byte) {
- logx.Infof("clusterMessage:", string(msg))
- sql := string(msg)
+func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
+ var msg SqlMsg
+ err := json.Unmarshal(clusterMsgData,&msg)
+ if err != nil {
+ logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
+ return
+ }
+
+ // 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩�
+ if sqlMsgSeqCache.Exists(msg.Id) {
+ logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql)
+ return
+ }
+
+ // 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈�
+ sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
+
+ logx.Infof("clusterMessage:%s", msg.Sql)
+ sql := msg.Sql
if len(sql) <= 0 {
return
@@ -420,12 +454,12 @@
logx.Infof("DumpTables sql:%v", sqls)
syncSql := strings.Join(sqls, ";")
if len(syncSql) < sizeLimit {
- err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
+ err = ss.pubSyncSqlMessage(syncSql, targetId)
} else {
shard := ""
for _, sql := range sqls {
if len(shard)+len(sql) > sizeLimit {
- err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+ err = ss.pubSyncSqlMessage(shard, targetId)
shard = ""
}
@@ -433,7 +467,7 @@
}
if len(shard) > 0 {
- err = ss.pubSyncSqlMessage([]byte(shard), targetId)
+ err = ss.pubSyncSqlMessage(shard, targetId)
}
}
--
Gitblit v1.8.0