package service import ( dbSync "basic.com/syncdb.git" "dbserver/extend/config" "dbserver/extend/logger" "dbserver/extend/util" "dbserver/models" "dbserver/vo" "github.com/hashicorp/memberlist" "github.com/pkg/errors" "github.com/satori/go.uuid" sysSync "sync" "time" ) type ClusterService struct { } var clusterSearchKey = "clusterSearchKey" var searchMap = make(map[string]*memberlist.Memberlist,0) var lock sysSync.Mutex func set2SearchMap(memList *memberlist.Memberlist) { lock.Lock() defer lock.Unlock() searchMap[clusterSearchKey] = memList } func getFromSearchMap() (*memberlist.Memberlist,bool) { lock.Lock() defer lock.Unlock() if v,ok := searchMap[clusterSearchKey];ok{ return v,true } else { return nil,false } } func clearSearchResult(ml *memberlist.Memberlist) { time.Sleep(10 * time.Second) lock.Lock() defer lock.Unlock() if _,ok := searchMap[clusterSearchKey];ok { dbSync.CloseSearchNode(ml) delete(searchMap,clusterSearchKey) } } func deleteFromSearchMap() { lock.Lock() defer lock.Unlock() if _,ok := searchMap[clusterSearchKey];ok { delete(searchMap,clusterSearchKey) } } func (s ClusterService) FindAll() (arr []models.Cluster, err error) { var clusterE models.Cluster return clusterE.FindAll() } func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error){ var nodeE models.Node return nodeE.FindNodesByClusterId(clusterId) } //根据集群名称和密码创建集群 func (s ClusterService) Create(clusterName string, pwd string) (bool,string) { clusterId := uuid.NewV4().String() pwd = config.ClusterSet.PwdPre + pwd var clusterE = models.Cluster{ ClusterId: clusterId, ClusterName: clusterName, Password: pwd, } arr, err := clusterE.FindAll() if err == nil && (arr == nil || len(arr)==0){ b := clusterE.Create() if b { models.InitAgent() return true,clusterId } } return false,"" } func (s ClusterService) SearchByPwd(pwd string) (err error){ _, isSearching := getFromSearchMap() if isSearching { return errors.New("other is searching,please wait") } pwd = config.ClusterSet.PwdPre + pwd ml, e := dbSync.CreateSearchNode(pwd) if e != nil { logger.Debug("CreateSearchNode err:", e) return errors.New("createSearchNode err") } set2SearchMap(ml) go clearSearchResult(ml) return nil } func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo{ return dbSync.GetSearchNodes() } func (s ClusterService) StopSearching() bool{ ml,_ := getFromSearchMap() if ml !=nil { dbSync.CloseSearchNode(ml) deleteFromSearchMap() return true } else { return true } } //加入集群 func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) bool { var lc models.LocalConfig e := lc.Select() if e !=nil || lc.ServerId == "" { logger.Debug("JoinCluster lc.ServerId") return false } var joinIps []string for _,ipStr :=range joinArg.NodeIps { if ip,b := util.IpCheck(ipStr);b { joinIps = append(joinIps, ip) } } if len(joinIps) == 0 { return false } joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, lc.ServerId, joinIps) if err ==nil && agent !=nil {//加入成功 models.Agent = agent if syncTableDataFromCluster(joinArg,lc){ logger.Debug("加入集群成功!!!") return true } } else { if agent != nil { err = agent.Shutdown() if err !=nil { logger.Debug("dbSync.Init err,shutdown err:",err) } } } return false } func (s ClusterService) UpdateClusterName(clusterName string) bool { var cE models.Cluster return cE.UpdateClusterName(clusterName) } func (s ClusterService) Leave() bool { var err error tx := models.GetDB().Begin() defer func() { if err !=nil && tx !=nil { tx.Rollback() } }() if models.Agent !=nil { err = models.Agent.Leave() if err !=nil { logger.Debug("cluster leave err") return false } models.Agent.Shutdown() models.Agent = nil err = tx.Exec("delete from cluster_node").Error if err!=nil { return false } err = tx.Exec("delete from cluster").Error if err!=nil { return false } tx.Commit() return true } return false } func (s ClusterService) TestSyncSql() bool { var lc models.LocalConfig lc.Select() timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) if e ==nil && serverIp !="" { sql := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + lc.ServerId + "','" + serverIp + "','" + fmtTimeStr + "')" if err := models.GetDB().Exec(sql).Error;err !=nil { return false } return true } return false } //加入集群后清空本地的同步库数据 func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo, lc models.LocalConfig) bool { var err error db := models.GetDB() db.LogMode(false) defer db.LogMode(true) tx := db.Begin() defer func() { if err !=nil && tx !=nil { tx.Rollback() } }() //0.关闭reference tx.Exec("PRAGMA foreign_keys=OFF") //1.删除本地的同步库数据 for _,t :=range models.SyncTables { err = tx.Exec("delete from "+t+"").Error if err !=nil { return false } } //2.拉取集群内的同步库数据到本地数据库表中 var dumpSqls *[]string dumpSqls,err = models.Agent.GetTableDataFromCluster(models.SyncTables) if dumpSqls !=nil { for _,sqlStr := range *dumpSqls { logger.Debug("gorm exec dumpSql:",sqlStr) if err = tx.Exec(sqlStr).Error;err !=nil { return false } } } logger.Debug("成功添加当前节点到集群节点中") //3.将本节点加入到节点列表中 timeUnix := time.Now().Unix() fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") serverIp, _, err := util.GetLocalIP(config.Server.NetworkAdapter) if err !=nil || serverIp == "" { return false } logger.Debug("cur Node serverIp:",serverIp) //添加本身节点 sql := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('"+lc.ServerId+"','"+joinArg.ClusterId+"','"+lc.ServerName+"','"+lc.ServerId+"','"+serverIp+"','"+fmtTimeStr+"')" if err = tx.Exec(sql).Error;err !=nil { return false } models.Agent.SyncSql([]string{ sql }) //4.开启reference tx.Exec("PRAGMA foreign_keys=ON") tx.Commit() return true }