zhangzengfei
2023-09-05 63645d248c765244488cd34dbc1bb6528ca6b7c7
system-service/service/clusterService.go
@@ -1,507 +1,507 @@
package service
import (
   "context"
   "encoding/json"
   "strconv"
   sysSync "sync"
   "time"
   "vamicro/config"
   "vamicro/extend/util"
   "vamicro/system-service/models"
   "vamicro/system-service/serf"
   "vamicro/system-service/sys"
   "vamicro/system-service/vo"
   "basic.com/pubsub/protomsg.git"
   dbSync "basic.com/syncdb.git"
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/logger.git"
   "github.com/hashicorp/memberlist"
   "github.com/pkg/errors"
   uuid "github.com/satori/go.uuid"
)
type ClusterService struct {
   bk bhomeclient.Broker
}
func NewClusterService(broker bhomeclient.Broker) *ClusterService {
   return &ClusterService{
      bk: broker,
   }
}
var clusterSearchKey = "clusterSearchKey"
var searchMap = make(map[string]*memberlist.Memberlist, 0)
var lock sysSync.Mutex
func set2SearchMap(memList *memberlist.Memberlist) {
   lock.Lock()
   defer lock.Unlock()
   searchMap[clusterSearchKey] = memList
}
func getFromSearchMap() (*memberlist.Memberlist, bool) {
   lock.Lock()
   defer lock.Unlock()
   if v, ok := searchMap[clusterSearchKey]; ok {
      return v, true
   } else {
      return nil, false
   }
}
func clearSearchResult(ml *memberlist.Memberlist) {
   time.Sleep(10 * time.Second)
   lock.Lock()
   defer lock.Unlock()
   if _, ok := searchMap[clusterSearchKey]; ok {
      dbSync.CloseSearchNode(ml)
      delete(searchMap, clusterSearchKey)
   }
}
func deleteFromSearchMap() {
   lock.Lock()
   defer lock.Unlock()
   if _, ok := searchMap[clusterSearchKey]; ok {
      delete(searchMap, clusterSearchKey)
   }
}
func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
   var clusterE models.Cluster
   return clusterE.FindAll()
}
func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
   var nodeE models.Node
   return nodeE.FindNodesByClusterId(clusterId)
}
func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
   m := make(map[string]models.Node, 0)
   var clusterE models.Cluster
   var nodeE models.Node
   arr, err := clusterE.FindAll()
   if err == nil && arr != nil {
      for _, clu := range arr {
         nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
         if e == nil && nodes != nil {
            for _, n := range nodes {
               m[n.Id] = n
            }
         }
      }
   }
   return m
}
//根据集群名称和密码创建集群
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
   devTyp := GetDevType(config.Server.DeviceType)
   if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
      return false, "只有分析和存储节点才能创建集群"
   }
   clusterId := uuid.NewV4().String()
   pwd = config.ClusterSet.PwdPre + pwd
   var clusterE = models.Cluster{
      ClusterId:   clusterId,
      ClusterName: clusterName,
      Password:    pwd,
      VirtualIp:   virtualIp,
   }
   arr, err := clusterE.FindAll()
   if err == nil && (arr == nil || len(arr) == 0) {
      b := clusterE.Create()
      if b {
         serf.InitAgent(context.Background())
         //创建es集群
         if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
            //创建失败,则serf集群也要推出
            s.Leave(false)
            return false, err.Error()
         }
         //创建weedfs集群
         if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
            s.Leave(false)
            return false, err.Error()
         }
         chMsg := protomsg.DbChangeMessage{
            Id:     clusterE.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
         return true, clusterId
      }
   } else {
      if s.UpdateClusterName(clusterName, virtualIp) {
         return true, ""
      }
   }
   return false, ""
}
func (s ClusterService) SearchByPwd(pwd string) (err error) {
   _, isSearching := getFromSearchMap()
   if isSearching {
      return errors.New("other is searching,please wait")
   }
   pwd = config.ClusterSet.PwdPre + pwd
   ml, e := dbSync.CreateSearchNode(pwd)
   if e != nil {
      logger.Debug("CreateSearchNode err:", e)
      return errors.New("createSearchNode err")
   }
   set2SearchMap(ml)
   go clearSearchResult(ml)
   return nil
}
func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
   return dbSync.GetSearchNodes()
}
func (s ClusterService) StopSearching() bool {
   ml, _ := getFromSearchMap()
   if ml != nil {
      dbSync.CloseSearchNode(ml)
      deleteFromSearchMap()
      return true
   } else {
      return true
   }
}
//加入集群
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
   start := time.Now()
   if config.Server.AnalyServerId == "" {
      logger.Debug("AddCluster config serverId is nil")
      return false, errors.New("serverId配置有误")
   }
   targetIp := ""
   var joinIps []string
   for _, ipStr := range joinArg.NodeIps { //输入ip加入的情况,NodeIps里面元素只是ip
      if ip, b := util.IpCheck(ipStr); b {
         targetIp = ip
         joinIps = append(joinIps, ip+":30190")
      }
   }
   if len(joinIps) == 0 {
      logger.Debug("AddCluster JoinCluster len(joinIps)=0")
      return false, errors.New("加入的目标ip不能为空")
   }
   logger.Debug("AddCluster joinIps:", joinIps)
   joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
   conf := dbSync.DefaultConfig()
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   if localIp != "" {
      conf.BindAddr = localIp
   }
   agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
   if err == nil && agent != nil { //加入成功
      logger.Debug("AddCluster dbSync.Init success")
      serf.Agent = agent
      t := time.Now()
      syncB := syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
      if syncB {
         devTyp := GetDevType(config.Server.DeviceType)
         if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
            go func() {
               //添加ES节点
               if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
                  s.Leave(false)
                  return
               }
               //添加weedfs节点
               if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
                  s.Leave(false)
                  return
               }
            }()
         }
         //需要重新初始化本地比对进程
         go serf.ReInitDbPersonCompareData()
         chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
         logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
         return true, nil
      } else {
         logger.Debug("AddCluster syncTableDataFromCluster fail")
         if agent != nil {
            agent.Leave()
            err = agent.Shutdown()
            if err != nil {
               logger.Debug("AddCluster agent shutdown err:", err)
            }
         }
      }
   } else {
      logger.Debug("AddCluster dbSync.Init err:", err)
      if agent != nil {
         agent.Leave()
         err = agent.Shutdown()
         logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
      }
   }
   logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
   return false, errors.New("加入集群失败")
}
const (
   DevType_Analysis         = "analysis"         //只做分析
   DevType_Storage          = "storage"          //只做存储
   DevType_Analysis_Storage = "analysis_storage" //分析加存储
   DevType_Other            = "other"            //其他设备类型,eg:应用
)
//获取本机类型,只进行分析、分析存储一体、只存储、其他。。
func GetDevType(dt string) string {
   if dt != "" {
      if len(dt) >= 4 {
         s := dt[2:4]
         if s == "01" {
            return DevType_Analysis
         } else if s == "02" {
            return DevType_Storage
         } else if s == "03" {
            return DevType_Analysis_Storage
         }
      }
   }
   return DevType_Other
}
func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
   var cE models.Cluster
   arr, e := cE.FindAll()
   if e == nil && arr != nil && len(arr) > 0 {
      if cE.UpdateClusterName(clusterName, virtualIp) {
         chMsg := protomsg.DbChangeMessage{
            Id:     "",
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Update,
            Info:   virtualIp,
         }
         s.AddDbMessage(&chMsg)
         if arr[0].VirtualIp != virtualIp { //漂移ip有变化
            if serf.Agent != nil {
               b, _ := json.Marshal(&protomsg.DbChangeMessage{
                  Id:     "",
                  Table:  protomsg.TableChanged_T_Cluster,
                  Action: protomsg.DbAction_Update,
                  Info:   virtualIp,
               })
               err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false)
               if err != nil {
                  logger.Error("UserEventSyncVirtualIp err:", err)
               }
            }
         }
         return true
      }
   }
   return false
}
func (s ClusterService) Leave(isDel bool) (bool, error) {
   start := time.Now()
   devType := GetDevType(config.Server.DeviceType)
   logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
   if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
      go func() {
         //退出weedfs节点
         if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
            logger.Error("Leave ExitWeedfsServer err:", err)
            return
         }
         //退出es节点
         if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
            logger.Error("Leave ExitCluster es cluster err:", err)
            return
         }
      }()
   }
   var err error
   tx := models.GetDB().Begin()
   defer func() {
      if err != nil && tx != nil {
         tx.Rollback()
      }
   }()
   if serf.Agent != nil {
      t := time.Now()
      serf.Agent.Stop()
      serf.Agent = nil
      err = tx.Exec("delete from cluster_node").Error
      if err != nil {
         logger.Error("Leave delete from cluster_node err:", err)
         return false, err
      }
      err = tx.Exec("delete from cluster").Error
      if err != nil {
         logger.Error("Leave delete from cluster err:", err)
         return false, err
      }
      tx.Commit()
      logger.Debug("Leave delete cluster_node and cluster from db")
      chMsg := protomsg.DbChangeMessage{
         Id:     "",
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Delete,
         Info:   "",
      }
      logger.Debugf("Leave delete db time=%v", time.Since(t))
      tm := time.Now()
      s.AddDbMessage(&chMsg)
      logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))
   }
   logger.Debugf("Leave success time=%v", time.Since(start))
   return true, nil
}
func (s ClusterService) TestSyncSql() bool {
   var lc models.LocalConfig
   lc.Select()
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
   if e == nil && serverIp != "" {
      sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
      if err := models.GetDB().Exec(sql).Error; err != nil {
         return false
      }
      return true
   }
   return false
}
//加入集群后清空本地的同步库数据
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
   var lc models.LocalConfig
   e := lc.Select()
   nodeName := ""
   if e == nil && lc.ServerName != "" {
      nodeName = lc.ServerName
   }
   var err error
   db := models.GetDB()
   db.LogMode(false)
   defer db.LogMode(true)
   tx := db.Begin()
   defer func() {
      if err != nil && tx != nil {
         tx.Rollback()
      }
   }()
   //0.关闭reference
   tx.Exec("PRAGMA foreign_keys=OFF")
   //1.删除本地的同步库数据
   for _, t := range serf.SyncTables {
      delSql := ""
      if t == "dbtables" {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == "dbtablepersons" {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else {
         delSql = "delete from " + t + ""
      }
      err = tx.Exec(delSql).Error
      if err != nil {
         return false
      }
   }
   //2.拉取集群内的同步库数据到本地数据库表中
   var dumpSqls *[]string
   dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
   if dumpSqls != nil && len(*dumpSqls) > 0 {
      for _, sqlStr := range *dumpSqls {
         logger.Debug("gorm exec dumpSql:", sqlStr)
         if err = tx.Exec(sqlStr).Error; err != nil {
            return false
         }
      }
   } else {
      logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
      err = errors.New("dumpSqls is nil")
      return false
   }
   logger.Debug("成功添加当前节点到集群节点中")
   //3.将本节点加入到节点列表中
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
   if e1 != nil || serverIp == "" {
      err = errors.New("get serverIp err")
      return false
   }
   logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
   if nodeName == "" {
      nodeName = serverIp
   }
   //添加本身节点,此处修复bug,加入集群的节点退出集群后重新加入,会报id冲突
   var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
   if err = tx.Exec(sql).Error; err != nil {
      logger.Debug("add cur node err:", err)
      return false
   }
   if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
      logger.Debug("update isDelete err:", err)
      return false
   }
   //4.开启reference
   tx.Exec("PRAGMA foreign_keys=ON")
   tx.Commit()
   serf.SyncSql([]string{sql})
   return true
}
func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
   bts, _ := json.Marshal(*pbMsg)
   s.bk.Publish(ProcName, bts)
}
func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
   var lc models.Node
   return lc.FindIpByNode(nodeId)
}
package service
import (
   "context"
   "encoding/json"
   "strconv"
   sysSync "sync"
   "time"
   "vamicro/config"
   "vamicro/extend/util"
   "vamicro/system-service/models"
   "vamicro/system-service/serf"
   "vamicro/system-service/sys"
   "vamicro/system-service/vo"
   "basic.com/pubsub/protomsg.git"
   dbSync "basic.com/syncdb.git"
   "basic.com/valib/bhomeclient.git"
   "basic.com/valib/logger.git"
   "github.com/hashicorp/memberlist"
   "github.com/pkg/errors"
   uuid "github.com/satori/go.uuid"
)
type ClusterService struct {
   bk bhomeclient.Broker
}
func NewClusterService(broker bhomeclient.Broker) *ClusterService {
   return &ClusterService{
      bk: broker,
   }
}
var clusterSearchKey = "clusterSearchKey"
var searchMap = make(map[string]*memberlist.Memberlist, 0)
var lock sysSync.Mutex
func set2SearchMap(memList *memberlist.Memberlist) {
   lock.Lock()
   defer lock.Unlock()
   searchMap[clusterSearchKey] = memList
}
func getFromSearchMap() (*memberlist.Memberlist, bool) {
   lock.Lock()
   defer lock.Unlock()
   if v, ok := searchMap[clusterSearchKey]; ok {
      return v, true
   } else {
      return nil, false
   }
}
func clearSearchResult(ml *memberlist.Memberlist) {
   time.Sleep(10 * time.Second)
   lock.Lock()
   defer lock.Unlock()
   if _, ok := searchMap[clusterSearchKey]; ok {
      dbSync.CloseSearchNode(ml)
      delete(searchMap, clusterSearchKey)
   }
}
func deleteFromSearchMap() {
   lock.Lock()
   defer lock.Unlock()
   if _, ok := searchMap[clusterSearchKey]; ok {
      delete(searchMap, clusterSearchKey)
   }
}
func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
   var clusterE models.Cluster
   return clusterE.FindAll()
}
func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
   var nodeE models.Node
   return nodeE.FindNodesByClusterId(clusterId)
}
func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
   m := make(map[string]models.Node, 0)
   var clusterE models.Cluster
   var nodeE models.Node
   arr, err := clusterE.FindAll()
   if err == nil && arr != nil {
      for _, clu := range arr {
         nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
         if e == nil && nodes != nil {
            for _, n := range nodes {
               m[n.Id] = n
            }
         }
      }
   }
   return m
}
//根据集群名称和密码创建集群
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
   devTyp := GetDevType(config.Server.DeviceType)
   if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
      return false, "只有分析和存储节点才能创建集群"
   }
   clusterId := uuid.NewV4().String()
   pwd = config.ClusterSet.PwdPre + pwd
   var clusterE = models.Cluster{
      ClusterId:   clusterId,
      ClusterName: clusterName,
      Password:    pwd,
      VirtualIp:   virtualIp,
   }
   arr, err := clusterE.FindAll()
   if err == nil && (arr == nil || len(arr) == 0) {
      b := clusterE.Create()
      if b {
         serf.InitAgent(context.Background())
         //创建es集群
         if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
            //创建失败,则serf集群也要推出
            s.Leave(false)
            return false, err.Error()
         }
         //创建weedfs集群
         if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
            s.Leave(false)
            return false, err.Error()
         }
         chMsg := protomsg.DbChangeMessage{
            Id:     clusterE.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
         return true, clusterId
      }
   } else {
      if s.UpdateClusterName(clusterName, virtualIp) {
         return true, ""
      }
   }
   return false, ""
}
func (s ClusterService) SearchByPwd(pwd string) (err error) {
   _, isSearching := getFromSearchMap()
   if isSearching {
      return errors.New("other is searching,please wait")
   }
   pwd = config.ClusterSet.PwdPre + pwd
   ml, e := dbSync.CreateSearchNode(pwd)
   if e != nil {
      logger.Debug("CreateSearchNode err:", e)
      return errors.New("createSearchNode err")
   }
   set2SearchMap(ml)
   go clearSearchResult(ml)
   return nil
}
func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
   return dbSync.GetSearchNodes()
}
func (s ClusterService) StopSearching() bool {
   ml, _ := getFromSearchMap()
   if ml != nil {
      dbSync.CloseSearchNode(ml)
      deleteFromSearchMap()
      return true
   } else {
      return true
   }
}
//加入集群
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
   start := time.Now()
   if config.Server.AnalyServerId == "" {
      logger.Debug("AddCluster config serverId is nil")
      return false, errors.New("serverId配置有误")
   }
   targetIp := ""
   var joinIps []string
   for _, ipStr := range joinArg.NodeIps { //输入ip加入的情况,NodeIps里面元素只是ip
      if ip, b := util.IpCheck(ipStr); b {
         targetIp = ip
         joinIps = append(joinIps, ip+":30190")
      }
   }
   if len(joinIps) == 0 {
      logger.Debug("AddCluster JoinCluster len(joinIps)=0")
      return false, errors.New("加入的目标ip不能为空")
   }
   logger.Debug("AddCluster joinIps:", joinIps)
   joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
   conf := dbSync.DefaultConfig()
   localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
   if localIp != "" {
      conf.BindAddr = localIp
   }
   agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
   if err == nil && agent != nil { //加入成功
      logger.Debug("AddCluster dbSync.Init success")
      serf.Agent = agent
      t := time.Now()
      syncB := syncTableDataFromCluster(joinArg)
      logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
      if syncB {
         devTyp := GetDevType(config.Server.DeviceType)
         if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
            go func() {
               //添加ES节点
               if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
                  s.Leave(false)
                  return
               }
               //添加weedfs节点
               if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
                  s.Leave(false)
                  return
               }
            }()
         }
         //需要重新初始化本地比对进程
         go serf.ReInitDbPersonCompareData()
         chMsg := protomsg.DbChangeMessage{
            Id:     joinArg.ClusterId,
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Insert,
            Info:   "",
         }
         s.AddDbMessage(&chMsg)
         logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
         return true, nil
      } else {
         logger.Debug("AddCluster syncTableDataFromCluster fail")
         if agent != nil {
            agent.Leave()
            err = agent.Shutdown()
            if err != nil {
               logger.Debug("AddCluster agent shutdown err:", err)
            }
         }
      }
   } else {
      logger.Debug("AddCluster dbSync.Init err:", err)
      if agent != nil {
         agent.Leave()
         err = agent.Shutdown()
         logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
      }
   }
   logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
   return false, errors.New("加入集群失败")
}
const (
   DevType_Analysis         = "analysis"         //只做分析
   DevType_Storage          = "storage"          //只做存储
   DevType_Analysis_Storage = "analysis_storage" //分析加存储
   DevType_Other            = "other"            //其他设备类型,eg:应用
)
//获取本机类型,只进行分析、分析存储一体、只存储、其他。。
func GetDevType(dt string) string {
   if dt != "" {
      if len(dt) >= 4 {
         s := dt[2:4]
         if s == "01" {
            return DevType_Analysis
         } else if s == "02" {
            return DevType_Storage
         } else if s == "03" {
            return DevType_Analysis_Storage
         }
      }
   }
   return DevType_Other
}
func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
   var cE models.Cluster
   arr, e := cE.FindAll()
   if e == nil && arr != nil && len(arr) > 0 {
      if cE.UpdateClusterName(clusterName, virtualIp) {
         chMsg := protomsg.DbChangeMessage{
            Id:     "",
            Table:  protomsg.TableChanged_T_Cluster,
            Action: protomsg.DbAction_Update,
            Info:   virtualIp,
         }
         s.AddDbMessage(&chMsg)
         if arr[0].VirtualIp != virtualIp { //漂移ip有变化
            if serf.Agent != nil {
               b, _ := json.Marshal(&protomsg.DbChangeMessage{
                  Id:     "",
                  Table:  protomsg.TableChanged_T_Cluster,
                  Action: protomsg.DbAction_Update,
                  Info:   virtualIp,
               })
               err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false)
               if err != nil {
                  logger.Error("UserEventSyncVirtualIp err:", err)
               }
            }
         }
         return true
      }
   }
   return false
}
func (s ClusterService) Leave(isDel bool) (bool, error) {
   start := time.Now()
   devType := GetDevType(config.Server.DeviceType)
   logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
   if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
      go func() {
         //退出weedfs节点
         if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
            logger.Error("Leave ExitWeedfsServer err:", err)
            return
         }
         //退出es节点
         if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
            logger.Error("Leave ExitCluster es cluster err:", err)
            return
         }
      }()
   }
   var err error
   tx := models.GetDB().Begin()
   defer func() {
      if err != nil && tx != nil {
         tx.Rollback()
      }
   }()
   if serf.Agent != nil {
      t := time.Now()
      serf.Agent.Stop()
      serf.Agent = nil
      err = tx.Exec("delete from cluster_node").Error
      if err != nil {
         logger.Error("Leave delete from cluster_node err:", err)
         return false, err
      }
      err = tx.Exec("delete from cluster").Error
      if err != nil {
         logger.Error("Leave delete from cluster err:", err)
         return false, err
      }
      tx.Commit()
      logger.Debug("Leave delete cluster_node and cluster from db")
      chMsg := protomsg.DbChangeMessage{
         Id:     "",
         Table:  protomsg.TableChanged_T_Cluster,
         Action: protomsg.DbAction_Delete,
         Info:   "",
      }
      logger.Debugf("Leave delete db time=%v", time.Since(t))
      tm := time.Now()
      s.AddDbMessage(&chMsg)
      logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))
   }
   logger.Debugf("Leave success time=%v", time.Since(start))
   return true, nil
}
func (s ClusterService) TestSyncSql() bool {
   var lc models.LocalConfig
   lc.Select()
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
   if e == nil && serverIp != "" {
      sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
      if err := models.GetDB().Exec(sql).Error; err != nil {
         return false
      }
      return true
   }
   return false
}
//加入集群后清空本地的同步库数据
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
   var lc models.LocalConfig
   e := lc.Select()
   nodeName := ""
   if e == nil && lc.ServerName != "" {
      nodeName = lc.ServerName
   }
   var err error
   db := models.GetDB()
   db.LogMode(false)
   defer db.LogMode(true)
   tx := db.Begin()
   defer func() {
      if err != nil && tx != nil {
         tx.Rollback()
      }
   }()
   //0.关闭reference
   tx.Exec("PRAGMA foreign_keys=OFF")
   //1.删除本地的同步库数据
   for _, t := range serf.SyncTables {
      delSql := ""
      if t == "dbtables" {
         delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
      } else if t == "dbtablepersons" {
         delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
      } else {
         delSql = "delete from " + t + ""
      }
      err = tx.Exec(delSql).Error
      if err != nil {
         return false
      }
   }
   //2.拉取集群内的同步库数据到本地数据库表中
   var dumpSqls *[]string
   dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
   if dumpSqls != nil && len(*dumpSqls) > 0 {
      for _, sqlStr := range *dumpSqls {
         logger.Debug("gorm exec dumpSql:", sqlStr)
         if err = tx.Exec(sqlStr).Error; err != nil {
            return false
         }
      }
   } else {
      logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
      err = errors.New("dumpSqls is nil")
      return false
   }
   logger.Debug("成功添加当前节点到集群节点中")
   //3.将本节点加入到节点列表中
   timeUnix := time.Now().Unix()
   fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
   serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
   if e1 != nil || serverIp == "" {
      err = errors.New("get serverIp err")
      return false
   }
   logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
   if nodeName == "" {
      nodeName = serverIp
   }
   //添加本身节点,此处修复bug,加入集群的节点退出集群后重新加入,会报id冲突
   var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
   if err = tx.Exec(sql).Error; err != nil {
      logger.Debug("add cur node err:", err)
      return false
   }
   if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
      logger.Debug("update isDelete err:", err)
      return false
   }
   //4.开启reference
   tx.Exec("PRAGMA foreign_keys=ON")
   tx.Commit()
   serf.SyncSql([]string{sql})
   return true
}
func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
   bts, _ := json.Marshal(*pbMsg)
   s.bk.Publish(ProcName, bts)
}
func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
   var lc models.Node
   return lc.FindIpByNode(nodeId)
}