zhangzengfei
2023-11-08 2ab94056786f8a7bcf66cf1ed25002e74ed7f016
sql同步消息添加id,处理消息重复的问题
1个文件已修改
58 ■■■■ 已修改文件
serf/sync.go 58 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
serf/sync.go
@@ -1,11 +1,9 @@
package serf
import (
    "apsClient/pkg/logx"
    "context"
    "encoding/json"
    "fmt"
    "github.com/mitchellh/mapstructure"
    "os"
    "os/signal"
    "regexp"
@@ -13,12 +11,17 @@
    "syscall"
    "time"
    "apsClient/pkg/logx"
    "basic.com/pubsub/protomsg.git"
    "basic.com/valib/bhomeclient.git"
    "basic.com/valib/bhomedbapi.git"
    "github.com/gogo/protobuf/proto"
    "github.com/jinzhu/gorm"
    "github.com/satori/go.uuid"
    "github.com/mitchellh/mapstructure"
    "github.com/muesli/cache2go"
)
var (
@@ -26,6 +29,8 @@
    dependProcs = []string{
        bhomeclient.Proc_System_Service,
    }
    sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
)
const (
@@ -44,6 +49,12 @@
    Proc    string `json:"procName"` // 进程名
    Topic   string `json:"topic"`    // 主题
    Payload []byte `json:"payload"`  // 消息体,自行解析
}
type SqlMsg struct {
    Id      string
    Sql     string
    Version string
}
type SyncServer struct {
@@ -169,13 +180,20 @@
    os.Exit(0)
}
func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
    sqlMsg := SqlMsg{
        Id:  uuid.NewV4().String(),
        Sql: sql,
    }
    bMsg, _ := json.Marshal(sqlMsg)
    var msg = ProcMessageEvent{
        Owner:   ss.ServerId,
        Target:  targetId,
        Proc:    ss.ProcName,
        Topic:   ss.syncSqlTopic,
        Payload: payload,
        Payload: bMsg,
    }
    b, err := json.Marshal(msg)
@@ -344,7 +362,7 @@
                syncSql := strings.Join(sqlBuf, "")
                //fmt.Println("同步sql语句:", syncSql)
                ss.pubSyncSqlMessage([]byte(syncSql), "")
                ss.pubSyncSqlMessage(syncSql, "")
                sqlBuf = append([]string{})
                sendSize = 0
@@ -355,7 +373,7 @@
                    syncSql := strings.Join(sqlBuf, "")
                    //fmt.Println("同步sql语句:", syncSql)
                    ss.pubSyncSqlMessage([]byte(syncSql), "")
                    ss.pubSyncSqlMessage(syncSql, "")
                    sqlBuf = append([]string{})
                }
@@ -373,9 +391,25 @@
    }
}
func (ss *SyncServer) handleClusterMessage(msg []byte) {
    logx.Infof("clusterMessage:", string(msg))
    sql := string(msg)
func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
    var msg SqlMsg
    err := json.Unmarshal(clusterMsgData,&msg)
    if err != nil {
        logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
        return
    }
    // 判断消息是否曾经接收过
    if sqlMsgSeqCache.Exists(msg.Id) {
        logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql)
        return
    }
    // 记录消息id, 半小时过期
    sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
    logx.Infof("clusterMessage:%s", msg.Sql)
    sql := msg.Sql
    if len(sql) <= 0 {
        return
@@ -420,12 +454,12 @@
    logx.Infof("DumpTables sql:%v", sqls)
    syncSql := strings.Join(sqls, ";")
    if len(syncSql) < sizeLimit {
        err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
        err = ss.pubSyncSqlMessage(syncSql, targetId)
    } else {
        shard := ""
        for _, sql := range sqls {
            if len(shard)+len(sql) > sizeLimit {
                err = ss.pubSyncSqlMessage([]byte(shard), targetId)
                err = ss.pubSyncSqlMessage(shard, targetId)
                shard = ""
            }
@@ -433,7 +467,7 @@
        }
        if len(shard) > 0 {
            err = ss.pubSyncSqlMessage([]byte(shard), targetId)
            err = ss.pubSyncSqlMessage(shard, targetId)
        }
    }