zhangzengfei
2023-11-28 3a706d3378aa3626501370352963883fd2783558
system-service/service/clusterService.go
@@ -3,6 +3,7 @@
import (
   "context"
   "encoding/json"
   "fmt"
   "strconv"
   sysSync "sync"
   "time"
@@ -137,6 +138,31 @@
   return false, ""
}
// 根据集群名称和密码创建集群
func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
   var node models.Node
   isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
   if isSuccess {
      // 通知主节点变更
      chMsg := protomsg.DbChangeMessage{
         Id:     clusterId,
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Insert,
         Info:   "slave2master",
      }
      s.AddDbMessage(&chMsg)
      err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false)
      if err != nil {
         logger.Error("UserEventSyncVirtualIp err:", err)
      }
   }
   return isSuccess, ""
}
func (s ClusterService) SearchByPwd(pwd string) (err error) {
   _, isSearching := getFromSearchMap()
   if isSearching {
@@ -190,6 +216,7 @@
      logger.Debug("AddCluster JoinCluster len(joinIps)=0")
      return false, errors.New("加入的目标ip不能为空")
   }
   logger.Debug("AddCluster joinIps:", joinIps)
   joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
@@ -207,29 +234,29 @@
      serf.Agent = agent
      t := time.Now()
      syncTableDataFromCluster(joinArg)
      syncClusterNodes := syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster  time=%v", time.Since(t))
      //if syncB {
      chMsg := protomsg.DbChangeMessage{
         Id:     joinArg.ClusterId,
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Insert,
         Info:   "join",
      }
      if syncClusterNodes {
         chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "join",
         }
      s.AddDbMessage(&chMsg)
      logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
      return true, nil
      //} else {
      //   logger.Debug("AddCluster syncTableDataFromCluster fail")
      //   if agent != nil {
      //      agent.Leave()
      //      err = agent.Shutdown()
      //      if err != nil {
      //         logger.Debug("AddCluster agent shutdown err:", err)
      //      }
      //   }
      //}
         s.AddDbMessage(&chMsg)
         logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
         return true, nil
      } else {
         logger.Debug("AddCluster syncTableDataFromCluster fail")
         agent.Leave()
         err = agent.Shutdown()
         if err != nil {
            logger.Debug("AddCluster agent shutdown err:", err)
         }
         return false, errors.New("加入集群失败")
      }
   } else {
      logger.Debug("AddCluster dbSync.Init err:", err)
      if agent != nil {
@@ -239,7 +266,9 @@
      }
   }
   logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
   return false, errors.New("加入集群失败")
}
@@ -403,7 +432,7 @@
   dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
   if dumpSqls != nil && len(*dumpSqls) > 0 {
      for _, sqlStr := range *dumpSqls {
         //logger.Debug("gorm exec dumpSql:", sqlStr)
         logger.Debug("gorm exec dumpSql:", sqlStr)
         if err = tx.Exec(sqlStr).Error; err != nil {
            logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
            return false
@@ -423,8 +452,11 @@
   serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
   if e1 != nil || serverIp == "" {
      err = errors.New("get serverIp err")
      logger.Error("get serverIp err")
      return false
   }
   logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
   if nodeName == "" {
      nodeName = serverIp
@@ -436,7 +468,9 @@
      logger.Debug("add cur node err:", err)
      return false
   }
   if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
   joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
   if err = tx.Exec(joinSql).Error; err != nil {
      logger.Debug("update isDelete err:", err)
      return false
   }
@@ -445,6 +479,7 @@
   tx.Exec("PRAGMA foreign_keys=ON")
   tx.Commit()
   serf.SyncSql([]string{sql})
   return true
}
@@ -461,6 +496,7 @@
func ClusterSyncProcMessage(payload []byte) {
   if serf.Agent == nil {
      logger.Error("未加入集群")
      return
   }
   err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)