package service

import (
	"context"
	"encoding/json"
	"fmt"
	"strconv"
	sysSync "sync"
	"time"

	"vamicro/config"
	"vamicro/extend/util"
	"vamicro/system-service/models"
	"vamicro/system-service/serf"
	"vamicro/system-service/sys"
	"vamicro/system-service/vo"

	"basic.com/pubsub/protomsg.git"
	dbSync "basic.com/syncdb.git"
	"basic.com/valib/bhomeclient.git"
	"basic.com/valib/logger.git"
	"github.com/hashicorp/memberlist"
	"github.com/pkg/errors"
	uuid "github.com/satori/go.uuid"
)

// ClusterService groups the cluster management operations (create, search,
// join, leave, rename) and publishes database-change notifications through
// its broker.
type ClusterService struct {
	bk bhomeclient.Broker
}

// NewClusterService returns a ClusterService that publishes through broker.
func NewClusterService(broker bhomeclient.Broker) *ClusterService {
	return &ClusterService{
		bk: broker,
	}
}

// clusterSearchKey is the only key ever stored in searchMap: at most one
// search node is tracked at a time.
var clusterSearchKey = "clusterSearchKey"

// searchMap holds the memberlist of the search currently in progress, if any.
// All access must hold lock.
var searchMap = make(map[string]*memberlist.Memberlist, 0)

// lock guards searchMap.
var lock sysSync.Mutex

// set2SearchMap registers memList as the active search node, replacing any
// previously registered one.
func set2SearchMap(memList *memberlist.Memberlist) {
	lock.Lock()
	defer lock.Unlock()
	searchMap[clusterSearchKey] = memList
}

// getFromSearchMap returns the active search node and whether one is
// registered.
func getFromSearchMap() (*memberlist.Memberlist, bool) {
	lock.Lock()
	defer lock.Unlock()
	v, ok := searchMap[clusterSearchKey]
	if !ok {
		return nil, false
	}
	return v, true
}

// clearSearchResult waits 10 seconds and then closes ml and unregisters it.
// It is meant to run in its own goroutine as the automatic timeout of a
// search started by SearchByPwd.
//
// It only acts when the registered search node is still ml. If the search was
// stopped and a new one started in the meantime, the new search belongs to a
// different timeout goroutine and must not be closed or unregistered here.
func clearSearchResult(ml *memberlist.Memberlist) {
	time.Sleep(10 * time.Second)
	lock.Lock()
	defer lock.Unlock()
	if cur, ok := searchMap[clusterSearchKey]; ok && cur == ml {
		dbSync.CloseSearchNode(ml)
		delete(searchMap, clusterSearchKey)
	}
}

// deleteFromSearchMap unregisters the active search node without closing it.
func deleteFromSearchMap() {
	lock.Lock()
	defer lock.Unlock()
	// delete is a no-op when the key is absent.
	delete(searchMap, clusterSearchKey)
}

// FindAll returns every cluster record.
func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
	var clusterE models.Cluster
	return clusterE.FindAll()
}

// FindNodesByClusterId returns the nodes that belong to clusterId.
func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
	var nodeE models.Node
	return nodeE.FindNodesByClusterId(clusterId)
}

// FindAllClusterNodes returns the nodes of every cluster, keyed by node Id.
// Lookup errors are skipped, so the result may be partial or empty; it is
// never nil.
func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
	m := make(map[string]models.Node, 0)
	var clusterE models.Cluster
	var nodeE models.Node
	arr, err := clusterE.FindAll()
	if err != nil {
		return m
	}
	for _, clu := range arr {
		nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
		if e != nil {
			continue
		}
		for _, n := range nodes {
			m[n.Id] = n
		}
	}
	return m
}

// Create creates a cluster from a cluster name and password.
//
// When no cluster exists yet, a new record is stored, the serf agent is
// initialised, a change message is published and (true, clusterId) is
// returned. When a cluster already exists (or the lookup failed) it falls back
// to UpdateClusterName and returns (true, "") if that succeeded. In every
// other case it returns (false, "").
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
	clusterId := uuid.NewV4().String()
	pwd = config.ClusterSet.PwdPre + pwd
	var clusterE = models.Cluster{
		ClusterId:   clusterId,
		ClusterName: clusterName,
		Password:    pwd,
		VirtualIp:   virtualIp,
	}
	arr, err := clusterE.FindAll()
	if err != nil || len(arr) > 0 {
		if s.UpdateClusterName(clusterName, virtualIp) {
			return true, ""
		}
		return false, ""
	}
	if err = clusterE.Create(); err != nil {
		logger.Error("初始化集群数据库信息失败. ", err.Error())
		return false, ""
	}
	serf.InitAgent(context.Background())
	chMsg := protomsg.DbChangeMessage{
		Id:     clusterE.ClusterId,
		Table:  protomsg.TableChanged_T_Cluster,
		Action: protomsg.DbAction_Insert,
		Info:   "create",
	}
	s.AddDbMessage(&chMsg)
	return true, clusterId
}

// UpdateDriftStateByNodeId updates the drift state (role) of node nodeId and,
// on success, publishes a "slave2master" change message and broadcasts a
// UserEventChangeMaster serf event carrying the node id.
//
// The returned bool is the result of the database update; the string is
// always empty. A failed or skipped broadcast is logged, not returned.
func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) {
	var node models.Node
	isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false)
	if !isSuccess {
		return isSuccess, ""
	}
	// Notify listeners that the master node changed.
	chMsg := protomsg.DbChangeMessage{
		Id:     clusterId,
		Table:  protomsg.TableChanged_T_Cluster,
		Action: protomsg.DbAction_Insert,
		Info:   "slave2master",
	}
	s.AddDbMessage(&chMsg)
	// The agent is nil when this node is not in a cluster; calling through it
	// would panic.
	if serf.Agent == nil {
		logger.Error("UserEventChangeMaster skipped: serf agent is nil")
		return isSuccess, ""
	}
	if err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false); err != nil {
		logger.Error("UserEventChangeMaster err:", err)
	}
	return isSuccess, ""
}

// SearchByPwd starts a cluster search using the (prefixed) password. Only one
// search may be active at a time; the search is closed automatically after
// the clearSearchResult timeout.
func (s ClusterService) SearchByPwd(pwd string) (err error) {
	if _, isSearching := getFromSearchMap(); isSearching {
		return errors.New("other is searching,please wait")
	}
	pwd = config.ClusterSet.PwdPre + pwd
	ml, e := dbSync.CreateSearchNode(pwd)
	if e != nil {
		logger.Debug("CreateSearchNode err:", e)
		return errors.New("createSearchNode err")
	}
	set2SearchMap(ml)
	go clearSearchResult(ml)
	return nil
}

// SearchNodes returns the nodes reported by dbSync.GetSearchNodes.
func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
	return dbSync.GetSearchNodes()
}

// StopSearching closes and unregisters the active search node, if any.
// It always returns true.
func (s ClusterService) StopSearching() bool {
	if ml, _ := getFromSearchMap(); ml != nil {
		dbSync.CloseSearchNode(ml)
		deleteFromSearchMap()
	}
	return true
}

// JoinCluster joins this node to the cluster described by joinArg.
//
// It validates the target IPs, initialises a sync agent against them, pulls
// the cluster's synced tables into the local database and publishes a "join"
// change message. On any failure the freshly created agent is shut down and
// (false, error) is returned.
//
// Note: joinArg.Password is overwritten in place with the prefixed password.
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
	start := time.Now()
	if config.Server.AnalyServerId == "" {
		logger.Debug("AddCluster config serverId is nil")
		return false, errors.New("serverId配置有误")
	}

	targetIp := ""
	var joinIps []string
	for _, ipStr := range joinArg.NodeIps {
		// When joining by a typed-in address, NodeIps elements are bare IPs.
		if ip, b := util.IpCheck(ipStr); b {
			targetIp = ip
			joinIps = append(joinIps, ip+":30190")
		}
	}
	if len(joinIps) == 0 {
		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
		return false, errors.New("加入的目标ip不能为空")
	}
	logger.Debug("AddCluster joinIps:", joinIps)

	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
	conf := dbSync.DefaultConfig()
	// Best effort: when the local IP cannot be determined the default bind
	// address of the config is kept.
	localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
	if localIp != "" {
		conf.BindAddr = localIp
	}

	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
	if err != nil || agent == nil {
		logger.Debug("AddCluster dbSync.Init err:", err)
		if agent != nil {
			agent.Leave()
			err = agent.Shutdown()
			logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
		}
		logger.Debugf("AddCluster 加入集群失败, targetIp=%v, time=%v", targetIp, time.Since(start))
		return false, errors.New("加入集群失败")
	}

	// Joined successfully.
	logger.Debug("AddCluster dbSync.Init success")
	agent.RegisterHandleEventFunc(serf.HandleSerfEvent)
	// syncTableDataFromCluster reads the global agent, so it has to be
	// installed before the sync; remember the old value to undo on failure.
	prevAgent := serf.Agent
	serf.Agent = agent

	t := time.Now()
	syncClusterNodes := syncTableDataFromCluster(joinArg)
	logger.Debugf("AddCluster time=%v", time.Since(t))
	if !syncClusterNodes {
		logger.Debug("AddCluster syncTableDataFromCluster fail")
		agent.Leave()
		if err = agent.Shutdown(); err != nil {
			logger.Debug("AddCluster agent shutdown err:", err)
		}
		// Do not leave the global pointing at an agent that was just shut down.
		serf.Agent = prevAgent
		return false, errors.New("加入集群失败")
	}

	chMsg := protomsg.DbChangeMessage{
		Id:     joinArg.ClusterId,
		Table:  protomsg.TableChanged_T_Cluster,
		Action: protomsg.DbAction_Insert,
		Info:   "join",
	}
	s.AddDbMessage(&chMsg)
	logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
	return true, nil
}

const (
	DevType_Analysis         = "analysis"         // analysis only
	DevType_Storage          = "storage"          // storage only
	DevType_Analysis_Storage = "analysis_storage" // analysis plus storage
	DevType_Other            = "other"            // any other device type, e.g. application
)

// GetDevType maps a device-type code to one of the DevType_* constants.
// The decision is made on bytes 2..3 of dt ("01" analysis, "02" storage,
// "03" analysis+storage); anything else, including a dt shorter than four
// bytes, yields DevType_Other.
func GetDevType(dt string) string {
	if len(dt) < 4 {
		return DevType_Other
	}
	switch dt[2:4] {
	case "01":
		return DevType_Analysis
	case "02":
		return DevType_Storage
	case "03":
		return DevType_Analysis_Storage
	}
	return DevType_Other
}

// UpdateClusterName updates the name and virtual IP of the existing cluster.
// On success it publishes an update change message and, when the virtual IP
// actually changed and this node has a serf agent, broadcasts a
// UserEventSyncVirtualIp event. It returns false when there is no cluster or
// the database update failed.
func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
	var cE models.Cluster
	arr, e := cE.FindAll()
	if e != nil || len(arr) == 0 {
		return false
	}
	if !cE.UpdateClusterName(clusterName, virtualIp) {
		return false
	}

	chMsg := protomsg.DbChangeMessage{
		Id:     "",
		Table:  protomsg.TableChanged_T_Cluster,
		Action: protomsg.DbAction_Update,
		Info:   virtualIp,
	}
	s.AddDbMessage(&chMsg)

	// The virtual (floating) IP changed: tell the other nodes.
	if arr[0].VirtualIp != virtualIp && serf.Agent != nil {
		b, err := json.Marshal(&protomsg.DbChangeMessage{
			Id:     "",
			Table:  protomsg.TableChanged_T_Cluster,
			Action: protomsg.DbAction_Update,
			Info:   virtualIp,
		})
		if err != nil {
			// The database update already succeeded; only the broadcast is skipped.
			logger.Error("UserEventSyncVirtualIp marshal err:", err)
			return true
		}
		if err = serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false); err != nil {
			logger.Error("UserEventSyncVirtualIp err:", err)
		}
	}
	return true
}

// Leave makes this node leave its cluster: the serf agent is stopped, the
// cluster_node and cluster tables are emptied in one transaction and a
// "leave" change message is published. When the node has no agent there is
// nothing to do and (true, nil) is returned.
//
// isDel is currently unused.
func (s ClusterService) Leave(isDel bool) (bool, error) {
	start := time.Now()
	if serf.Agent == nil {
		// Not in a cluster: do not open a transaction that nobody would close.
		logger.Debugf("Leave success time=%v", time.Since(start))
		return true, nil
	}

	var err error
	tx := models.GetDB().Begin()
	if err = tx.Error; err != nil {
		logger.Error("Leave begin transaction err:", err)
		return false, err
	}
	defer func() {
		if err != nil && tx != nil {
			tx.Rollback()
		}
	}()

	t := time.Now()
	serf.Agent.Stop()
	serf.Agent = nil

	if err = tx.Exec("delete from cluster_node").Error; err != nil {
		logger.Error("Leave delete from cluster_node err:", err)
		return false, err
	}
	if err = tx.Exec("delete from cluster").Error; err != nil {
		logger.Error("Leave delete from cluster err:", err)
		return false, err
	}
	if err = tx.Commit().Error; err != nil {
		logger.Error("Leave commit err:", err)
		return false, err
	}
	logger.Debug("Leave delete cluster_node and cluster from db")

	chMsg := protomsg.DbChangeMessage{
		Id:     "",
		Table:  protomsg.TableChanged_T_Cluster,
		Action: protomsg.DbAction_Delete,
		Info:   "leave",
	}
	logger.Debugf("Leave delete db time=%v", time.Since(t))

	tm := time.Now()
	s.AddDbMessage(&chMsg)
	logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))

	logger.Debugf("Leave success time=%v", time.Since(start))
	return true, nil
}

// TestSyncSql inserts a cluster_node row for this server with freshly
// generated ids and reports whether the insert succeeded. It returns false
// when the local IP cannot be determined.
func (s ClusterService) TestSyncSql() bool {
	var lc models.LocalConfig
	// Best effort: on failure ServerName simply stays empty.
	lc.Select()

	timeUnix := time.Now().Unix()
	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
	serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
	if e != nil || serverIp == "" {
		return false
	}

	// NOTE(review): values are concatenated into the statement; a ServerName
	// containing a single quote would break it.
	sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
	if err := models.GetDB().Exec(sql).Error; err != nil {
		return false
	}
	return true
}

// syncTableDataFromCluster replaces the local synced tables with the data of
// the cluster that was just joined and registers this node in cluster_node.
//
// Everything runs in one transaction: (1) delete the local rows of every
// table in serf.SyncTables, (2) execute the dump statements fetched from the
// cluster, (3) insert/refresh this node's own cluster_node row. After a
// successful commit the insert statement is handed to serf.SyncSql. It
// returns false, with the transaction rolled back, on any failure.
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
	var lc models.LocalConfig
	nodeName := ""
	if e := lc.Select(); e == nil {
		nodeName = lc.ServerName
	}

	// err is shared with the deferred rollback below: every failure path must
	// assign to it (never shadow it with :=) before returning.
	var err error
	db := models.GetDB()
	db.LogMode(false)
	defer db.LogMode(true)

	tx := db.Begin()
	if err = tx.Error; err != nil {
		logger.Error("syncTableDataFromCluster begin transaction err:", err)
		return false
	}
	defer func() {
		if err != nil && tx != nil {
			tx.Rollback()
		}
	}()

	// 0. Turn off foreign-key enforcement.
	// NOTE(review): if the backing database is SQLite, this pragma is
	// documented as a no-op inside a transaction — confirm it has the
	// intended effect here.
	tx.Exec("PRAGMA foreign_keys=OFF")

	// 1. Delete the local data of the synced tables.
	for _, t := range serf.SyncTables {
		delSql := "delete from " + t + ""
		if err = tx.Exec(delSql).Error; err != nil {
			logger.Error("删除本地的同步库数据失败,", err.Error())
			logger.Error("sql:", delSql)
			return false
		}
	}

	// 2. Pull the cluster's synced-table data into the local tables.
	var dumpSqls *[]string
	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
	if dumpSqls == nil || len(*dumpSqls) == 0 {
		logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
		// Make sure the deferred rollback fires even when err was nil.
		err = errors.New("dumpSqls is nil")
		return false
	}
	for _, sqlStr := range *dumpSqls {
		logger.Debug("gorm exec dumpSql:", sqlStr)
		if err = tx.Exec(sqlStr).Error; err != nil {
			logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error())
			return false
		}
	}
	logger.Debug("成功添加当前节点到集群节点中")

	// 3. Add this node to the node list.
	timeUnix := time.Now().Unix()
	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
	if e1 != nil || serverIp == "" {
		err = errors.New("get serverIp err")
		logger.Error("get serverIp err")
		return false
	}
	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
	if nodeName == "" {
		nodeName = serverIp
	}

	// Insert this node itself. The NOT EXISTS guard fixes a bug where a node
	// that left the cluster and joined again hit an id conflict.
	// NOTE(review): values are concatenated because the literal statement is
	// later passed to serf.SyncSql; a node name containing a single quote
	// would break it.
	var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
	if err = tx.Exec(sql).Error; err != nil {
		logger.Debug("add cur node err:", err)
		return false
	}

	// If the row already existed (re-join), revive and refresh it instead.
	joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId)
	if err = tx.Exec(joinSql).Error; err != nil {
		logger.Debug("update isDelete err:", err)
		return false
	}

	// 4. Turn foreign-key enforcement back on.
	tx.Exec("PRAGMA foreign_keys=ON")

	// Only report success, and only broadcast the insert, once the local
	// changes are really persisted.
	if err = tx.Commit().Error; err != nil {
		logger.Error("syncTableDataFromCluster commit err:", err)
		return false
	}
	serf.SyncSql([]string{sql})
	return true
}

// AddDbMessage JSON-encodes pbMsg and publishes it through the broker under
// ProcName. A nil message or an encoding failure is dropped (the latter is
// logged) instead of publishing an empty payload.
func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
	if pbMsg == nil {
		return
	}
	bts, err := json.Marshal(*pbMsg)
	if err != nil {
		logger.Error("AddDbMessage marshal err:", err)
		return
	}
	s.bk.Publish(ProcName, bts)
}

// FindIpByNode returns the IP recorded for node nodeId.
func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
	var lc models.Node
	return lc.FindIpByNode(nodeId)
}
func ClusterSyncProcMessage(payload []byte) { if serf.Agent == nil { logger.Error("未加入集群") return } err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) if err != nil { logger.Error("UserEventSyncMessage err:", err) } }