fix
zhangqian
2023-12-01 8324f872ef3a4d0c978a9b1d062800c6a1701c12
serf/sync.go
@@ -6,9 +6,12 @@
   "fmt"
   "os"
   "os/signal"
   "regexp"
   "strings"
   "syscall"
   "time"
   "apsClient/pkg/logx"
   "basic.com/pubsub/protomsg.git"
   "basic.com/valib/bhomeclient.git"
@@ -16,10 +19,18 @@
   "github.com/gogo/protobuf/proto"
   "github.com/jinzhu/gorm"
   "github.com/satori/go.uuid"
   "github.com/mitchellh/mapstructure"
   "github.com/muesli/cache2go"
)
var (
   agent = SyncServer{}
   agent       = SyncServer{}
   dependProcs = []string{
      bhomeclient.Proc_System_Service,
   }
   sqlMsgSeqCache = cache2go.Cache("syncSqlMsg")
)
const (
@@ -38,6 +49,12 @@
   Proc    string `json:"procName"` // 进程名
   Topic   string `json:"topic"`    // 主题
   Payload []byte `json:"payload"`  // 消息体,自行解析
}
type SqlMsg struct {
   Id      string
   Sql     string
   Version string
}
type SyncServer struct {
@@ -61,9 +78,10 @@
   agent.queryTableTopic = procName + "/serf/query/sqls"
   // 设置日志回调
   db.SetLogger(&DbLogger{})
   db.SetLogger(&agent)
   // 先关闭日志
   db.LogMode(false)
   //db.LogMode(false)
   return &agent
}
@@ -101,6 +119,42 @@
   bhomedbapi.InitDoReq(client.RequestOnly)
   //bhomedbapi.InitLog(logger.Debug)
   // 需要等待system-service进程成功启动后,才能获取集群状态(或者保证程序启动时获取到正确的状态)
   tryTimes := 0
loop:
   for {
      select {
      case <-q:
         initChan <- false
         return
      default:
         if tryTimes < 15 {
            clients, err := client.GetRegisteredClient()
            if err == nil && len(clients) > 0 {
               var existingProcs []string
               for _, c := range clients {
                  if c.Online {
                     existingProcs = append(existingProcs, string(c.Proc.ProcId))
                  }
               }
               if diff := arrayContains(existingProcs, dependProcs); diff == "" {
                  break loop
               } else {
                  logx.Errorf("Proc: %s is not running!", diff)
                  time.Sleep(time.Second * 1)
               }
            } else {
               tryTimes++
               time.Sleep(time.Second * 5)
            }
         } else {
            logx.Errorf("tried 15 times, client.GetRegisteredClient failed")
            initChan <- false
            return
         }
      }
   }
   go client.StartServer(nil)
   ss.bhClient = client
@@ -112,9 +166,9 @@
   // 启动后查询一次集群状态
   ss.QueryClusterStat()
   if ss.ClusterStatus != "" {
      ss.sqlDB.LogMode(true)
   }
   //if ss.ClusterStatus != "" {
   //ss.sqlDB.LogMode(true)
   //}
   initChan <- true
   <-q
@@ -126,13 +180,20 @@
   os.Exit(0)
}
func (ss *SyncServer) pubSyncSqlMessage(payload []byte, targetId string) error {
func (ss *SyncServer) pubSyncSqlMessage(sql string, targetId string) error {
   sqlMsg := SqlMsg{
      Id:  uuid.NewV4().String(),
      Sql: sql,
   }
   bMsg, _ := json.Marshal(sqlMsg)
   var msg = ProcMessageEvent{
      Owner:   ss.ServerId,
      Target:  targetId,
      Proc:    ss.ProcName,
      Topic:   ss.syncSqlTopic,
      Payload: payload,
      Payload: bMsg,
   }
   b, err := json.Marshal(msg)
@@ -157,7 +218,7 @@
      return err
   }
   fmt.Println("加入集群, 请求同步全量数据,id:", ss.ServerId)
   logx.Debugf("加入集群, 请求同步全量数据,id:%s", ss.ServerId)
   return ss.bhClient.Publish(serfSyncTopic, b)
}
@@ -177,7 +238,7 @@
         // 处理同步全量数据的请求
         if string(busMsg.Topic) == ss.queryTableTopic {
            if ss.ClusterStatus == "master" {
               fmt.Println("接收到同步全量数据请求")
               logx.Debugf("接收到同步全量数据请求.")
               ss.handleSyncTableMessage(busMsg.Data)
            }
         }
@@ -199,28 +260,28 @@
                  // 创建集群, 开启日志跟踪, 设置角色master
                  ss.clusterEventFn(EventCreateCluster)
                  ss.ClusterStatus = "master"
                  ss.sqlDB.LogMode(true)
                  //ss.sqlDB.LogMode(true)
               case "join":
                  // 加入集群, 开启日志跟踪, 设置角色slave
                  ss.clusterEventFn(EventJoinCluster)
                  ss.onJoinCluster()
                  ss.ClusterStatus = "slave"
                  ss.sqlDB.LogMode(true)
                  //ss.sqlDB.LogMode(true)
               case "leave":
                  // 退出集群, 开启日志跟踪, 设置角色slave
                  ss.clusterEventFn(EventLeaveCluster)
                  ss.ClusterStatus = ""
                  ss.sqlDB.LogMode(false)
                  //ss.sqlDB.LogMode(true)
               case "slave2master":
                  ss.clusterEventFn(EventSlave2Master)
                  ss.ClusterStatus = "master"
                  ss.sqlDB.LogMode(true)
                  //ss.sqlDB.LogMode(true)
               case "master2slave":
                  ss.clusterEventFn(EventMaster2Slave)
                  ss.ClusterStatus = "slave"
                  ss.sqlDB.LogMode(true)
                  //ss.sqlDB.LogMode(true)
               }
            }
         }
@@ -248,7 +309,7 @@
      err = tx.Exec(delSql).Error
      if err != nil {
         fmt.Println("删除本地的同步库数据失败,", err.Error())
         logx.Errorf("删除本地的同步库数据失败, %s", err.Error())
      }
   }
@@ -269,7 +330,7 @@
}
// 查询集群状态, 返回 master, slave, leave
func (ss *SyncServer) QueryClusterStat() string {
func (ss *SyncServer) QueryClusterStat() *bhomeclient.Reply {
   clusterStatTopic := "/data/api-v/cluster/status"
   req := bhomeclient.Request{
      Path:   clusterStatTopic,
@@ -280,14 +341,14 @@
   if err != nil {
      fmt.Println("RequestTopic error", err.Error())
      return ""
      return reply
   }
   ss.ClusterStatus = reply.Msg
   fmt.Println("当前集群状态:", ss.ClusterStatus)
   logx.Debugf("当前集群状态: %s", ss.ClusterStatus)
   return reply.Msg
   return reply
}
func (ss *SyncServer) handleDbLoggerPrint() {
@@ -301,7 +362,7 @@
            syncSql := strings.Join(sqlBuf, "")
            //fmt.Println("同步sql语句:", syncSql)
            ss.pubSyncSqlMessage([]byte(syncSql), "")
            ss.pubSyncSqlMessage(syncSql, "")
            sqlBuf = append([]string{})
            sendSize = 0
@@ -312,7 +373,7 @@
               syncSql := strings.Join(sqlBuf, "")
               //fmt.Println("同步sql语句:", syncSql)
               ss.pubSyncSqlMessage([]byte(syncSql), "")
               ss.pubSyncSqlMessage(syncSql, "")
               sqlBuf = append([]string{})
            }
@@ -330,9 +391,25 @@
   }
}
func (ss *SyncServer) handleClusterMessage(msg []byte) {
   //fmt.Println("clusterMessage:", string(msg))
   sql := string(msg)
func (ss *SyncServer) handleClusterMessage(clusterMsgData []byte) {
   var msg SqlMsg
   err := json.Unmarshal(clusterMsgData,&msg)
   if err != nil {
      logx.Errorf(" Unmarshal cluster message error, %s",err.Error())
      return
   }
   // 判断消息是否曾经接收过
   if sqlMsgSeqCache.Exists(msg.Id) {
      logx.Infof("clusterMessage:接收到重复消息, %s", msg.Sql)
      return
   }
   // 记录消息id, 半小时过期
   sqlMsgSeqCache.Add(msg.Id, 30*time.Minute, true)
   logx.Infof("clusterMessage:%s", msg.Sql)
   sql := msg.Sql
   if len(sql) <= 0 {
      return
@@ -362,17 +439,143 @@
   }
}
// serf 同步数据的限制为92160 byte
func (ss *SyncServer) handleSyncTableMessage(msg []byte) error {
   sizeLimit := 61440
   targetId := string(msg)
   fmt.Println("同步全量数据给节点:", targetId)
   //fmt.Println("同步全量数据给节点:", targetId)
   sqls, err := DumpTables(ss.sqlDB, ss.syncTables)
   if err != nil {
      fmt.Println("DumpTables error, ", err.Error())
      logx.Errorf("DumpTables error: %s", err.Error())
      return err
   }
   logx.Infof("DumpTables sql:%v", sqls)
   syncSql := strings.Join(sqls, ";")
   err = ss.pubSyncSqlMessage([]byte(syncSql), targetId)
   if len(syncSql) < sizeLimit {
      err = ss.pubSyncSqlMessage(syncSql, targetId)
   } else {
      shard := ""
      for _, sql := range sqls {
         if len(shard)+len(sql) > sizeLimit {
            err = ss.pubSyncSqlMessage(shard, targetId)
            shard = ""
         }
         shard = fmt.Sprintf("%s%s;", shard, sql)
      }
      if len(shard) > 0 {
         err = ss.pubSyncSqlMessage(shard, targetId)
      }
   }
   return err
}
func (ss *SyncServer) Print(values ...interface{}) {
   var (
      level = values[0]
   )
   //fmt.Println("dblogger", values)
   if level == "sql" {
      msgArr := gorm.LogFormatter(values...)
      sql := msgArr[3].(string)
      logx.Infof("sql: %v", sql)
      sql = strings.TrimPrefix(sql, " ")
      if !strings.HasPrefix(sql, "SELECT") && !strings.HasPrefix(sql, "select") && !strings.Contains(sql, "PRAGMA") && !strings.Contains(sql, "pragma") {
         affected := values[5].(int64)
         if affected > 0 { //执行成功
            //判断操作的是哪张表
            whereIdx := strings.Index(sql, "WHERE")
            sqlWithTable := sql
            if whereIdx > -1 {
               sqlWithTable = sql[:whereIdx]
            }
            //fmt.Println("判断是哪张表 sqlWithTable:", sqlWithTable)
            insertReg := regexp.MustCompile(`^\s*(?i:insert)\s`) //insert
            updateReg := regexp.MustCompile(`^\s*(?i:update)\s`) //update
            delReg := regexp.MustCompile(`^\s*(?i:delete)\s`)    //delete
            if insertReg.MatchString(sqlWithTable) {
               //fmt.Println("插入操作")
               for _, t := range agent.syncTables {
                  reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                  if reg.MatchString(sqlWithTable) {
                     //fmt.Println("属于同步表:", t)
                     // 判断是在集群内, 同步消息, 判断两种角色, 为避免其他出现状态
                     if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
                        syncSqlChan <- sql
                     }
                  }
               }
            } else if updateReg.MatchString(sqlWithTable) || delReg.MatchString(sqlWithTable) {
               //fmt.Println("删除或者更新")
               for _, t := range agent.syncTables {
                  reg := regexp.MustCompile(`\s+\"?(?i:` + t + `)\"?\s+`)
                  if reg.MatchString(sqlWithTable) {
                     //fmt.Println("属于同步表:", t)
                     if ss.ClusterStatus == "master" || ss.ClusterStatus == "slave" {
                        syncSqlChan <- sql
                     }
                  }
               }
            }
         }
      }
   } else {
      fmt.Println("dbLogger level!=sql")
   }
}
func arrayContains(list []string, arr []string) string {
   if arr == nil || list == nil {
      return ""
   }
   for _, s := range arr {
      isExist := false
      for _, t := range list {
         if s == t {
            isExist = true
            break
         }
      }
      if !isExist {
         return s
      }
   }
   return ""
}
type NodeInfo struct {
   NodeID     string `json:"node_id,omitempty"`
   NodeIp     string `json:"node_ip,omitempty"`
   NodeName   string `json:"node_name,omitempty"`
   ClusterID  string `json:"cluster_id"`
   CreateTime string `json:"create_time"`
   DeviceType string `json:"device_type"`
   DriftState string `json:"drift_state"`
   Online     bool   `json:"online"`
}
func QueryClusterStatusAndNodeQuantity() (string, int) {
   reply := agent.QueryClusterStat()
   if reply == nil {
      return "", 0
   }
   var nodes []NodeInfo
   err := mapstructure.Decode(reply.Data, &nodes)
   if err != nil {
      logx.Errorf("mapstructure.Decode QueryClusterStat data err:%v", err)
      return reply.Msg, 0
   }
   return reply.Msg, len(nodes)
}