zhangzengfei
2023-10-08 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f
system-service/service/clusterService.go
@@ -98,12 +98,8 @@
   return m
}
//根据集群名称和密码创建集群
// 根据集群名称和密码创建集群
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
   devTyp := GetDevType(config.Server.DeviceType)
   if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
      return false, "只有分析和存储节点才能创建集群"
   }
   clusterId := uuid.NewV4().String()
   pwd = config.ClusterSet.PwdPre + pwd
   var clusterE = models.Cluster{
@@ -116,32 +112,21 @@
   arr, err := clusterE.FindAll()
   if err == nil && (arr == nil || len(arr) == 0) {
      b := clusterE.Create()
      if b {
      err = clusterE.Create()
      if err == nil {
         serf.InitAgent(context.Background())
         //创建es集群
         if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
            //创建失败,则serf集群也要推出
            s.Leave(false)
            return false, err.Error()
         }
         //创建weedfs集群
         if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
            s.Leave(false)
            return false, err.Error()
         }
         chMsg := protomsg.DbChangeMessage{
            Id:     clusterE.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
            Info:   "create",
         }
         s.AddDbMessage(&chMsg)
         return true, clusterId
      } else {
         logger.Error("初始化集群数据库信息失败. ", err.Error())
      }
   } else {
      if s.UpdateClusterName(clusterName, virtualIp) {
@@ -186,7 +171,7 @@
   }
}
//加入集群
// 加入集群
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
   start := time.Now()
   if config.Server.AnalyServerId == "" {
@@ -218,50 +203,33 @@
   agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
   if err == nil && agent != nil { //加入成功
      logger.Debug("AddCluster dbSync.Init success")
      agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
      serf.Agent = agent
      t := time.Now()
      syncB := syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
      if syncB {
         devTyp := GetDevType(config.Server.DeviceType)
         if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
            go func() {
               //添加ES节点
               if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
                  s.Leave(false)
                  return
               }
               //添加weedfs节点
               if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
                  s.Leave(false)
                  return
               }
            }()
         }
         //需要重新初始化本地比对进程
         go serf.ReInitDbPersonCompareData()
         chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
         logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
         return true, nil
      } else {
         logger.Debug("AddCluster syncTableDataFromCluster fail")
         if agent != nil {
            agent.Leave()
            err = agent.Shutdown()
            if err != nil {
               logger.Debug("AddCluster agent shutdown err:", err)
            }
         }
      syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster  time=%v", time.Since(t))
      //if syncB {
      chMsg := protomsg.DbChangeMessage{
         Id:     joinArg.ClusterId,
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Insert,
         Info:   "join",
      }
      s.AddDbMessage(&chMsg)
      logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
      return true, nil
      //} else {
      //   logger.Debug("AddCluster syncTableDataFromCluster fail")
      //   if agent != nil {
      //      agent.Leave()
      //      err = agent.Shutdown()
      //      if err != nil {
      //         logger.Debug("AddCluster agent shutdown err:", err)
      //      }
      //   }
      //}
   } else {
      logger.Debug("AddCluster dbSync.Init err:", err)
      if agent != nil {
@@ -282,7 +250,7 @@
   DevType_Other            = "other"            //其他设备类型,eg:应用
)
//获取本机类型,只进行分析、分析存储一体、只存储、其他。。
// 获取本机类型,只进行分析、分析存储一体、只存储、其他。。
func GetDevType(dt string) string {
   if dt != "" {
      if len(dt) >= 4 {
@@ -336,23 +304,6 @@
func (s ClusterService) Leave(isDel bool) (bool, error) {
   start := time.Now()
   devType := GetDevType(config.Server.DeviceType)
   logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
   if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
      go func() {
         //退出weedfs节点
         if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
            logger.Error("Leave ExitWeedfsServer err:", err)
            return
         }
         //退出es节点
         if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
            logger.Error("Leave ExitCluster es cluster err:", err)
            return
         }
      }()
   }
   var err error
   tx := models.GetDB().Begin()
@@ -382,7 +333,7 @@
         Id:     "",
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Delete,
         Info:   "",
         Info:   "leave",
      }
      logger.Debugf("Leave delete db time=%v", time.Since(t))
      tm := time.Now()
@@ -391,6 +342,7 @@
   }
   logger.Debugf("Leave success time=%v", time.Since(start))
   return true, nil
}
@@ -411,7 +363,7 @@
   return false
}
//加入集群后清空本地的同步库数据
// 加入集群后清空本地的同步库数据
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
   var lc models.LocalConfig
   e := lc.Select()
@@ -424,36 +376,36 @@
   db := models.GetDB()
   db.LogMode(false)
   defer db.LogMode(true)
   tx := db.Begin()
   defer func() {
      if err != nil && tx != nil {
         tx.Rollback()
      }
   }()
   //0.关闭reference
   tx.Exec("PRAGMA foreign_keys=OFF")
   //1.删除本地的同步库数据
   for _, t := range serf.SyncTables {
      delSql := ""
      if t == "dbtables" {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == "dbtablepersons" {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else {
         delSql = "delete from " + t + ""
      }
      delSql := "delete from " + t + ""
      err = tx.Exec(delSql).Error
      if err != nil {
         logger.Error("删除本地的同步库数据失败,", err.Error())
         logger.Error("sql:", delSql)
         return false
      }
   }
   //2.拉取集群内的同步库数据到本地数据库表中
   var dumpSqls *[]string
   dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
   if dumpSqls != nil && len(*dumpSqls) > 0 {
      for _, sqlStr := range *dumpSqls {
         logger.Debug("gorm exec dumpSql:", sqlStr)
         //logger.Debug("gorm exec dumpSql:", sqlStr)
         if err = tx.Exec(sqlStr).Error; err != nil {
            logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
            return false
         }
      }
@@ -505,3 +457,14 @@
   var lc models.Node
   return lc.FindIpByNode(nodeId)
}
func ClusterSyncProcMessage(payload []byte) {
   if serf.Agent == nil {
      logger.Error("未加入集群")
   }
   err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false)
   if err != nil {
      logger.Error("UserEventSyncMessage err:", err)
   }
}