From 2cd1af13bc4e7aec4c85b9fe88db2d294af6468f Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期日, 08 十月 2023 11:24:37 +0800 Subject: [PATCH] 修复集群同步功能 --- system-service/service/clusterService.go | 57 ++++++++++++++++++++++++++++++++++----------------------- 1 files changed, 34 insertions(+), 23 deletions(-) diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index afe172b..2f978ef 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -112,19 +112,21 @@ arr, err := clusterE.FindAll() if err == nil && (arr == nil || len(arr) == 0) { - b := clusterE.Create() - if b { + err = clusterE.Create() + if err == nil { serf.InitAgent(context.Background()) chMsg := protomsg.DbChangeMessage{ Id: clusterE.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, - Info: "", + Info: "create", } s.AddDbMessage(&chMsg) return true, clusterId + } else { + logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error()) } } else { if s.UpdateClusterName(clusterName, virtualIp) { @@ -201,22 +203,21 @@ agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) if err == nil && agent != nil { //鍔犲叆鎴愬姛 logger.Debug("AddCluster dbSync.Init success") + agent.RegisterHandleEventFunc(serf.HandleSerfEvent) serf.Agent = agent t := time.Now() syncTableDataFromCluster(joinArg) logger.Debugf("AddCluster time=%v", time.Since(t)) //if syncB { - // //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ - // go serf.ReInitDbPersonCompareData() - // chMsg := protomsg.DbChangeMessage{ - // Id: joinArg.ClusterId, - // Table: protomsg.TableChanged_T_Cluster, - // Action: protomsg.DbAction_Insert, - // Info: "", - // } - // - // s.AddDbMessage(&chMsg) + chMsg := protomsg.DbChangeMessage{ + Id: joinArg.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "join", + } + + s.AddDbMessage(&chMsg) logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) return true, nil //} else { @@ -332,7 +333,7 @@ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Delete, - Info: "", + Info: "leave", } logger.Debugf("Leave delete db time=%v", time.Since(t)) tm := time.Now() @@ -341,6 +342,7 @@ } logger.Debugf("Leave success time=%v", time.Since(start)) + return true, nil } @@ -374,6 +376,7 @@ db := models.GetDB() db.LogMode(false) defer db.LogMode(true) + tx := db.Begin() defer func() { if err != nil && tx != nil { @@ -385,16 +388,12 @@ tx.Exec("PRAGMA foreign_keys=OFF") //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 for _, t := range serf.SyncTables { - delSql := "" - if t == "dbtables" { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == "dbtablepersons" { - delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" - } else { - delSql = "delete from " + t + "" - } + delSql := "delete from " + t + "" + err = tx.Exec(delSql).Error if err != nil { + logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + logger.Error("sql:", delSql) return false } } @@ -404,8 +403,9 @@ dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) if dumpSqls != nil && len(*dumpSqls) > 0 { for _, sqlStr := range *dumpSqls { - logger.Debug("gorm exec dumpSql:", sqlStr) + //logger.Debug("gorm exec dumpSql:", sqlStr) if err = tx.Exec(sqlStr).Error; err != nil { + logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) return false } } @@ -457,3 +457,14 @@ var lc models.Node return lc.FindIpByNode(nodeId) } + +func ClusterSyncProcMessage(payload []byte) { + if serf.Agent == nil { + logger.Error("鏈姞鍏ラ泦缇�") + } + + err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) + if err != nil { + logger.Error("UserEventSyncMessage err:", err) + } +} -- Gitblit v1.8.0