zhangzengfei
2023-11-08 2ab94056786f8a7bcf66cf1ed25002e74ed7f016
serf/sync.go
@@ -1,11 +1,9 @@
package serf
import (
   "apsClient/pkg/logx"
   "context"
   "encoding/json"
   "fmt"
   "github.com/mitchellh/mapstructure"
   "os"
   "os/signal"
   "regexp"
@@ -13,12 +11,17 @@
   "syscall"
   "time"
   "apsClient/pkg/logx"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/bhomedbapi.git"
   "github.com/gogo/protobuf/proto"
   "github.com/jinzhu/gorm"
   "github.com/satori/go.uuid"
   "github.com/mitchellh/mapstructure"
   "github.com/muesli/cache2go"
)
var (
@@ -26,6 +29,8 @@
   dependProcs = []string{
      bhomeclient.Proc_System_Service,
   }
   sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
)
const (
@@ -44,6 +49,12 @@
   Proc    string `json:"procName"` // 进程名
   Topic   string `json:"topic"`    // 主题
   Payload []byte `json:"payload"`  // 消息体,自行解析
}
type SqlMsg struct {
   Id      string
   Sql     string
   Version string
}
type SyncServer struct {
@@ -169,13 +180,20 @@
   os.Exit(0)
}
func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
   sqlMsg := SqlMsg{
      Id:  uuid.NewV4().String(),
      Sql: sql,
   }
   bMsg, _ := json.Marshal(sqlMsg)
   var msg = ProcMessageEvent{
      Owner:   ss.ServerId,
      Target:  targetId,
      Proc:    ss.ProcName,
      Topic:   ss.syncSqlTopic,
      Payload: payload,
      Payload: bMsg,
   }
   b, err := json.Marshal(msg)
@@ -344,7 +362,7 @@
            syncSql := strings.Join(sqlBuf, "")
            //fmt.Println("同步sql语句:", syncSql)
            ss.pubSyncSqlMessage([]byte(syncSql), "")
            ss.pubSyncSqlMessage(syncSql, "")
            sqlBuf = append([]string{})
            sendSize = 0
@@ -355,7 +373,7 @@
               syncSql := strings.Join(sqlBuf, "")
               //fmt.Println("同步sql语句:", syncSql)
               ss.pubSyncSqlMessage([]byte(syncSql), "")
               ss.pubSyncSqlMessage(syncSql, "")
               sqlBuf = append([]string{})
            }
@@ -373,9 +391,25 @@
   }
}
func (ss *SyncServer) handleClusterMessage(msg []byte) {
   logx.Infof("clusterMessage:", string(msg))
   sql := string(msg)
func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
   var msg SqlMsg
   err := json.Unmarshal(clusterMsgData,&msg)
   if err != nil {
      logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
      return
   }
   // 判断消息是否曾经接收过
   if sqlMsgSeqCache.Exists(msg.Id) {
      logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql)
      return
   }
   // 记录消息id, 半小时过期
   sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
   logx.Infof("clusterMessage:%s", msg.Sql)
   sql := msg.Sql
   if len(sql) <= 0 {
      return
@@ -420,12 +454,12 @@
   logx.Infof("DumpTables sql:%v", sqls)
   syncSql := strings.Join(sqls, ";")
   if len(syncSql) < sizeLimit {
      err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
      err = ss.pubSyncSqlMessage(syncSql, targetId)
   } else {
      shard := ""
      for _, sql := range sqls {
         if len(shard)+len(sql) > sizeLimit {
            err = ss.pubSyncSqlMessage([]byte(shard), targetId)
            err = ss.pubSyncSqlMessage(shard, targetId)
            shard = ""
         }
@@ -433,7 +467,7 @@
      }
      if len(shard) > 0 {
         err = ss.pubSyncSqlMessage([]byte(shard), targetId)
         err = ss.pubSyncSqlMessage(shard, targetId)
      }
   }