package service import ( "context" "encoding/json" "strconv" sysSync "sync" "time" "vamicro/config" "vamicro/extend/util" "vamicro/system-service/models" "vamicro/system-service/serf" "vamicro/system-service/sys" "vamicro/system-service/vo" "basic.com/pubsub/protomsg.git" dbSync "basic.com/syncdb.git" "basic.com/valib/bhomeclient.git" "basic.com/valib/logger.git" "github.com/hashicorp/memberlist" "github.com/pkg/errors" uuid "github.com/satori/go.uuid" ) type ClusterService struct { bk bhomeclient.Broker } func NewClusterService(broker bhomeclient.Broker) *ClusterService { return &ClusterService{ bk: broker, } } var clusterSearchKey = "clusterSearchKey" var searchMap = make(map[string]*memberlist.Memberlist, 0) var lock sysSync.Mutex func set2SearchMap(memList *memberlist.Memberlist) { lock.Lock() defer lock.Unlock() searchMap[clusterSearchKey] = memList } func getFromSearchMap() (*memberlist.Memberlist, bool) { lock.Lock() defer lock.Unlock() if v, ok := searchMap[clusterSearchKey]; ok { return v, true } else { return nil, false } } func clearSearchResult(ml *memberlist.Memberlist) { time.Sleep(10 * time.Second) lock.Lock() defer lock.Unlock() if _, ok := searchMap[clusterSearchKey]; ok { dbSync.CloseSearchNode(ml) delete(searchMap, clusterSearchKey) } } func deleteFromSearchMap() { lock.Lock() defer lock.Unlock() if _, ok := searchMap[clusterSearchKey]; ok { delete(searchMap, clusterSearchKey) } } func (s ClusterService) FindAll() (arr []models.Cluster, err error) { var clusterE models.Cluster return clusterE.FindAll() } func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) { var nodeE models.Node return nodeE.FindNodesByClusterId(clusterId) } func (s ClusterService) FindAllClusterNodes() map[string]models.Node { m := make(map[string]models.Node, 0) var clusterE models.Cluster var nodeE models.Node arr, err := clusterE.FindAll() if err == nil && arr != nil { for _, clu := range arr { nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId) if e == nil && nodes != nil { for _, n := range nodes { m[n.Id] = n } } } } return m } //根据集群名称和密码创建集群 func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { devTyp := GetDevType(config.Server.DeviceType) if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { return false, "只有分析和存储节点才能创建集群" } clusterId := uuid.NewV4().String() pwd = config.ClusterSet.PwdPre + pwd var clusterE = models.Cluster{ ClusterId: clusterId, ClusterName: clusterName, Password: pwd, VirtualIp: virtualIp, } arr, err := clusterE.FindAll() if err == nil && (arr == nil || len(arr) == 0) { b := clusterE.Create() if b { serf.InitAgent(context.Background()) //创建es集群 if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { //创建失败,则serf集群也要推出 s.Leave(false) return false, err.Error() } //创建weedfs集群 if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { s.Leave(false) return false, err.Error() } chMsg := protomsg.DbChangeMessage{ Id: clusterE.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, Info: "", } s.AddDbMessage(&chMsg) return true, clusterId } } else { if s.UpdateClusterName(clusterName, virtualIp) { return true, "" } } return false, "" } func (s ClusterService) SearchByPwd(pwd string) (err error) { _, isSearching := getFromSearchMap() if isSearching { return errors.New("other is searching,please wait") } pwd = config.ClusterSet.PwdPre + pwd ml, e := dbSync.CreateSearchNode(pwd) if e != nil { logger.Debug("CreateSearchNode err:", e) return errors.New("createSearchNode err") } set2SearchMap(ml) go clearSearchResult(ml) return nil } func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo { return dbSync.GetSearchNodes() } func (s ClusterService) StopSearching() bool { ml, _ := getFromSearchMap() if ml != nil { dbSync.CloseSearchNode(ml) deleteFromSearchMap() return true } else { return true } } //加入集群 func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { start := time.Now() if config.Server.AnalyServerId == "" { logger.Debug("AddCluster config serverId is nil") return false, errors.New("serverId配置有误") } targetIp := "" var joinIps []string for _, ipStr := range joinArg.NodeIps { //输入ip加入的情况,NodeIps里面元素只是ip if ip, b := util.IpCheck(ipStr); b { targetIp = ip joinIps = append(joinIps, ip+":30190") } } if len(joinIps) == 0 { logger.Debug("AddCluster JoinCluster len(joinIps)=0") return false, errors.New("加入的目标ip不能为空") } logger.Debug("AddCluster joinIps:", joinIps) joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password conf := dbSync.DefaultConfig() localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) if localIp != "" { conf.BindAddr = localIp } agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) if err == nil && agent != nil { //加入成功 logger.Debug("AddCluster dbSync.Init success") serf.Agent = agent t := time.Now() syncB := syncTableDataFromCluster(joinArg) logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) if syncB { devTyp := GetDevType(config.Server.DeviceType) if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES go func() { //添加ES节点 if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { s.Leave(false) return } //添加weedfs节点 if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { s.Leave(false) return } }() } //需要重新初始化本地比对进程 go serf.ReInitDbPersonCompareData() chMsg := protomsg.DbChangeMessage{ Id: joinArg.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, Info: "", } s.AddDbMessage(&chMsg) logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) return true, nil } else { logger.Debug("AddCluster syncTableDataFromCluster fail") if agent != nil { agent.Leave() err = agent.Shutdown() if err != nil { logger.Debug("AddCluster agent shutdown err:", err) } } } } else { logger.Debug("AddCluster dbSync.Init err:", err) if agent != nil { agent.Leave() err = agent.Shutdown() logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err) } } logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start)) return false, errors.New("加入集群失败") } const ( DevType_Analysis = "analysis" //只做分析 DevType_Storage = "storage" //只做存储 DevType_Analysis_Storage = "analysis_storage" //分析加存储 DevType_Other = "other" //其他设备类型,eg:应用 ) //获取本机类型,只进行分析、分析存储一体、只存储、其他。。 func GetDevType(dt string) string { if dt != "" { if len(dt) >= 4 { s := dt[2:4] if s == "01" { return DevType_Analysis } else if s == "02" { return DevType_Storage } else if s == "03" { return DevType_Analysis_Storage } } } return DevType_Other } func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool { var cE models.Cluster arr, e := cE.FindAll() if e == nil && arr != nil && len(arr) > 0 { if cE.UpdateClusterName(clusterName, virtualIp) { chMsg := protomsg.DbChangeMessage{ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Update, Info: virtualIp, } s.AddDbMessage(&chMsg) if arr[0].VirtualIp != virtualIp { //漂移ip有变化 if serf.Agent != nil { b, _ := json.Marshal(&protomsg.DbChangeMessage{ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Update, Info: virtualIp, }) err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false) if err != nil { logger.Error("UserEventSyncVirtualIp err:", err) } } } return true } } return false } func (s ClusterService) Leave(isDel bool) (bool, error) { start := time.Now() devType := GetDevType(config.Server.DeviceType) logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群 go func() { //退出weedfs节点 if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { logger.Error("Leave ExitWeedfsServer err:", err) return } //退出es节点 if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { logger.Error("Leave ExitCluster es cluster err:", err) return } }() } var err error tx := models.GetDB().Begin() defer func() { if err != nil && tx != nil { tx.Rollback() } }() if serf.Agent != nil { t := time.Now() serf.Agent.Stop() serf.Agent = nil err = tx.Exec("delete from cluster_node").Error if err != nil { logger.Error("Leave delete from cluster_node err:", err) return false, err } err = tx.Exec("delete from cluster").Error if err != nil { logger.Error("Leave delete from cluster err:", err) return false, err } tx.Commit() logger.Debug("Leave delete cluster_node and cluster from db") chMsg := protomsg.DbChangeMessage{ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Delete, Info: "", } logger.Debugf("Leave delete db time=%v", time.Since(t)) tm := time.Now() s.AddDbMessage(&chMsg) logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm)) } logger.Debugf("Leave success time=%v", time.Since(start)) return true, nil } func (s ClusterService) TestSyncSql() bool { var lc models.LocalConfig lc.Select() timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) if e == nil && serverIp != "" { sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')" if err := models.GetDB().Exec(sql).Error; err != nil { return false } return true } return false } //加入集群后清空本地的同步库数据 func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { var lc models.LocalConfig e := lc.Select() nodeName := "" if e == nil && lc.ServerName != "" { nodeName = lc.ServerName } var err error db := models.GetDB() db.LogMode(false) defer db.LogMode(true) tx := db.Begin() defer func() { if err != nil && tx != nil { tx.Rollback() } }() //0.关闭reference tx.Exec("PRAGMA foreign_keys=OFF") //1.删除本地的同步库数据 for _, t := range serf.SyncTables { delSql := "" if t == "dbtables" { delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" } else if t == "dbtablepersons" { delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" } else { delSql = "delete from " + t + "" } err = tx.Exec(delSql).Error if err != nil { return false } } //2.拉取集群内的同步库数据到本地数据库表中 var dumpSqls *[]string dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) if dumpSqls != nil && len(*dumpSqls) > 0 { for _, sqlStr := range *dumpSqls { logger.Debug("gorm exec dumpSql:", sqlStr) if err = tx.Exec(sqlStr).Error; err != nil { return false } } } else { logger.Debug("get cluster db data err, dumpSqls is nil,err:", err) err = errors.New("dumpSqls is nil") return false } logger.Debug("成功添加当前节点到集群节点中") //3.将本节点加入到节点列表中 timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) if e1 != nil || serverIp == "" { err = errors.New("get serverIp err") return false } logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) if nodeName == "" { nodeName = serverIp } //添加本身节点,此处修复bug,加入集群的节点退出集群后重新加入,会报id冲突 var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')" if err = tx.Exec(sql).Error; err != nil { logger.Debug("add cur node err:", err) return false } if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { logger.Debug("update isDelete err:", err) return false } //4.开启reference tx.Exec("PRAGMA foreign_keys=ON") tx.Commit() serf.SyncSql([]string{sql}) return true } func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) { bts, _ := json.Marshal(*pbMsg) s.bk.Publish(ProcName, bts) } func (s ClusterService) FindIpByNode(nodeId string) (string, error) { var lc models.Node return lc.FindIpByNode(nodeId) }