zhangzengfei
2023-09-07 55aa27a6ad0e012d62dcea2db37528a1b18836fb
system-service/service/clusterService.go
@@ -100,10 +100,6 @@
//根据集群名称和密码创建集群
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
   devTyp := GetDevType(config.Server.DeviceType)
   if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
      return false, "只有分析和存储节点才能创建集群"
   }
   clusterId := uuid.NewV4().String()
   pwd = config.ClusterSet.PwdPre + pwd
   var clusterE = models.Cluster{
@@ -119,19 +115,6 @@
      b := clusterE.Create()
      if b {
         serf.InitAgent(context.Background())
         //创建es集群
         if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
            //创建失败,则serf集群也要推出
            s.Leave(false)
            return false, err.Error()
         }
         //创建weedfs集群
         if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
            s.Leave(false)
            return false, err.Error()
         }
         chMsg := protomsg.DbChangeMessage{
            Id:     clusterE.ClusterId,
@@ -221,47 +204,31 @@
      serf.Agent = agent
      t := time.Now()
      syncB := syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
      if syncB {
         devTyp := GetDevType(config.Server.DeviceType)
         if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
            go func() {
               //添加ES节点
               if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
                  s.Leave(false)
                  return
               }
               //添加weedfs节点
               if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
                  s.Leave(false)
                  return
               }
            }()
         }
         //需要重新初始化本地比对进程
         go serf.ReInitDbPersonCompareData()
         chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
      syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster  time=%v", time.Since(t))
      //if syncB {
      //   //需要重新初始化本地比对进程
      //   go serf.ReInitDbPersonCompareData()
      //   chMsg := protomsg.DbChangeMessage{
      //      Id:     joinArg.ClusterId,
      //      Table:  protomsg.TableChanged_T_Cluster,
      //      Action: protomsg.DbAction_Insert,
      //      Info:   "",
      //   }
      //
      //   s.AddDbMessage(&chMsg)
         logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
         return true, nil
      } else {
         logger.Debug("AddCluster syncTableDataFromCluster fail")
         if agent != nil {
            agent.Leave()
            err = agent.Shutdown()
            if err != nil {
               logger.Debug("AddCluster agent shutdown err:", err)
            }
         }
      }
      //} else {
      //   logger.Debug("AddCluster syncTableDataFromCluster fail")
      //   if agent != nil {
      //      agent.Leave()
      //      err = agent.Shutdown()
      //      if err != nil {
      //         logger.Debug("AddCluster agent shutdown err:", err)
      //      }
      //   }
      //}
   } else {
      logger.Debug("AddCluster dbSync.Init err:", err)
      if agent != nil {
@@ -336,23 +303,6 @@
func (s ClusterService) Leave(isDel bool) (bool, error) {
   start := time.Now()
   devType := GetDevType(config.Server.DeviceType)
   logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
   if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
      go func() {
         //退出weedfs节点
         if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
            logger.Error("Leave ExitWeedfsServer err:", err)
            return
         }
         //退出es节点
         if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
            logger.Error("Leave ExitCluster es cluster err:", err)
            return
         }
      }()
   }
   var err error
   tx := models.GetDB().Begin()
@@ -430,6 +380,7 @@
         tx.Rollback()
      }
   }()
   //0.关闭reference
   tx.Exec("PRAGMA foreign_keys=OFF")
   //1.删除本地的同步库数据
@@ -447,6 +398,7 @@
         return false
      }
   }
   //2.拉取集群内的同步库数据到本地数据库表中
   var dumpSqls *[]string
   dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)