| | |
| | | arr, err := clusterE.FindAll() |
| | | if err == nil && (arr == nil || len(arr) == 0) { |
| | | |
| | | b := clusterE.Create() |
| | | if b { |
| | | err = clusterE.Create() |
| | | if err == nil { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | Info: "create", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | |
| | | return true, clusterId |
| | | } else { |
| | | logger.Error("初始化集群数据库信息失败. ", err.Error()) |
| | | } |
| | | } else { |
| | | if s.UpdateClusterName(clusterName, virtualIp) { |
| | |
| | | agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) |
| | | if err == nil && agent != nil { //加入成功 |
| | | logger.Debug("AddCluster dbSync.Init success") |
| | | agent.RegisterHandleEventFunc(serf.HandleSerfEvent) |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | //if syncB { |
| | | // //需要重新初始化本地比对进程 |
| | | // go serf.ReInitDbPersonCompareData() |
| | | // chMsg := protomsg.DbChangeMessage{ |
| | | // Id: joinArg.ClusterId, |
| | | // Table: protomsg.TableChanged_T_Cluster, |
| | | // Action: protomsg.DbAction_Insert, |
| | | // Info: "", |
| | | // } |
| | | // |
| | | // s.AddDbMessage(&chMsg) |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "join", |
| | | } |
| | | |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | //} else { |
| | |
| | | Id: "", |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Delete, |
| | | Info: "", |
| | | Info: "leave", |
| | | } |
| | | logger.Debugf("Leave delete db time=%v", time.Since(t)) |
| | | tm := time.Now() |
| | |
| | | } |
| | | |
| | | logger.Debugf("Leave success time=%v", time.Since(start)) |
| | | |
| | | return true, nil |
| | | } |
| | | |
| | |
| | | db := models.GetDB() |
| | | db.LogMode(false) |
| | | defer db.LogMode(true) |
| | | |
| | | tx := db.Begin() |
| | | defer func() { |
| | | if err != nil && tx != nil { |
| | |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | | for _, t := range serf.SyncTables { |
| | | delSql := "" |
| | | if t == "dbtables" { |
| | | delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" |
| | | } else if t == "dbtablepersons" { |
| | | delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" |
| | | } else { |
| | | delSql = "delete from " + t + "" |
| | | } |
| | | delSql := "delete from " + t + "" |
| | | |
| | | err = tx.Exec(delSql).Error |
| | | if err != nil { |
| | | logger.Error("删除本地的同步库数据失败,", err.Error()) |
| | | logger.Error("sql:", delSql) |
| | | return false |
| | | } |
| | | } |
| | |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |
| | | if dumpSqls != nil && len(*dumpSqls) > 0 { |
| | | for _, sqlStr := range *dumpSqls { |
| | | logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | //logger.Debug("gorm exec dumpSql:", sqlStr) |
| | | if err = tx.Exec(sqlStr).Error; err != nil { |
| | | logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) |
| | | return false |
| | | } |
| | | } |
| | |
| | | var lc models.Node |
| | | return lc.FindIpByNode(nodeId) |
| | | } |
| | | |
| | | func ClusterSyncProcMessage(payload []byte) { |
| | | if serf.Agent == nil { |
| | | logger.Error("未加入集群") |
| | | } |
| | | |
| | | err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) |
| | | if err != nil { |
| | | logger.Error("UserEventSyncMessage err:", err) |
| | | } |
| | | } |