From 3a706d3378aa3626501370352963883fd2783558 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 28 十一月 2023 11:24:49 +0800 Subject: [PATCH] 添加appcenter,完善算法上传下载功能 --- system-service/service/clusterService.go | 159 ++++++++++++++++++++++++++-------------------------- 1 files changed, 79 insertions(+), 80 deletions(-) diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index 07a9928..a39741f 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -3,6 +3,7 @@ import ( "context" "encoding/json" + "fmt" "strconv" sysSync "sync" "time" @@ -98,12 +99,8 @@ return m } -//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� +// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { - return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�" - } clusterId := uuid.NewV4().String() pwd = config.ClusterSet.PwdPre + pwd var clusterE = models.Cluster{ @@ -116,32 +113,21 @@ arr, err := clusterE.FindAll() if err == nil && (arr == nil || len(arr) == 0) { - b := clusterE.Create() - if b { + err = clusterE.Create() + if err == nil { serf.InitAgent(context.Background()) - - //鍒涘缓es闆嗙兢 - if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { - //鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭 - s.Leave(false) - return false, err.Error() - } - - //鍒涘缓weedfs闆嗙兢 - if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { - s.Leave(false) - return false, err.Error() - } chMsg := protomsg.DbChangeMessage{ Id: clusterE.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, - Info: "", + Info: "create", } s.AddDbMessage(&chMsg) return true, clusterId + } else { + logger.Error("鍒濆鍖栭泦缇ゆ暟鎹簱淇℃伅澶辫触. ", err.Error()) } } else { if s.UpdateClusterName(clusterName, virtualIp) { @@ -150,6 +136,31 @@ } return false, "" +} + +// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� +func (s ClusterService) UpdateDriftStateByNodeId(clusterId, nodeId, role string) (bool, string) { + var node models.Node + isSuccess := node.UpdateDriftStateByNodeId(role, nodeId, false) + + if isSuccess { + // 閫氱煡涓昏妭鐐瑰彉鏇� + chMsg := protomsg.DbChangeMessage{ + Id: clusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "slave2master", + } + + s.AddDbMessage(&chMsg) + + err := serf.Agent.UserEvent(serf.UserEventChangeMaster, []byte(nodeId), false) + if err != nil { + logger.Error("UserEventSyncVirtualIp err:", err) + } + } + + return isSuccess, "" } func (s ClusterService) SearchByPwd(pwd string) (err error) { @@ -186,7 +197,7 @@ } } -//鍔犲叆闆嗙兢 +// 鍔犲叆闆嗙兢 func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { start := time.Now() if config.Server.AnalyServerId == "" { @@ -205,6 +216,7 @@ logger.Debug("AddCluster JoinCluster len(joinIps)=0") return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖") } + logger.Debug("AddCluster joinIps:", joinIps) joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password @@ -218,49 +230,32 @@ agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) if err == nil && agent != nil { //鍔犲叆鎴愬姛 logger.Debug("AddCluster dbSync.Init success") + agent.RegisterHandleEventFunc(serf.HandleSerfEvent) serf.Agent = agent t := time.Now() - syncB := syncTableDataFromCluster(joinArg) - logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) - if syncB { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S - go func() { - //娣诲姞ES鑺傜偣 - if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { - s.Leave(false) - return - } - - //娣诲姞weedfs鑺傜偣 - if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { - s.Leave(false) - return - } - }() - } - - //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ - go serf.ReInitDbPersonCompareData() + syncClusterNodes := syncTableDataFromCluster(joinArg) + logger.Debugf("AddCluster time=%v", time.Since(t)) + if syncClusterNodes { chMsg := protomsg.DbChangeMessage{ Id: joinArg.ClusterId, Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Insert, - Info: "", + Info: "join", } + s.AddDbMessage(&chMsg) logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) return true, nil } else { logger.Debug("AddCluster syncTableDataFromCluster fail") - if agent != nil { - agent.Leave() - err = agent.Shutdown() - if err != nil { - logger.Debug("AddCluster agent shutdown err:", err) - } + agent.Leave() + err = agent.Shutdown() + if err != nil { + logger.Debug("AddCluster agent shutdown err:", err) } + + return false, errors.New("鍔犲叆闆嗙兢澶辫触") } } else { logger.Debug("AddCluster dbSync.Init err:", err) @@ -271,7 +266,9 @@ } } + logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start)) + return false, errors.New("鍔犲叆闆嗙兢澶辫触") } @@ -282,7 +279,7 @@ DevType_Other = "other" //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢� ) -//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� +// 鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� func GetDevType(dt string) string { if dt != "" { if len(dt) >= 4 { @@ -336,23 +333,6 @@ func (s ClusterService) Leave(isDel bool) (bool, error) { start := time.Now() - devType := GetDevType(config.Server.DeviceType) - logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) - - if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢 - go func() { - //閫�鍑簑eedfs鑺傜偣 - if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { - logger.Error("Leave ExitWeedfsServer err:", err) - return - } - //閫�鍑篹s鑺傜偣 - if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { - logger.Error("Leave ExitCluster es cluster err:", err) - return - } - }() - } var err error tx := models.GetDB().Begin() @@ -382,7 +362,7 @@ Id: "", Table: protomsg.TableChanged_T_Cluster, Action: protomsg.DbAction_Delete, - Info: "", + Info: "leave", } logger.Debugf("Leave delete db time=%v", time.Since(t)) tm := time.Now() @@ -391,6 +371,7 @@ } logger.Debugf("Leave success time=%v", time.Since(start)) + return true, nil } @@ -411,7 +392,7 @@ return false } -//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� +// 鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { var lc models.LocalConfig e := lc.Select() @@ -424,29 +405,28 @@ db := models.GetDB() db.LogMode(false) defer db.LogMode(true) + tx := db.Begin() defer func() { if err != nil && tx != nil { tx.Rollback() } }() + //0.鍏抽棴reference tx.Exec("PRAGMA foreign_keys=OFF") //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 for _, t := range serf.SyncTables { - delSql := "" - if t == "dbtables" { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == "dbtablepersons" { - delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" - } else { - delSql = "delete from " + t + "" - } + delSql := "delete from " + t + "" + err = tx.Exec(delSql).Error if err != nil { + logger.Error("鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁澶辫触,", err.Error()) + logger.Error("sql:", delSql) return false } } + //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� var dumpSqls *[]string dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) @@ -454,6 +434,7 @@ for _, sqlStr := range *dumpSqls { logger.Debug("gorm exec dumpSql:", sqlStr) if err = tx.Exec(sqlStr).Error; err != nil { + logger.Error("gorm exec dumpSql:", sqlStr, " error:", err.Error()) return false } } @@ -471,8 +452,11 @@ serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) if e1 != nil || serverIp == "" { err = errors.New("get serverIp err") + + logger.Error("get serverIp err") return false } + logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) if nodeName == "" { nodeName = serverIp @@ -484,7 +468,9 @@ logger.Debug("add cur node err:", err) return false } - if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { + + joinSql := fmt.Sprintf("update cluster_node set isDelete=0,drift_state='slave',node_ip='%s',create_time='%s' where node_id='%s'", serverIp+":"+strconv.Itoa(dbSync.DefaultBindPort), fmtTimeStr, config.Server.AnalyServerId) + if err = tx.Exec(joinSql).Error; err != nil { logger.Debug("update isDelete err:", err) return false } @@ -493,6 +479,7 @@ tx.Exec("PRAGMA foreign_keys=ON") tx.Commit() serf.SyncSql([]string{sql}) + return true } @@ -505,3 +492,15 @@ var lc models.Node return lc.FindIpByNode(nodeId) } + +func ClusterSyncProcMessage(payload []byte) { + if serf.Agent == nil { + logger.Error("鏈姞鍏ラ泦缇�") + return + } + + err := serf.Agent.UserEvent(serf.UserEventSyncMessage, payload, false) + if err != nil { + logger.Error("UserEventSyncMessage err:", err) + } +} -- Gitblit v1.8.0