From 2ab94056786f8a7bcf66cf1ed25002e74ed7f016 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期三, 08 十一月 2023 17:36:19 +0800 Subject: [PATCH] sql同步消息添加id,处理消息重复的问题 --- serf/sync.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 46 insertions(+), 12 deletions(-) diff --git a/serf/sync.go b/serf/sync.go index 1171a0a..30be0ee 100644 --- a/serf/sync.go +++ b/serf/sync.go @@ -1,11 +1,9 @@ package serf import ( - "apsClient/pkg/logx" "context" "encoding/json" "fmt" - "github.com/mitchellh/mapstructure" "os" "os/signal" "regexp" @@ -13,12 +11,17 @@ "syscall" "time" + "apsClient/pkg/logx" + "basic.com/pubsub/protomsg.git" "basic.com/valib/bhomeclient.git" "basic.com/valib/bhomedbapi.git" "github.com/gogo/protobuf/proto" "github.com/jinzhu/gorm" + "github.com/satori/go.uuid" + "github.com/mitchellh/mapstructure" + "github.com/muesli/cache2go" ) var ( @@ -26,6 +29,8 @@ dependProcs = []string{ bhomeclient.Proc_System_Service, } + + sqlMsgSeqCache = cache2go.Cache("syncSqlMsg") ) const ( @@ -44,6 +49,12 @@ Proc string `json:"procName"` // 杩涚▼鍚� Topic string `json:"topic"` // 涓婚 Payload []byte `json:"payload"` // 娑堟伅浣�,鑷瑙f瀽 +} + +type SqlMsg struct { + Id string + Sql string + Version string } type SyncServer struct { @@ -169,13 +180,20 @@ os.Exit(0) } -func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error { +func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error { + sqlMsg := SqlMsg{ + Id: uuid.NewV4().String(), + Sql: sql, + } + + bMsg, _ := json.Marshal(sqlMsg) + var msg = ProcMessageEvent{ Owner: ss.ServerId, Target: targetId, Proc: ss.ProcName, Topic: ss.syncSqlTopic, - Payload: payload, + Payload: bMsg, } b, err := json.Marshal(msg) @@ -344,7 +362,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) sendSize = 0 @@ -355,7 +373,7 @@ syncSql := strings.Join(sqlBuf, "") //fmt.Println("鍚屾sql璇彞:", syncSql) - ss.pubSyncSqlMessage([]byte(syncSql), "") + ss.pubSyncSqlMessage(syncSql, "") sqlBuf = append([]string{}) } @@ -373,9 +391,25 @@ } } -func (ss *SyncServer) handleClusterMessage(msg []byte) { - logx.Infof("clusterMessage:", string(msg)) - sql := string(msg) +func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) { + var msg SqlMsg + err := json.Unmarshal(clusterMsgData,&msg) + if err != nil { + logx.Errorf(" Unmarshal cluster message error, %s",err.Error()) + return + } + + // 鍒ゆ柇娑堟伅鏄惁鏇剧粡鎺ユ敹杩� + if sqlMsgSeqCache.Exists(msg.Id) { + logx.Infof("clusterMessage:鎺ユ敹鍒伴噸澶嶆秷鎭�, %s", msg.Sql) + return + } + + // 璁板綍娑堟伅id, 鍗婂皬鏃惰繃鏈� + sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true) + + logx.Infof("clusterMessage:%s", msg.Sql) + sql := msg.Sql if len(sql) <= 0 { return @@ -420,12 +454,12 @@ logx.Infof("DumpTables sql:%v", sqls) syncSql := strings.Join(sqls, ";") if len(syncSql) < sizeLimit { - err = ss.pubSyncSqlMessage([]byte(syncSql), targetId) + err = ss.pubSyncSqlMessage(syncSql, targetId) } else { shard := "" for _, sql := range sqls { if len(shard)+len(sql) > sizeLimit { - err = ss.pubSyncSqlMessage([]byte(shard), targetId) + err = ss.pubSyncSqlMessage(shard, targetId) shard = "" } @@ -433,7 +467,7 @@ } if len(shard) > 0 { - err = ss.pubSyncSqlMessage([]byte(shard), targetId) + err = ss.pubSyncSqlMessage(shard, targetId) } } -- Gitblit v1.8.0