package service
|
|
import (
|
dbSync "basic.com/syncdb.git"
|
"dbserver/extend/config"
|
"dbserver/extend/logger"
|
"dbserver/extend/util"
|
"dbserver/models"
|
"dbserver/vo"
|
"github.com/hashicorp/memberlist"
|
"github.com/pkg/errors"
|
"github.com/satori/go.uuid"
|
sysSync "sync"
|
"time"
|
)
|
|
// ClusterService groups the cluster operations defined in this file
// (search, create, join, leave). It holds no fields of its own.
type ClusterService struct{}
|
|
var clusterSearchKey = "clusterSearchKey"
|
var searchMap = make(map[string]*memberlist.Memberlist,0)
|
var lock sysSync.Mutex
|
|
func set2SearchMap(memList *memberlist.Memberlist) {
|
lock.Lock()
|
defer lock.Unlock()
|
searchMap[clusterSearchKey] = memList
|
}
|
|
func getFromSearchMap() (*memberlist.Memberlist,bool) {
|
lock.Lock()
|
defer lock.Unlock()
|
if v,ok := searchMap[clusterSearchKey];ok{
|
return v,true
|
} else {
|
return nil,false
|
}
|
}
|
|
func clearSearchResult(ml *memberlist.Memberlist) {
|
time.Sleep(10 * time.Second)
|
lock.Lock()
|
defer lock.Unlock()
|
if _,ok := searchMap[clusterSearchKey];ok {
|
dbSync.CloseSearchNode(ml)
|
delete(searchMap,clusterSearchKey)
|
}
|
}
|
|
func deleteFromSearchMap() {
|
lock.Lock()
|
defer lock.Unlock()
|
if _,ok := searchMap[clusterSearchKey];ok {
|
delete(searchMap,clusterSearchKey)
|
}
|
}
|
|
func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
|
var clusterE models.Cluster
|
return clusterE.FindAll()
|
}
|
|
func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error){
|
var nodeE models.Node
|
return nodeE.FindNodesByClusterId(clusterId)
|
}
|
|
//根据集群名称和密码创建集群
|
func (s ClusterService) Create(clusterName string, pwd string) (bool,string) {
|
clusterId := uuid.NewV4().String()
|
pwd = config.ClusterSet.PwdPre + pwd
|
var clusterE = models.Cluster{
|
ClusterId: clusterId,
|
ClusterName: clusterName,
|
Password: pwd,
|
}
|
|
arr, err := clusterE.FindAll()
|
if err == nil && (arr == nil || len(arr)==0){
|
|
b := clusterE.Create()
|
if b {
|
models.InitAgent()
|
return true,clusterId
|
}
|
}
|
|
return false,""
|
}
|
|
func (s ClusterService) SearchByPwd(pwd string) (err error){
|
_, isSearching := getFromSearchMap()
|
if isSearching {
|
return errors.New("other is searching,please wait")
|
}
|
pwd = config.ClusterSet.PwdPre + pwd
|
ml, e := dbSync.CreateSearchNode(pwd)
|
if e != nil {
|
logger.Debug("CreateSearchNode err:", e)
|
return errors.New("createSearchNode err")
|
}
|
|
set2SearchMap(ml)
|
|
go clearSearchResult(ml)
|
|
return nil
|
}
|
|
func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo{
|
return dbSync.GetSearchNodes()
|
}
|
|
func (s ClusterService) StopSearching() bool{
|
ml,_ := getFromSearchMap()
|
if ml !=nil {
|
dbSync.CloseSearchNode(ml)
|
deleteFromSearchMap()
|
return true
|
} else {
|
return true
|
}
|
}
|
|
//加入集群
|
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) bool {
|
var lc models.LocalConfig
|
e := lc.Select()
|
if e !=nil || lc.ServerId == "" {
|
logger.Debug("JoinCluster lc.ServerId")
|
return false
|
}
|
var joinIps []string
|
for _,ipStr :=range joinArg.NodeIps {
|
if ip,b := util.IpCheck(ipStr);b {
|
joinIps = append(joinIps, ip)
|
}
|
}
|
if len(joinIps) == 0 {
|
return false
|
}
|
joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
|
agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, lc.ServerId, joinIps)
|
if err ==nil && agent !=nil {//加入成功
|
models.Agent = agent
|
|
if syncTableDataFromCluster(joinArg,lc){
|
logger.Debug("加入集群成功!!!")
|
return true
|
}
|
} else {
|
if agent != nil {
|
err = agent.Shutdown()
|
if err !=nil {
|
logger.Debug("dbSync.Init err,shutdown err:",err)
|
}
|
}
|
}
|
return false
|
}
|
|
func (s ClusterService) UpdateClusterName(clusterName string) bool {
|
var cE models.Cluster
|
return cE.UpdateClusterName(clusterName)
|
}
|
|
func (s ClusterService) Leave() bool {
|
var err error
|
tx := models.GetDB().Begin()
|
defer func() {
|
if err !=nil && tx !=nil {
|
tx.Rollback()
|
}
|
}()
|
|
if models.Agent !=nil {
|
err = models.Agent.Leave()
|
if err !=nil {
|
logger.Debug("cluster leave err")
|
return false
|
}
|
models.Agent.Shutdown()
|
models.Agent = nil
|
err = tx.Exec("delete from cluster_node").Error
|
if err!=nil {
|
return false
|
}
|
err = tx.Exec("delete from cluster").Error
|
if err!=nil {
|
return false
|
}
|
tx.Commit()
|
return true
|
}
|
return false
|
}
|
|
func (s ClusterService) TestSyncSql() bool {
|
var lc models.LocalConfig
|
lc.Select()
|
|
timeUnix := time.Now().Unix()
|
fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
|
if e ==nil && serverIp !="" {
|
sql := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + lc.ServerId + "','" + serverIp + "','" + fmtTimeStr + "')"
|
if err := models.GetDB().Exec(sql).Error;err !=nil {
|
return false
|
}
|
return true
|
}
|
return false
|
}
|
|
//加入集群后清空本地的同步库数据
|
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo, lc models.LocalConfig) bool {
|
var err error
|
db := models.GetDB()
|
db.LogMode(false)
|
defer db.LogMode(true)
|
tx := db.Begin()
|
defer func() {
|
if err !=nil && tx !=nil {
|
tx.Rollback()
|
}
|
}()
|
//0.关闭reference
|
tx.Exec("PRAGMA foreign_keys=OFF")
|
//1.删除本地的同步库数据
|
for _,t :=range models.SyncTables {
|
err = tx.Exec("delete from "+t+"").Error
|
if err !=nil {
|
return false
|
}
|
}
|
//2.拉取集群内的同步库数据到本地数据库表中
|
var dumpSqls *[]string
|
dumpSqls,err = models.Agent.GetTableDataFromCluster(models.SyncTables)
|
if dumpSqls !=nil {
|
for _,sqlStr := range *dumpSqls {
|
logger.Debug("gorm exec dumpSql:",sqlStr)
|
if err = tx.Exec(sqlStr).Error;err !=nil {
|
return false
|
}
|
}
|
}
|
|
logger.Debug("成功添加当前节点到集群节点中")
|
|
//3.将本节点加入到节点列表中
|
timeUnix := time.Now().Unix()
|
fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
|
serverIp, _, err := util.GetLocalIP(config.Server.NetworkAdapter)
|
if err !=nil || serverIp == "" {
|
return false
|
}
|
logger.Debug("cur Node serverIp:",serverIp)
|
|
//添加本身节点
|
sql := "insert into cluster_node(id,cluster_id,node_name,node_id,node_ip,create_time) values ('"+lc.ServerId+"','"+joinArg.ClusterId+"','"+lc.ServerName+"','"+lc.ServerId+"','"+serverIp+"','"+fmtTimeStr+"')"
|
if err = tx.Exec(sql).Error;err !=nil {
|
return false
|
}
|
models.Agent.SyncSql([]string{ sql })
|
|
|
//4.开启reference
|
tx.Exec("PRAGMA foreign_keys=ON")
|
tx.Commit()
|
return true
|
}
|