| | |
| | | package service |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "strconv" |
| | | sysSync "sync" |
| | | "time" |
| | | "vamicro/config" |
| | | "vamicro/extend/util" |
| | | "vamicro/system-service/models" |
| | | "vamicro/system-service/serf" |
| | | "vamicro/system-service/sys" |
| | | "vamicro/system-service/vo" |
| | | |
| | | "basic.com/pubsub/protomsg.git" |
| | | dbSync "basic.com/syncdb.git" |
| | | "basic.com/valib/bhomeclient.git" |
| | | "basic.com/valib/logger.git" |
| | | "github.com/hashicorp/memberlist" |
| | | "github.com/pkg/errors" |
| | | uuid "github.com/satori/go.uuid" |
| | | ) |
| | | |
| | | type ClusterService struct { |
| | | bk bhomeclient.Broker |
| | | } |
| | | |
| | | func NewClusterService(broker bhomeclient.Broker) *ClusterService { |
| | | return &ClusterService{ |
| | | bk: broker, |
| | | } |
| | | } |
| | | |
| | | var clusterSearchKey = "clusterSearchKey" |
| | | var searchMap = make(map[string]*memberlist.Memberlist, 0) |
| | | var lock sysSync.Mutex |
| | | |
| | | func set2SearchMap(memList *memberlist.Memberlist) { |
| | | lock.Lock() |
| | | defer lock.Unlock() |
| | | searchMap[clusterSearchKey] = memList |
| | | } |
| | | |
| | | func getFromSearchMap() (*memberlist.Memberlist, bool) { |
| | | lock.Lock() |
| | | defer lock.Unlock() |
| | | if v, ok := searchMap[clusterSearchKey]; ok { |
| | | return v, true |
| | | } else { |
| | | return nil, false |
| | | } |
| | | } |
| | | |
| | | func clearSearchResult(ml *memberlist.Memberlist) { |
| | | time.Sleep(10 * time.Second) |
| | | lock.Lock() |
| | | defer lock.Unlock() |
| | | if _, ok := searchMap[clusterSearchKey]; ok { |
| | | dbSync.CloseSearchNode(ml) |
| | | delete(searchMap, clusterSearchKey) |
| | | } |
| | | } |
| | | |
| | | func deleteFromSearchMap() { |
| | | lock.Lock() |
| | | defer lock.Unlock() |
| | | if _, ok := searchMap[clusterSearchKey]; ok { |
| | | delete(searchMap, clusterSearchKey) |
| | | } |
| | | } |
| | | |
| | | func (s ClusterService) FindAll() (arr []models.Cluster, err error) { |
| | | var clusterE models.Cluster |
| | | return clusterE.FindAll() |
| | | } |
| | | |
| | | func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) { |
| | | var nodeE models.Node |
| | | return nodeE.FindNodesByClusterId(clusterId) |
| | | } |
| | | |
| | | func (s ClusterService) FindAllClusterNodes() map[string]models.Node { |
| | | m := make(map[string]models.Node, 0) |
| | | var clusterE models.Cluster |
| | | var nodeE models.Node |
| | | arr, err := clusterE.FindAll() |
| | | if err == nil && arr != nil { |
| | | for _, clu := range arr { |
| | | nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId) |
| | | if e == nil && nodes != nil { |
| | | for _, n := range nodes { |
| | | m[n.Id] = n |
| | | } |
| | | } |
| | | } |
| | | } |
| | | return m |
| | | } |
| | | |
| | | //根据集群名称和密码创建集群 |
| | | func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { |
| | | return false, "只有分析和存储节点才能创建集群" |
| | | } |
| | | clusterId := uuid.NewV4().String() |
| | | pwd = config.ClusterSet.PwdPre + pwd |
| | | var clusterE = models.Cluster{ |
| | | ClusterId: clusterId, |
| | | ClusterName: clusterName, |
| | | Password: pwd, |
| | | VirtualIp: virtualIp, |
| | | } |
| | | |
| | | arr, err := clusterE.FindAll() |
| | | if err == nil && (arr == nil || len(arr) == 0) { |
| | | |
| | | b := clusterE.Create() |
| | | if b { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | //创建es集群 |
| | | if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { |
| | | //创建失败,则serf集群也要推出 |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | //创建weedfs集群 |
| | | if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | return true, clusterId |
| | | } |
| | | } else { |
| | | if s.UpdateClusterName(clusterName, virtualIp) { |
| | | return true, "" |
| | | } |
| | | } |
| | | |
| | | return false, "" |
| | | } |
| | | |
| | | func (s ClusterService) SearchByPwd(pwd string) (err error) { |
| | | _, isSearching := getFromSearchMap() |
| | | if isSearching { |
| | | return errors.New("other is searching,please wait") |
| | | } |
| | | pwd = config.ClusterSet.PwdPre + pwd |
| | | ml, e := dbSync.CreateSearchNode(pwd) |
| | | if e != nil { |
| | | logger.Debug("CreateSearchNode err:", e) |
| | | return errors.New("createSearchNode err") |
| | | } |
| | | |
| | | set2SearchMap(ml) |
| | | |
| | | go clearSearchResult(ml) |
| | | |
| | | return nil |
| | | } |
| | | |
| | | func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo { |
| | | return dbSync.GetSearchNodes() |
| | | } |
| | | |
| | | func (s ClusterService) StopSearching() bool { |
| | | ml, _ := getFromSearchMap() |
| | | if ml != nil { |
| | | dbSync.CloseSearchNode(ml) |
| | | deleteFromSearchMap() |
| | | return true |
| | | } else { |
| | | return true |
| | | } |
| | | } |
| | | |
| | | //加入集群 |
| | | func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { |
| | | start := time.Now() |
| | | if config.Server.AnalyServerId == "" { |
| | | logger.Debug("AddCluster config serverId is nil") |
| | | return false, errors.New("serverId配置有误") |
| | | } |
| | | targetIp := "" |
| | | var joinIps []string |
| | | for _, ipStr := range joinArg.NodeIps { //输入ip加入的情况,NodeIps里面元素只是ip |
| | | if ip, b := util.IpCheck(ipStr); b { |
| | | targetIp = ip |
| | | joinIps = append(joinIps, ip+":30190") |
| | | } |
| | | } |
| | | if len(joinIps) == 0 { |
| | | logger.Debug("AddCluster JoinCluster len(joinIps)=0") |
| | | return false, errors.New("加入的目标ip不能为空") |
| | | } |
| | | logger.Debug("AddCluster joinIps:", joinIps) |
| | | joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password |
| | | |
| | | conf := dbSync.DefaultConfig() |
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) |
| | | |
| | | if localIp != "" { |
| | | conf.BindAddr = localIp |
| | | } |
| | | |
| | | agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) |
| | | if err == nil && agent != nil { //加入成功 |
| | | logger.Debug("AddCluster dbSync.Init success") |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncB := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) |
| | | if syncB { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES |
| | | go func() { |
| | | //添加ES节点 |
| | | if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | |
| | | //添加weedfs节点 |
| | | if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | //需要重新初始化本地比对进程 |
| | | go serf.ReInitDbPersonCompareData() |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | } |
| | | } |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err) |
| | | |
| | | } |
| | | } |
| | | logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start)) |
| | | return false, errors.New("加入集群失败") |
| | | } |
| | | |
// Device roles returned by GetDevType.
const (
	// DevType_Analysis is an analysis-only device.
	DevType_Analysis = "analysis"
	// DevType_Storage is a storage-only device.
	DevType_Storage = "storage"
	// DevType_Analysis_Storage is a device doing both analysis and storage.
	DevType_Analysis_Storage = "analysis_storage"
	// DevType_Other is any other device type, e.g. an application device.
	DevType_Other = "other"
)

// GetDevType maps a device-type code to one of the DevType_* roles. The role
// is encoded in bytes 2-3 of dt: "01" is analysis only, "02" is storage only,
// "03" is analysis plus storage. Codes shorter than four bytes and any other
// value yield DevType_Other.
func GetDevType(dt string) string {
	if len(dt) < 4 {
		return DevType_Other
	}
	switch dt[2:4] {
	case "01":
		return DevType_Analysis
	case "02":
		return DevType_Storage
	case "03":
		return DevType_Analysis_Storage
	}
	return DevType_Other
}
| | | |
| | | func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool { |
| | | var cE models.Cluster |
| | | arr, e := cE.FindAll() |
| | | if e == nil && arr != nil && len(arr) > 0 { |
| | | if cE.UpdateClusterName(clusterName, virtualIp) { |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Update, |
| | | Info: virtualIp, |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | if arr[0].VirtualIp != virtualIp { //漂移ip有变化 |
| | | if serf.Agent != nil { |
| | | b, _ := json.Marshal(&protomsg.DbChangeMessage{ |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Update, |
| | | Info: virtualIp, |
| | | }) |
| | | err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncVirtualIp err:", err) |
| | | } |
| | | } |
| | | } |
| | | |
| | | return true |
| | | } |
| | | } |
| | | |
| | | return false |
| | | } |
| | | |
| | | func (s ClusterService) Leave(isDel bool) (bool, error) { |
| | | start := time.Now() |
| | | devType := GetDevType(config.Server.DeviceType) |
| | | logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) |
| | | |
| | | if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群 |
| | | go func() { |
| | | //退出weedfs节点 |
| | | if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { |
| | | logger.Error("Leave ExitWeedfsServer err:", err) |
| | | return |
| | | } |
| | | //退出es节点 |
| | | if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { |
| | | logger.Error("Leave ExitCluster es cluster err:", err) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | var err error |
| | | tx := models.GetDB().Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | |
| | | if serf.Agent != nil { |
| | | t := time.Now() |
| | | serf.Agent.Stop() |
| | | serf.Agent = nil |
| | | err = tx.Exec("delete from cluster_node").Error |
| | | if err != nil { |
| | | logger.Error("Leave delete from cluster_node err:", err) |
| | | return false, err |
| | | } |
| | | err = tx.Exec("delete from cluster").Error |
| | | if err != nil { |
| | | logger.Error("Leave delete from cluster err:", err) |
| | | return false, err |
| | | } |
| | | tx.Commit() |
| | | logger.Debug("Leave delete cluster_node and cluster from db") |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Delete, |
| | | Info: "", |
| | | } |
| | | logger.Debugf("Leave delete db time=%v", time.Since(t)) |
| | | tm := time.Now() |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm)) |
| | | } |
| | | |
| | | logger.Debugf("Leave success time=%v", time.Since(start)) |
| | | return true, nil |
| | | } |
| | | |
| | | func (s ClusterService) TestSyncSql() bool { |
| | | var lc models.LocalConfig |
| | | lc.Select() |
| | | |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e == nil && serverIp != "" { |
| | | sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')" |
| | | if err := models.GetDB().Exec(sql).Error; err != nil { |
| | | return false |
| | | } |
| | | return true |
| | | } |
| | | return false |
| | | } |
| | | |
| | | //加入集群后清空本地的同步库数据 |
| | | func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { |
| | | var lc models.LocalConfig |
| | | e := lc.Select() |
| | | nodeName := "" |
| | | if e == nil && lc.ServerName != "" { |
| | | nodeName = lc.ServerName |
| | | } |
| | | |
| | | var err error |
| | | db := models.GetDB() |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | //0.关闭reference |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | | for _, t := range serf.SyncTables { |
| | | delSql := "" |
| | | if t == "dbtables" { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == "dbtablepersons" { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else { |
| | | delSql = "delete from " + t + "" |
| | | } |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | return false |
| | | } |
| | | } |
| | | //2.拉取集群内的同步库数据到本地数据库表中 |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |
| | | if dumpSqls != nil && len(*dumpSqls) > 0 { |
| | | for _, sqlStr := range *dumpSqls { |
| | | logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | if err = tx.Exec(sqlStr).Error; err != nil { |
| | | return false |
| | | } |
| | | } |
| | | } else { |
| | | logger.Debug("get cluster db data err, dumpSqls is nil,err:", err) |
| | | err = errors.New("dumpSqls is nil") |
| | | return false |
| | | } |
| | | |
| | | logger.Debug("成功添加当前节点到集群节点中") |
| | | |
| | | //3.将本节点加入到节点列表中 |
| | | timeUnix := time.Now().Unix() |
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") |
| | | serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) |
| | | if e1 != nil || serverIp == "" { |
| | | err = errors.New("get serverIp err") |
| | | return false |
| | | } |
| | | logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) |
| | | if nodeName == "" { |
| | | nodeName = serverIp |
| | | } |
| | | |
| | | //添加本身节点,此处修复bug,加入集群的节点退出集群后重新加入,会报id冲突 |
| | | var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')" |
| | | if err = tx.Exec(sql).Error; err != nil { |
| | | logger.Debug("add cur node err:", err) |
| | | return false |
| | | } |
| | | if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { |
| | | logger.Debug("update isDelete err:", err) |
| | | return false |
| | | } |
| | | |
| | | //4.开启reference |
| | | tx.Exec("PRAGMA foreign_keys=ON") |
| | | tx.Commit() |
| | | serf.SyncSql([]string{sql}) |
| | | return true |
| | | } |
| | | |
| | | func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) { |
| | | bts, _ := json.Marshal(*pbMsg) |
| | | s.bk.Publish(ProcName, bts) |
| | | } |
| | | |
| | | func (s ClusterService) FindIpByNode(nodeId string) (string, error) { |
| | | var lc models.Node |
| | | return lc.FindIpByNode(nodeId) |
| | | } |
| | | package service
|
| | |
|
| | | import (
|
| | | "context"
|
| | | "encoding/json"
|
| | | "strconv"
|
| | | sysSync "sync"
|
| | | "time"
|
| | | "vamicro/config"
|
| | | "vamicro/extend/util"
|
| | | "vamicro/system-service/models"
|
| | | "vamicro/system-service/serf"
|
| | | "vamicro/system-service/sys"
|
| | | "vamicro/system-service/vo"
|
| | |
|
| | | "basic.com/pubsub/protomsg.git"
|
| | | dbSync "basic.com/syncdb.git"
|
| | | "basic.com/valib/bhomeclient.git"
|
| | | "basic.com/valib/logger.git"
|
| | | "github.com/hashicorp/memberlist"
|
| | | "github.com/pkg/errors"
|
| | | uuid "github.com/satori/go.uuid"
|
| | | )
|
| | |
|
| | | type ClusterService struct {
|
| | | bk bhomeclient.Broker
|
| | | }
|
| | |
|
| | | func NewClusterService(broker bhomeclient.Broker) *ClusterService {
|
| | | return &ClusterService{
|
| | | bk: broker,
|
| | | }
|
| | | }
|
| | |
|
| | | var clusterSearchKey = "clusterSearchKey"
|
| | | var searchMap = make(map[string]*memberlist.Memberlist, 0)
|
| | | var lock sysSync.Mutex
|
| | |
|
| | | func set2SearchMap(memList *memberlist.Memberlist) {
|
| | | lock.Lock()
|
| | | defer lock.Unlock()
|
| | | searchMap[clusterSearchKey] = memList
|
| | | }
|
| | |
|
| | | func getFromSearchMap() (*memberlist.Memberlist, bool) {
|
| | | lock.Lock()
|
| | | defer lock.Unlock()
|
| | | if v, ok := searchMap[clusterSearchKey]; ok {
|
| | | return v, true
|
| | | } else {
|
| | | return nil, false
|
| | | }
|
| | | }
|
| | |
|
| | | func clearSearchResult(ml *memberlist.Memberlist) {
|
| | | time.Sleep(10 * time.Second)
|
| | | lock.Lock()
|
| | | defer lock.Unlock()
|
| | | if _, ok := searchMap[clusterSearchKey]; ok {
|
| | | dbSync.CloseSearchNode(ml)
|
| | | delete(searchMap, clusterSearchKey)
|
| | | }
|
| | | }
|
| | |
|
| | | func deleteFromSearchMap() {
|
| | | lock.Lock()
|
| | | defer lock.Unlock()
|
| | | if _, ok := searchMap[clusterSearchKey]; ok {
|
| | | delete(searchMap, clusterSearchKey)
|
| | | }
|
| | | }
|
| | |
|
| | | func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
|
| | | var clusterE models.Cluster
|
| | | return clusterE.FindAll()
|
| | | }
|
| | |
|
| | | func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
|
| | | var nodeE models.Node
|
| | | return nodeE.FindNodesByClusterId(clusterId)
|
| | | }
|
| | |
|
| | | func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
|
| | | m := make(map[string]models.Node, 0)
|
| | | var clusterE models.Cluster
|
| | | var nodeE models.Node
|
| | | arr, err := clusterE.FindAll()
|
| | | if err == nil && arr != nil {
|
| | | for _, clu := range arr {
|
| | | nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
|
| | | if e == nil && nodes != nil {
|
| | | for _, n := range nodes {
|
| | | m[n.Id] = n
|
| | | }
|
| | | }
|
| | | }
|
| | | }
|
| | | return m
|
| | | }
|
| | |
|
| | | //根据集群名称和密码创建集群
|
| | | func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
|
| | | devTyp := GetDevType(config.Server.DeviceType)
|
| | | if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
|
| | | return false, "只有分析和存储节点才能创建集群"
|
| | | }
|
| | | clusterId := uuid.NewV4().String()
|
| | | pwd = config.ClusterSet.PwdPre + pwd
|
| | | var clusterE = models.Cluster{
|
| | | ClusterId: clusterId,
|
| | | ClusterName: clusterName,
|
| | | Password: pwd,
|
| | | VirtualIp: virtualIp,
|
| | | }
|
| | |
|
| | | arr, err := clusterE.FindAll()
|
| | | if err == nil && (arr == nil || len(arr) == 0) {
|
| | |
|
| | | b := clusterE.Create()
|
| | | if b {
|
| | | serf.InitAgent(context.Background())
|
| | |
|
| | | //创建es集群
|
| | | if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
|
| | | //创建失败,则serf集群也要推出
|
| | | s.Leave(false)
|
| | | return false, err.Error()
|
| | | }
|
| | |
|
| | | //创建weedfs集群
|
| | | if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
|
| | | s.Leave(false)
|
| | | return false, err.Error()
|
| | | }
|
| | |
|
| | | chMsg := protomsg.DbChangeMessage{
|
| | | Id: clusterE.ClusterId,
|
| | | Table: protomsg.TableChanged_T_Cluster,
|
| | | Action: protomsg.DbAction_Insert,
|
| | | Info: "",
|
| | | }
|
| | | s.AddDbMessage(&chMsg)
|
| | |
|
| | | return true, clusterId
|
| | | }
|
| | | } else {
|
| | | if s.UpdateClusterName(clusterName, virtualIp) {
|
| | | return true, ""
|
| | | }
|
| | | }
|
| | |
|
| | | return false, ""
|
| | | }
|
| | |
|
| | | func (s ClusterService) SearchByPwd(pwd string) (err error) {
|
| | | _, isSearching := getFromSearchMap()
|
| | | if isSearching {
|
| | | return errors.New("other is searching,please wait")
|
| | | }
|
| | | pwd = config.ClusterSet.PwdPre + pwd
|
| | | ml, e := dbSync.CreateSearchNode(pwd)
|
| | | if e != nil {
|
| | | logger.Debug("CreateSearchNode err:", e)
|
| | | return errors.New("createSearchNode err")
|
| | | }
|
| | |
|
| | | set2SearchMap(ml)
|
| | |
|
| | | go clearSearchResult(ml)
|
| | |
|
| | | return nil
|
| | | }
|
| | |
|
| | | func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
|
| | | return dbSync.GetSearchNodes()
|
| | | }
|
| | |
|
| | | func (s ClusterService) StopSearching() bool {
|
| | | ml, _ := getFromSearchMap()
|
| | | if ml != nil {
|
| | | dbSync.CloseSearchNode(ml)
|
| | | deleteFromSearchMap()
|
| | | return true
|
| | | } else {
|
| | | return true
|
| | | }
|
| | | }
|
| | |
|
| | | //加入集群
|
| | | func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
|
| | | start := time.Now()
|
| | | if config.Server.AnalyServerId == "" {
|
| | | logger.Debug("AddCluster config serverId is nil")
|
| | | return false, errors.New("serverId配置有误")
|
| | | }
|
| | | targetIp := ""
|
| | | var joinIps []string
|
| | | for _, ipStr := range joinArg.NodeIps { //输入ip加入的情况,NodeIps里面元素只是ip
|
| | | if ip, b := util.IpCheck(ipStr); b {
|
| | | targetIp = ip
|
| | | joinIps = append(joinIps, ip+":30190")
|
| | | }
|
| | | }
|
| | | if len(joinIps) == 0 {
|
| | | logger.Debug("AddCluster JoinCluster len(joinIps)=0")
|
| | | return false, errors.New("加入的目标ip不能为空")
|
| | | }
|
| | | logger.Debug("AddCluster joinIps:", joinIps)
|
| | | joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
|
| | |
|
| | | conf := dbSync.DefaultConfig()
|
| | | localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
|
| | |
|
| | | if localIp != "" {
|
| | | conf.BindAddr = localIp
|
| | | }
|
| | |
|
| | | agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
|
| | | if err == nil && agent != nil { //加入成功
|
| | | logger.Debug("AddCluster dbSync.Init success")
|
| | | serf.Agent = agent
|
| | |
|
| | | t := time.Now()
|
| | | syncB := syncTableDataFromCluster(joinArg)
|
| | | logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
|
| | | if syncB {
|
| | | devTyp := GetDevType(config.Server.DeviceType)
|
| | | if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
|
| | | go func() {
|
| | | //添加ES节点
|
| | | if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
|
| | | s.Leave(false)
|
| | | return
|
| | | }
|
| | |
|
| | | //添加weedfs节点
|
| | | if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
|
| | | s.Leave(false)
|
| | | return
|
| | | }
|
| | | }()
|
| | | }
|
| | |
|
| | | //需要重新初始化本地比对进程
|
| | | go serf.ReInitDbPersonCompareData()
|
| | | chMsg := protomsg.DbChangeMessage{
|
| | | Id: joinArg.ClusterId,
|
| | | Table: protomsg.TableChanged_T_Cluster,
|
| | | Action: protomsg.DbAction_Insert,
|
| | | Info: "",
|
| | | }
|
| | | s.AddDbMessage(&chMsg)
|
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
|
| | | return true, nil
|
| | | } else {
|
| | | logger.Debug("AddCluster syncTableDataFromCluster fail")
|
| | | if agent != nil {
|
| | | agent.Leave()
|
| | | err = agent.Shutdown()
|
| | | if err != nil {
|
| | | logger.Debug("AddCluster agent shutdown err:", err)
|
| | | }
|
| | | }
|
| | | }
|
| | | } else {
|
| | | logger.Debug("AddCluster dbSync.Init err:", err)
|
| | | if agent != nil {
|
| | | agent.Leave()
|
| | | err = agent.Shutdown()
|
| | | logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
|
| | |
|
| | | }
|
| | | }
|
| | | logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
|
| | | return false, errors.New("加入集群失败")
|
| | | }
|
| | |
|
// Device roles returned by GetDevType.
const (
	// DevType_Analysis is an analysis-only device.
	DevType_Analysis = "analysis"
	// DevType_Storage is a storage-only device.
	DevType_Storage = "storage"
	// DevType_Analysis_Storage is a device doing both analysis and storage.
	DevType_Analysis_Storage = "analysis_storage"
	// DevType_Other is any other device type, e.g. an application device.
	DevType_Other = "other"
)

// GetDevType maps a device-type code to one of the DevType_* roles. The role
// is encoded in bytes 2-3 of dt: "01" is analysis only, "02" is storage only,
// "03" is analysis plus storage. Codes shorter than four bytes and any other
// value yield DevType_Other.
func GetDevType(dt string) string {
	if len(dt) < 4 {
		return DevType_Other
	}
	switch dt[2:4] {
	case "01":
		return DevType_Analysis
	case "02":
		return DevType_Storage
	case "03":
		return DevType_Analysis_Storage
	}
	return DevType_Other
}
|
| | |
|
| | | func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
|
| | | var cE models.Cluster
|
| | | arr, e := cE.FindAll()
|
| | | if e == nil && arr != nil && len(arr) > 0 {
|
| | | if cE.UpdateClusterName(clusterName, virtualIp) {
|
| | | chMsg := protomsg.DbChangeMessage{
|
| | | Id: "",
|
| | | Table: protomsg.TableChanged_T_Cluster,
|
| | | Action: protomsg.DbAction_Update,
|
| | | Info: virtualIp,
|
| | | }
|
| | | s.AddDbMessage(&chMsg)
|
| | |
|
| | | if arr[0].VirtualIp != virtualIp { //漂移ip有变化
|
| | | if serf.Agent != nil {
|
| | | b, _ := json.Marshal(&protomsg.DbChangeMessage{
|
| | | Id: "",
|
| | | Table: protomsg.TableChanged_T_Cluster,
|
| | | Action: protomsg.DbAction_Update,
|
| | | Info: virtualIp,
|
| | | })
|
| | | err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false)
|
| | | if err != nil {
|
| | | logger.Error("UserEventSyncVirtualIp err:", err)
|
| | | }
|
| | | }
|
| | | }
|
| | |
|
| | | return true
|
| | | }
|
| | | }
|
| | |
|
| | | return false
|
| | | }
|
| | |
|
| | | func (s ClusterService) Leave(isDel bool) (bool, error) {
|
| | | start := time.Now()
|
| | | devType := GetDevType(config.Server.DeviceType)
|
| | | logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
|
| | |
|
| | | if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
|
| | | go func() {
|
| | | //退出weedfs节点
|
| | | if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
|
| | | logger.Error("Leave ExitWeedfsServer err:", err)
|
| | | return
|
| | | }
|
| | | //退出es节点
|
| | | if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
|
| | | logger.Error("Leave ExitCluster es cluster err:", err)
|
| | | return
|
| | | }
|
| | | }()
|
| | | }
|
| | |
|
| | | var err error
|
| | | tx := models.GetDB().Begin()
|
| | | defer func() {
|
| | | if err != nil && tx != nil {
|
| | | tx.Rollback()
|
| | | }
|
| | | }()
|
| | |
|
| | | if serf.Agent != nil {
|
| | | t := time.Now()
|
| | | serf.Agent.Stop()
|
| | | serf.Agent = nil
|
| | | err = tx.Exec("delete from cluster_node").Error
|
| | | if err != nil {
|
| | | logger.Error("Leave delete from cluster_node err:", err)
|
| | | return false, err
|
| | | }
|
| | | err = tx.Exec("delete from cluster").Error
|
| | | if err != nil {
|
| | | logger.Error("Leave delete from cluster err:", err)
|
| | | return false, err
|
| | | }
|
| | | tx.Commit()
|
| | | logger.Debug("Leave delete cluster_node and cluster from db")
|
| | | chMsg := protomsg.DbChangeMessage{
|
| | | Id: "",
|
| | | Table: protomsg.TableChanged_T_Cluster,
|
| | | Action: protomsg.DbAction_Delete,
|
| | | Info: "",
|
| | | }
|
| | | logger.Debugf("Leave delete db time=%v", time.Since(t))
|
| | | tm := time.Now()
|
| | | s.AddDbMessage(&chMsg)
|
| | | logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))
|
| | | }
|
| | |
|
| | | logger.Debugf("Leave success time=%v", time.Since(start))
|
| | | return true, nil
|
| | | }
|
| | |
|
| | | func (s ClusterService) TestSyncSql() bool {
|
| | | var lc models.LocalConfig
|
| | | lc.Select()
|
| | |
|
| | | timeUnix := time.Now().Unix()
|
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
| | | serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
|
| | | if e == nil && serverIp != "" {
|
| | | sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
|
| | | if err := models.GetDB().Exec(sql).Error; err != nil {
|
| | | return false
|
| | | }
|
| | | return true
|
| | | }
|
| | | return false
|
| | | }
|
| | |
|
| | | //加入集群后清空本地的同步库数据
|
| | | func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
|
| | | var lc models.LocalConfig
|
| | | e := lc.Select()
|
| | | nodeName := ""
|
| | | if e == nil && lc.ServerName != "" {
|
| | | nodeName = lc.ServerName
|
| | | }
|
| | |
|
| | | var err error
|
| | | db := models.GetDB()
|
| | | db.LogMode(false)
|
| | | defer db.LogMode(true)
|
| | | tx := db.Begin()
|
| | | defer func() {
|
| | | if err != nil && tx != nil {
|
| | | tx.Rollback()
|
| | | }
|
| | | }()
|
| | | //0.关闭reference
|
| | | tx.Exec("PRAGMA foreign_keys=OFF")
|
| | | //1.删除本地的同步库数据
|
| | | for _, t := range serf.SyncTables {
|
| | | delSql := ""
|
| | | if t == "dbtables" {
|
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
|
| | | } else if t == "dbtablepersons" {
|
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
|
| | | } else {
|
| | | delSql = "delete from " + t + ""
|
| | | }
|
| | | err = tx.Exec(delSql).Error
|
| | | if err != nil {
|
| | | return false
|
| | | }
|
| | | }
|
| | | //2.拉取集群内的同步库数据到本地数据库表中
|
| | | var dumpSqls *[]string
|
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
|
| | | if dumpSqls != nil && len(*dumpSqls) > 0 {
|
| | | for _, sqlStr := range *dumpSqls {
|
| | | logger.Debug("gorm exec dumpSql:", sqlStr)
|
| | | if err = tx.Exec(sqlStr).Error; err != nil {
|
| | | return false
|
| | | }
|
| | | }
|
| | | } else {
|
| | | logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
|
| | | err = errors.New("dumpSqls is nil")
|
| | | return false
|
| | | }
|
| | |
|
| | | logger.Debug("成功添加当前节点到集群节点中")
|
| | |
|
| | | //3.将本节点加入到节点列表中
|
| | | timeUnix := time.Now().Unix()
|
| | | fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
| | | serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
|
| | | if e1 != nil || serverIp == "" {
|
| | | err = errors.New("get serverIp err")
|
| | | return false
|
| | | }
|
| | | logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
|
| | | if nodeName == "" {
|
| | | nodeName = serverIp
|
| | | }
|
| | |
|
| | | //添加本身节点,此处修复bug,加入集群的节点退出集群后重新加入,会报id冲突
|
| | | var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
|
| | | if err = tx.Exec(sql).Error; err != nil {
|
| | | logger.Debug("add cur node err:", err)
|
| | | return false
|
| | | }
|
| | | if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
|
| | | logger.Debug("update isDelete err:", err)
|
| | | return false
|
| | | }
|
| | |
|
| | | //4.开启reference
|
| | | tx.Exec("PRAGMA foreign_keys=ON")
|
| | | tx.Commit()
|
| | | serf.SyncSql([]string{sql})
|
| | | return true
|
| | | }
|
| | |
|
| | | func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
|
| | | bts, _ := json.Marshal(*pbMsg)
|
| | | s.bk.Publish(ProcName, bts)
|
| | | }
|
| | |
|
| | | func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
|
| | | var lc models.Node
|
| | | return lc.FindIpByNode(nodeId)
|
| | | }
|