From 55aa27a6ad0e012d62dcea2db37528a1b18836fb Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期四, 07 九月 2023 16:44:09 +0800 Subject: [PATCH] 裁剪集群操作的无关动作 --- system-service/serf/serf.go | 4 system-service/controllers/cluster.go | 2 system-service/service/clusterService.go | 110 ++++++++++-------------------------- system-service/main.go | 19 +++-- system-service/models/db.go | 5 + 5 files changed, 48 insertions(+), 92 deletions(-) diff --git a/system-service/controllers/cluster.go b/system-service/controllers/cluster.go index da13d0a..bcc88ec 100644 --- a/system-service/controllers/cluster.go +++ b/system-service/controllers/cluster.go @@ -27,7 +27,7 @@ if arr != nil && len(arr) > 0 { var nodeE models.Node nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId) - logger.Debugf("FindCluster nodes=%v", nodes) + //logger.Debugf("FindCluster nodes=%v", nodes) return &bhomeclient.Reply{Success: true, Data: map[string]interface{}{ "clusterId": arr[0].ClusterId, "clusterName": arr[0].ClusterName, diff --git a/system-service/main.go b/system-service/main.go index eebb524..f95fe17 100644 --- a/system-service/main.go +++ b/system-service/main.go @@ -3,13 +3,12 @@ import ( "context" "flag" - "net/http" _ "net/http/pprof" "os" "os/signal" "syscall" "vamicro/config" - "vamicro/extend/util" + //"vamicro/extend/util" "vamicro/system-service/broadcast" "vamicro/system-service/controllers" "vamicro/system-service/models" @@ -68,29 +67,33 @@ q := make(chan os.Signal, 1) signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) - ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug) + ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, nil) if err != nil { return } bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic) bhomedbapi.InitDoReq(ms.RequestOnly) - bhomedbapi.InitLog(logger.Debug) + //bhomedbapi.InitLog(logger.Debug) - util.AuthCheck(ctx) //鎺堟潈妫�鏌� + //util.AuthCheck(ctx) //鎺堟潈妫�鏌� + go ms.StartServer(fm) go dealSubMsg(ctx, ms) + serf.InitBusH(ms) serf.InitAgent(ctx) - go service.WatchEsAndWeedfsIp(ms) + //go service.WatchEsAndWeedfsIp(ms) go serf.StartSyncSqlToSerf() - go service.StartSyncDev() + //go service.StartSyncDev() + go broadcast.StartServer() //璁惧鍙互琚箍鎾悳绱� + //go service.CollectDeviceInfo(ctx, ms) - go service.WatchAuthSetChange(ms) //鏍规嵁鎺堟潈鏂囦欢鐩戣閫氶亾鏁伴噺鍙樺寲 + //go service.WatchAuthSetChange(ms) //鏍规嵁鎺堟潈鏂囦欢鐩戣閫氶亾鏁伴噺鍙樺寲 //缁熻绯荤粺杩愯鐘舵�� go sys.GatherStat() diff --git a/system-service/models/db.go b/system-service/models/db.go index babcb71..2a02bdc 100644 --- a/system-service/models/db.go +++ b/system-service/models/db.go @@ -37,6 +37,7 @@ &SysUserRole{}, &SysUserMenu{}, &SysSetting{}, + &SqlSyncHis{}, &DefHeadPic{}, &AuthConfig{}, &AuthDevice{}, @@ -54,7 +55,7 @@ setFaceTrackDic() } -//GetDB ... +// GetDB ... func GetDB() *gorm.DB { return db } @@ -96,7 +97,7 @@ db.Exec(strings.Join(sqls, "")) } -//娣诲姞涓�浜涙槸鍚︾殑瀛楀吀鍊� +// 娣诲姞涓�浜涙槸鍚︾殑瀛楀吀鍊� func setFaceTrackDic() { var sqls []string sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '51739a24-20bd-4440-8ef5-47fe93200536', '1', '鏄�', 'bForceSend', '鏄�', 1, '0' where not exists (select 1 from dictionary where id='51739a24-20bd-4440-8ef5-47fe93200536');") diff --git a/system-service/serf/serf.go b/system-service/serf/serf.go index 58b9af2..55ecb72 100644 --- a/system-service/serf/serf.go +++ b/system-service/serf/serf.go @@ -267,7 +267,7 @@ return } err = Agent.UserEvent(UserEventSyncSql, ueB, false) - if err == nil || !strings.Contains(err.Error(), "cannot contain") { - logger.Error("err: ", err) + if err != nil { + logger.Error("sending sync sql event err: ", err) } } diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index 07a9928..afe172b 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -98,12 +98,8 @@ return m } -//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� +// 鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { - return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�" - } clusterId := uuid.NewV4().String() pwd = config.ClusterSet.PwdPre + pwd var clusterE = models.Cluster{ @@ -119,19 +115,6 @@ b := clusterE.Create() if b { serf.InitAgent(context.Background()) - - //鍒涘缓es闆嗙兢 - if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { - //鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭 - s.Leave(false) - return false, err.Error() - } - - //鍒涘缓weedfs闆嗙兢 - if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { - s.Leave(false) - return false, err.Error() - } chMsg := protomsg.DbChangeMessage{ Id: clusterE.ClusterId, @@ -186,7 +169,7 @@ } } -//鍔犲叆闆嗙兢 +// 鍔犲叆闆嗙兢 func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { start := time.Now() if config.Server.AnalyServerId == "" { @@ -221,47 +204,31 @@ serf.Agent = agent t := time.Now() - syncB := syncTableDataFromCluster(joinArg) - logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) - if syncB { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S - go func() { - //娣诲姞ES鑺傜偣 - if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { - s.Leave(false) - return - } - - //娣诲姞weedfs鑺傜偣 - if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { - s.Leave(false) - return - } - }() - } - - //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ - go serf.ReInitDbPersonCompareData() - chMsg := protomsg.DbChangeMessage{ - Id: joinArg.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "", - } - s.AddDbMessage(&chMsg) - logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) - return true, nil - } else { - logger.Debug("AddCluster syncTableDataFromCluster fail") - if agent != nil { - agent.Leave() - err = agent.Shutdown() - if err != nil { - logger.Debug("AddCluster agent shutdown err:", err) - } - } - } + syncTableDataFromCluster(joinArg) + logger.Debugf("AddCluster time=%v", time.Since(t)) + //if syncB { + // //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ + // go serf.ReInitDbPersonCompareData() + // chMsg := protomsg.DbChangeMessage{ + // Id: joinArg.ClusterId, + // Table: protomsg.TableChanged_T_Cluster, + // Action: protomsg.DbAction_Insert, + // Info: "", + // } + // + // s.AddDbMessage(&chMsg) + logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) + return true, nil + //} else { + // logger.Debug("AddCluster syncTableDataFromCluster fail") + // if agent != nil { + // agent.Leave() + // err = agent.Shutdown() + // if err != nil { + // logger.Debug("AddCluster agent shutdown err:", err) + // } + // } + //} } else { logger.Debug("AddCluster dbSync.Init err:", err) if agent != nil { @@ -282,7 +249,7 @@ DevType_Other = "other" //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢� ) -//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� +// 鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� func GetDevType(dt string) string { if dt != "" { if len(dt) >= 4 { @@ -336,23 +303,6 @@ func (s ClusterService) Leave(isDel bool) (bool, error) { start := time.Now() - devType := GetDevType(config.Server.DeviceType) - logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) - - if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢 - go func() { - //閫�鍑簑eedfs鑺傜偣 - if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { - logger.Error("Leave ExitWeedfsServer err:", err) - return - } - //閫�鍑篹s鑺傜偣 - if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { - logger.Error("Leave ExitCluster es cluster err:", err) - return - } - }() - } var err error tx := models.GetDB().Begin() @@ -411,7 +361,7 @@ return false } -//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� +// 鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { var lc models.LocalConfig e := lc.Select() @@ -430,6 +380,7 @@ tx.Rollback() } }() + //0.鍏抽棴reference tx.Exec("PRAGMA foreign_keys=OFF") //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 @@ -447,6 +398,7 @@ return false } } + //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� var dumpSqls *[]string dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) -- Gitblit v1.8.0