From daac628c64069633787588372dea22499ac35396 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期三, 18 十月 2023 16:52:56 +0800 Subject: [PATCH] 修复集群功能 --- system-service/service/clusterService.go | 56 +++++++++++++++++++++++++++++++++----------------------- 1 files changed, 33 insertions(+), 23 deletions(-) diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index 2f978ef..feffd61 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "fmt" "strconv" sysSync "sync" "time" @@ -190,6 +191,7 @@ logger.Debug("AddCluster JoinCluster len(joinIps)=0") return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖") } + logger.Debug("AddCluster joinIps:", joinIps) joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password @@ -207,29 +209,29 @@ serf.Agent = agent t := time.Now() - syncTableDataFromCluster(joinArg) + syncClusterNodes := syncTableDataFromCluster(joinArg) logger.Debugf("AddCluster time=%v", time.Since(t)) - //if syncB { - chMsg := protomsg.DbChangeMessage{ - Id: joinArg.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "join", - } + if syncClusterNodes { + chMsg := protomsg.DbChangeMessage{ + Id: joinArg.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "join", + } - s.AddDbMessage(&chMsg) - logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) - return true, nil - //} else { - // logger.Debug("AddCluster syncTableDataFromCluster fail") - // if agent != nil { - // agent.Leave() - // err = agent.Shutdown() - // if err != nil { - // logger.Debug("AddCluster agent shutdown err:", err) - // } - // } - //} + s.AddDbMessage(&chMsg) + logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) + return true, nil + } else { + logger.Debug("AddCluster syncTableDataFromCluster fail") + agent.Leave() + err = agent.Shutdown() + if err != nil { + logger.Debug("AddCluster agent shutdown err:", err) + } + + return false, errors.New("鍔犲叆闆嗙兢澶辫触") + } } else { logger.Debug("AddCluster dbSync.Init err:", err) if agent != nil { @@ -239,7 +241,9 @@ } } + logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start)) + return false, errors.New("鍔犲叆闆嗙兢澶辫触") } @@ -403,7 +407,7 @@ dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) if dumpSqls != nil && len(*dumpSqls) > 0 { for _, sqlStr := range *dumpSqls { - //logger.Debug("gorm exec dumpSql:", sqlStr) + logger.Debug("gorm exec dumpSql:", sqlStr) if err = tx.Exec(sqlStr).Error; err != nil { logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) return false @@ -423,8 +427,11 @@ serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) if e1 != nil || serverIp == "" { err = errors.New("get serverIp err") + + logger.Error("get serverIp err") return false } + logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) if nodeName == "" { nodeName = serverIp @@ -436,7 +443,9 @@ logger.Debug("add cur node err:", err) return false } - if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { + + joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) + if err = tx.Exec(joinSql).Error; err != nil { logger.Debug("update isDelete err:", err) return false } @@ -445,6 +454,7 @@ tx.Exec("PRAGMA foreign_keys=ON") tx.Commit() serf.SyncSql([]string{sql}) + return true } -- Gitblit v1.8.0