From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001 From: zhangzengfei <zhangzengfei@smartai.com> Date: 星期二, 05 九月 2023 09:58:13 +0800 Subject: [PATCH] 修复编译 --- system-service/service/clusterService.go | 1014 +++++++++++++++++++++++++++++----------------------------- 1 files changed, 507 insertions(+), 507 deletions(-) diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go index 07a9928..18af954 100644 --- a/system-service/service/clusterService.go +++ b/system-service/service/clusterService.go @@ -1,507 +1,507 @@ -package service - -import ( - "context" - "encoding/json" - "strconv" - sysSync "sync" - "time" - "vamicro/config" - "vamicro/extend/util" - "vamicro/system-service/models" - "vamicro/system-service/serf" - "vamicro/system-service/sys" - "vamicro/system-service/vo" - - "basic.com/pubsub/protomsg.git" - dbSync "basic.com/syncdb.git" - "basic.com/valib/bhomeclient.git" - "basic.com/valib/logger.git" - "github.com/hashicorp/memberlist" - "github.com/pkg/errors" - uuid "github.com/satori/go.uuid" -) - -type ClusterService struct { - bk bhomeclient.Broker -} - -func NewClusterService(broker bhomeclient.Broker) *ClusterService { - return &ClusterService{ - bk: broker, - } -} - -var clusterSearchKey = "clusterSearchKey" -var searchMap = make(map[string]*memberlist.Memberlist, 0) -var lock sysSync.Mutex - -func set2SearchMap(memList *memberlist.Memberlist) { - lock.Lock() - defer lock.Unlock() - searchMap[clusterSearchKey] = memList -} - -func getFromSearchMap() (*memberlist.Memberlist, bool) { - lock.Lock() - defer lock.Unlock() - if v, ok := searchMap[clusterSearchKey]; ok { - return v, true - } else { - return nil, false - } -} - -func clearSearchResult(ml *memberlist.Memberlist) { - time.Sleep(10 * time.Second) - lock.Lock() - defer lock.Unlock() - if _, ok := searchMap[clusterSearchKey]; ok { - dbSync.CloseSearchNode(ml) - delete(searchMap, clusterSearchKey) - } -} - -func deleteFromSearchMap() { - lock.Lock() - defer lock.Unlock() - if _, ok := searchMap[clusterSearchKey]; ok { - delete(searchMap, clusterSearchKey) - } -} - -func (s ClusterService) FindAll() (arr []models.Cluster, err error) { - var clusterE models.Cluster - return clusterE.FindAll() -} - -func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) { - var nodeE models.Node - return nodeE.FindNodesByClusterId(clusterId) -} - -func (s ClusterService) FindAllClusterNodes() map[string]models.Node { - m := make(map[string]models.Node, 0) - var clusterE models.Cluster - var nodeE models.Node - arr, err := clusterE.FindAll() - if err == nil && arr != nil { - for _, clu := range arr { - nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId) - if e == nil && nodes != nil { - for _, n := range nodes { - m[n.Id] = n - } - } - } - } - return m -} - -//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� -func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { - return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�" - } - clusterId := uuid.NewV4().String() - pwd = config.ClusterSet.PwdPre + pwd - var clusterE = models.Cluster{ - ClusterId: clusterId, - ClusterName: clusterName, - Password: pwd, - VirtualIp: virtualIp, - } - - arr, err := clusterE.FindAll() - if err == nil && (arr == nil || len(arr) == 0) { - - b := clusterE.Create() - if b { - serf.InitAgent(context.Background()) - - //鍒涘缓es闆嗙兢 - if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { - //鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭 - s.Leave(false) - return false, err.Error() - } - - //鍒涘缓weedfs闆嗙兢 - if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { - s.Leave(false) - return false, err.Error() - } - - chMsg := protomsg.DbChangeMessage{ - Id: clusterE.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "", - } - s.AddDbMessage(&chMsg) - - return true, clusterId - } - } else { - if s.UpdateClusterName(clusterName, virtualIp) { - return true, "" - } - } - - return false, "" -} - -func (s ClusterService) SearchByPwd(pwd string) (err error) { - _, isSearching := getFromSearchMap() - if isSearching { - return errors.New("other is searching,please wait") - } - pwd = config.ClusterSet.PwdPre + pwd - ml, e := dbSync.CreateSearchNode(pwd) - if e != nil { - logger.Debug("CreateSearchNode err:", e) - return errors.New("createSearchNode err") - } - - set2SearchMap(ml) - - go clearSearchResult(ml) - - return nil -} - -func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo { - return dbSync.GetSearchNodes() -} - -func (s ClusterService) StopSearching() bool { - ml, _ := getFromSearchMap() - if ml != nil { - dbSync.CloseSearchNode(ml) - deleteFromSearchMap() - return true - } else { - return true - } -} - -//鍔犲叆闆嗙兢 -func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { - start := time.Now() - if config.Server.AnalyServerId == "" { - logger.Debug("AddCluster config serverId is nil") - return false, errors.New("serverId閰嶇疆鏈夎") - } - targetIp := "" - var joinIps []string - for _, ipStr := range joinArg.NodeIps { //杈撳叆ip鍔犲叆鐨勬儏鍐碉紝NodeIps閲岄潰鍏冪礌鍙槸ip - if ip, b := util.IpCheck(ipStr); b { - targetIp = ip - joinIps = append(joinIps, ip+":30190") - } - } - if len(joinIps) == 0 { - logger.Debug("AddCluster JoinCluster len(joinIps)=0") - return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖") - } - logger.Debug("AddCluster joinIps:", joinIps) - joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password - - conf := dbSync.DefaultConfig() - localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) - - if localIp != "" { - conf.BindAddr = localIp - } - - agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) - if err == nil && agent != nil { //鍔犲叆鎴愬姛 - logger.Debug("AddCluster dbSync.Init success") - serf.Agent = agent - - t := time.Now() - syncB := syncTableDataFromCluster(joinArg) - logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) - if syncB { - devTyp := GetDevType(config.Server.DeviceType) - if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S - go func() { - //娣诲姞ES鑺傜偣 - if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { - s.Leave(false) - return - } - - //娣诲姞weedfs鑺傜偣 - if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { - s.Leave(false) - return - } - }() - } - - //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ - go serf.ReInitDbPersonCompareData() - chMsg := protomsg.DbChangeMessage{ - Id: joinArg.ClusterId, - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Insert, - Info: "", - } - s.AddDbMessage(&chMsg) - logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) - return true, nil - } else { - logger.Debug("AddCluster syncTableDataFromCluster fail") - if agent != nil { - agent.Leave() - err = agent.Shutdown() - if err != nil { - logger.Debug("AddCluster agent shutdown err:", err) - } - } - } - } else { - logger.Debug("AddCluster dbSync.Init err:", err) - if agent != nil { - agent.Leave() - err = agent.Shutdown() - logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err) - - } - } - logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start)) - return false, errors.New("鍔犲叆闆嗙兢澶辫触") -} - -const ( - DevType_Analysis = "analysis" //鍙仛鍒嗘瀽 - DevType_Storage = "storage" //鍙仛瀛樺偍 - DevType_Analysis_Storage = "analysis_storage" //鍒嗘瀽鍔犲瓨鍌� - DevType_Other = "other" //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢� -) - -//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� -func GetDevType(dt string) string { - if dt != "" { - if len(dt) >= 4 { - s := dt[2:4] - if s == "01" { - return DevType_Analysis - } else if s == "02" { - return DevType_Storage - } else if s == "03" { - return DevType_Analysis_Storage - } - } - } - return DevType_Other -} - -func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool { - var cE models.Cluster - arr, e := cE.FindAll() - if e == nil && arr != nil && len(arr) > 0 { - if cE.UpdateClusterName(clusterName, virtualIp) { - chMsg := protomsg.DbChangeMessage{ - Id: "", - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Update, - Info: virtualIp, - } - s.AddDbMessage(&chMsg) - - if arr[0].VirtualIp != virtualIp { //婕傜Щip鏈夊彉鍖� - if serf.Agent != nil { - b, _ := json.Marshal(&protomsg.DbChangeMessage{ - Id: "", - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Update, - Info: virtualIp, - }) - err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false) - if err != nil { - logger.Error("UserEventSyncVirtualIp err:", err) - } - } - } - - return true - } - } - - return false -} - -func (s ClusterService) Leave(isDel bool) (bool, error) { - start := time.Now() - devType := GetDevType(config.Server.DeviceType) - logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) - - if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢 - go func() { - //閫�鍑簑eedfs鑺傜偣 - if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { - logger.Error("Leave ExitWeedfsServer err:", err) - return - } - //閫�鍑篹s鑺傜偣 - if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { - logger.Error("Leave ExitCluster es cluster err:", err) - return - } - }() - } - - var err error - tx := models.GetDB().Begin() - defer func() { - if err != nil && tx != nil { - tx.Rollback() - } - }() - - if serf.Agent != nil { - t := time.Now() - serf.Agent.Stop() - serf.Agent = nil - err = tx.Exec("delete from cluster_node").Error - if err != nil { - logger.Error("Leave delete from cluster_node err:", err) - return false, err - } - err = tx.Exec("delete from cluster").Error - if err != nil { - logger.Error("Leave delete from cluster err:", err) - return false, err - } - tx.Commit() - logger.Debug("Leave delete cluster_node and cluster from db") - chMsg := protomsg.DbChangeMessage{ - Id: "", - Table: protomsg.TableChanged_T_Cluster, - Action: protomsg.DbAction_Delete, - Info: "", - } - logger.Debugf("Leave delete db time=%v", time.Since(t)) - tm := time.Now() - s.AddDbMessage(&chMsg) - logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm)) - } - - logger.Debugf("Leave success time=%v", time.Since(start)) - return true, nil -} - -func (s ClusterService) TestSyncSql() bool { - var lc models.LocalConfig - lc.Select() - - timeUnix := time.Now().Unix() - fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) - if e == nil && serverIp != "" { - sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')" - if err := models.GetDB().Exec(sql).Error; err != nil { - return false - } - return true - } - return false -} - -//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� -func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { - var lc models.LocalConfig - e := lc.Select() - nodeName := "" - if e == nil && lc.ServerName != "" { - nodeName = lc.ServerName - } - - var err error - db := models.GetDB() - db.LogMode(false) - defer db.LogMode(true) - tx := db.Begin() - defer func() { - if err != nil && tx != nil { - tx.Rollback() - } - }() - //0.鍏抽棴reference - tx.Exec("PRAGMA foreign_keys=OFF") - //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 - for _, t := range serf.SyncTables { - delSql := "" - if t == "dbtables" { - delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" - } else if t == "dbtablepersons" { - delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" - } else { - delSql = "delete from " + t + "" - } - err = tx.Exec(delSql).Error - if err != nil { - return false - } - } - //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� - var dumpSqls *[]string - dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) - if dumpSqls != nil && len(*dumpSqls) > 0 { - for _, sqlStr := range *dumpSqls { - logger.Debug("gorm exec dumpSql:", sqlStr) - if err = tx.Exec(sqlStr).Error; err != nil { - return false - } - } - } else { - logger.Debug("get cluster db data err, dumpSqls is nil,err:", err) - err = errors.New("dumpSqls is nil") - return false - } - - logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑") - - //3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑 - timeUnix := time.Now().Unix() - fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") - serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) - if e1 != nil || serverIp == "" { - err = errors.New("get serverIp err") - return false - } - logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) - if nodeName == "" { - nodeName = serverIp - } - - //娣诲姞鏈韩鑺傜偣,姝ゅ淇bug锛屽姞鍏ラ泦缇ょ殑鑺傜偣閫�鍑洪泦缇ゅ悗閲嶆柊鍔犲叆锛屼細鎶d鍐茬獊 - var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')" - if err = tx.Exec(sql).Error; err != nil { - logger.Debug("add cur node err:", err) - return false - } - if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { - logger.Debug("update isDelete err:", err) - return false - } - - //4.寮�鍚痳eference - tx.Exec("PRAGMA foreign_keys=ON") - tx.Commit() - serf.SyncSql([]string{sql}) - return true -} - -func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) { - bts, _ := json.Marshal(*pbMsg) - s.bk.Publish(ProcName, bts) -} - -func (s ClusterService) FindIpByNode(nodeId string) (string, error) { - var lc models.Node - return lc.FindIpByNode(nodeId) -} +package service + +import ( + "context" + "encoding/json" + "strconv" + sysSync "sync" + "time" + "vamicro/config" + "vamicro/extend/util" + "vamicro/system-service/models" + "vamicro/system-service/serf" + "vamicro/system-service/sys" + "vamicro/system-service/vo" + + "basic.com/pubsub/protomsg.git" + dbSync "basic.com/syncdb.git" + "basic.com/valib/bhomeclient.git" + "basic.com/valib/logger.git" + "github.com/hashicorp/memberlist" + "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" +) + +type ClusterService struct { + bk bhomeclient.Broker +} + +func NewClusterService(broker bhomeclient.Broker) *ClusterService { + return &ClusterService{ + bk: broker, + } +} + +var clusterSearchKey = "clusterSearchKey" +var searchMap = make(map[string]*memberlist.Memberlist, 0) +var lock sysSync.Mutex + +func set2SearchMap(memList *memberlist.Memberlist) { + lock.Lock() + defer lock.Unlock() + searchMap[clusterSearchKey] = memList +} + +func getFromSearchMap() (*memberlist.Memberlist, bool) { + lock.Lock() + defer lock.Unlock() + if v, ok := searchMap[clusterSearchKey]; ok { + return v, true + } else { + return nil, false + } +} + +func clearSearchResult(ml *memberlist.Memberlist) { + time.Sleep(10 * time.Second) + lock.Lock() + defer lock.Unlock() + if _, ok := searchMap[clusterSearchKey]; ok { + dbSync.CloseSearchNode(ml) + delete(searchMap, clusterSearchKey) + } +} + +func deleteFromSearchMap() { + lock.Lock() + defer lock.Unlock() + if _, ok := searchMap[clusterSearchKey]; ok { + delete(searchMap, clusterSearchKey) + } +} + +func (s ClusterService) FindAll() (arr []models.Cluster, err error) { + var clusterE models.Cluster + return clusterE.FindAll() +} + +func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) { + var nodeE models.Node + return nodeE.FindNodesByClusterId(clusterId) +} + +func (s ClusterService) FindAllClusterNodes() map[string]models.Node { + m := make(map[string]models.Node, 0) + var clusterE models.Cluster + var nodeE models.Node + arr, err := clusterE.FindAll() + if err == nil && arr != nil { + for _, clu := range arr { + nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId) + if e == nil && nodes != nil { + for _, n := range nodes { + m[n.Id] = n + } + } + } + } + return m +} + +//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇� +func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { + devTyp := GetDevType(config.Server.DeviceType) + if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { + return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�" + } + clusterId := uuid.NewV4().String() + pwd = config.ClusterSet.PwdPre + pwd + var clusterE = models.Cluster{ + ClusterId: clusterId, + ClusterName: clusterName, + Password: pwd, + VirtualIp: virtualIp, + } + + arr, err := clusterE.FindAll() + if err == nil && (arr == nil || len(arr) == 0) { + + b := clusterE.Create() + if b { + serf.InitAgent(context.Background()) + + //鍒涘缓es闆嗙兢 + if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { + //鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭 + s.Leave(false) + return false, err.Error() + } + + //鍒涘缓weedfs闆嗙兢 + if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { + s.Leave(false) + return false, err.Error() + } + + chMsg := protomsg.DbChangeMessage{ + Id: clusterE.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "", + } + s.AddDbMessage(&chMsg) + + return true, clusterId + } + } else { + if s.UpdateClusterName(clusterName, virtualIp) { + return true, "" + } + } + + return false, "" +} + +func (s ClusterService) SearchByPwd(pwd string) (err error) { + _, isSearching := getFromSearchMap() + if isSearching { + return errors.New("other is searching,please wait") + } + pwd = config.ClusterSet.PwdPre + pwd + ml, e := dbSync.CreateSearchNode(pwd) + if e != nil { + logger.Debug("CreateSearchNode err:", e) + return errors.New("createSearchNode err") + } + + set2SearchMap(ml) + + go clearSearchResult(ml) + + return nil +} + +func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo { + return dbSync.GetSearchNodes() +} + +func (s ClusterService) StopSearching() bool { + ml, _ := getFromSearchMap() + if ml != nil { + dbSync.CloseSearchNode(ml) + deleteFromSearchMap() + return true + } else { + return true + } +} + +//鍔犲叆闆嗙兢 +func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { + start := time.Now() + if config.Server.AnalyServerId == "" { + logger.Debug("AddCluster config serverId is nil") + return false, errors.New("serverId閰嶇疆鏈夎") + } + targetIp := "" + var joinIps []string + for _, ipStr := range joinArg.NodeIps { //杈撳叆ip鍔犲叆鐨勬儏鍐碉紝NodeIps閲岄潰鍏冪礌鍙槸ip + if ip, b := util.IpCheck(ipStr); b { + targetIp = ip + joinIps = append(joinIps, ip+":30190") + } + } + if len(joinIps) == 0 { + logger.Debug("AddCluster JoinCluster len(joinIps)=0") + return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖") + } + logger.Debug("AddCluster joinIps:", joinIps) + joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password + + conf := dbSync.DefaultConfig() + localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter) + + if localIp != "" { + conf.BindAddr = localIp + } + + agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf) + if err == nil && agent != nil { //鍔犲叆鎴愬姛 + logger.Debug("AddCluster dbSync.Init success") + serf.Agent = agent + + t := time.Now() + syncB := syncTableDataFromCluster(joinArg) + logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) + if syncB { + devTyp := GetDevType(config.Server.DeviceType) + if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S + go func() { + //娣诲姞ES鑺傜偣 + if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { + s.Leave(false) + return + } + + //娣诲姞weedfs鑺傜偣 + if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { + s.Leave(false) + return + } + }() + } + + //闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼ + go serf.ReInitDbPersonCompareData() + chMsg := protomsg.DbChangeMessage{ + Id: joinArg.ClusterId, + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Insert, + Info: "", + } + s.AddDbMessage(&chMsg) + logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start)) + return true, nil + } else { + logger.Debug("AddCluster syncTableDataFromCluster fail") + if agent != nil { + agent.Leave() + err = agent.Shutdown() + if err != nil { + logger.Debug("AddCluster agent shutdown err:", err) + } + } + } + } else { + logger.Debug("AddCluster dbSync.Init err:", err) + if agent != nil { + agent.Leave() + err = agent.Shutdown() + logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err) + + } + } + logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start)) + return false, errors.New("鍔犲叆闆嗙兢澶辫触") +} + +const ( + DevType_Analysis = "analysis" //鍙仛鍒嗘瀽 + DevType_Storage = "storage" //鍙仛瀛樺偍 + DevType_Analysis_Storage = "analysis_storage" //鍒嗘瀽鍔犲瓨鍌� + DevType_Other = "other" //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢� +) + +//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘�� +func GetDevType(dt string) string { + if dt != "" { + if len(dt) >= 4 { + s := dt[2:4] + if s == "01" { + return DevType_Analysis + } else if s == "02" { + return DevType_Storage + } else if s == "03" { + return DevType_Analysis_Storage + } + } + } + return DevType_Other +} + +func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool { + var cE models.Cluster + arr, e := cE.FindAll() + if e == nil && arr != nil && len(arr) > 0 { + if cE.UpdateClusterName(clusterName, virtualIp) { + chMsg := protomsg.DbChangeMessage{ + Id: "", + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Update, + Info: virtualIp, + } + s.AddDbMessage(&chMsg) + + if arr[0].VirtualIp != virtualIp { //婕傜Щip鏈夊彉鍖� + if serf.Agent != nil { + b, _ := json.Marshal(&protomsg.DbChangeMessage{ + Id: "", + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Update, + Info: virtualIp, + }) + err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false) + if err != nil { + logger.Error("UserEventSyncVirtualIp err:", err) + } + } + } + + return true + } + } + + return false +} + +func (s ClusterService) Leave(isDel bool) (bool, error) { + start := time.Now() + devType := GetDevType(config.Server.DeviceType) + logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) + + if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢 + go func() { + //閫�鍑簑eedfs鑺傜偣 + if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { + logger.Error("Leave ExitWeedfsServer err:", err) + return + } + //閫�鍑篹s鑺傜偣 + if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { + logger.Error("Leave ExitCluster es cluster err:", err) + return + } + }() + } + + var err error + tx := models.GetDB().Begin() + defer func() { + if err != nil && tx != nil { + tx.Rollback() + } + }() + + if serf.Agent != nil { + t := time.Now() + serf.Agent.Stop() + serf.Agent = nil + err = tx.Exec("delete from cluster_node").Error + if err != nil { + logger.Error("Leave delete from cluster_node err:", err) + return false, err + } + err = tx.Exec("delete from cluster").Error + if err != nil { + logger.Error("Leave delete from cluster err:", err) + return false, err + } + tx.Commit() + logger.Debug("Leave delete cluster_node and cluster from db") + chMsg := protomsg.DbChangeMessage{ + Id: "", + Table: protomsg.TableChanged_T_Cluster, + Action: protomsg.DbAction_Delete, + Info: "", + } + logger.Debugf("Leave delete db time=%v", time.Since(t)) + tm := time.Now() + s.AddDbMessage(&chMsg) + logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm)) + } + + logger.Debugf("Leave success time=%v", time.Since(start)) + return true, nil +} + +func (s ClusterService) TestSyncSql() bool { + var lc models.LocalConfig + lc.Select() + + timeUnix := time.Now().Unix() + fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter) + if e == nil && serverIp != "" { + sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')" + if err := models.GetDB().Exec(sql).Error; err != nil { + return false + } + return true + } + return false +} + +//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹� +func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { + var lc models.LocalConfig + e := lc.Select() + nodeName := "" + if e == nil && lc.ServerName != "" { + nodeName = lc.ServerName + } + + var err error + db := models.GetDB() + db.LogMode(false) + defer db.LogMode(true) + tx := db.Begin() + defer func() { + if err != nil && tx != nil { + tx.Rollback() + } + }() + //0.鍏抽棴reference + tx.Exec("PRAGMA foreign_keys=OFF") + //1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁 + for _, t := range serf.SyncTables { + delSql := "" + if t == "dbtables" { + delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)" + } else if t == "dbtablepersons" { + delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))" + } else { + delSql = "delete from " + t + "" + } + err = tx.Exec(delSql).Error + if err != nil { + return false + } + } + //2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓� + var dumpSqls *[]string + dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) + if dumpSqls != nil && len(*dumpSqls) > 0 { + for _, sqlStr := range *dumpSqls { + logger.Debug("gorm exec dumpSql:", sqlStr) + if err = tx.Exec(sqlStr).Error; err != nil { + return false + } + } + } else { + logger.Debug("get cluster db data err, dumpSqls is nil,err:", err) + err = errors.New("dumpSqls is nil") + return false + } + + logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑") + + //3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑 + timeUnix := time.Now().Unix() + fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05") + serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter) + if e1 != nil || serverIp == "" { + err = errors.New("get serverIp err") + return false + } + logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName) + if nodeName == "" { + nodeName = serverIp + } + + //娣诲姞鏈韩鑺傜偣,姝ゅ淇bug锛屽姞鍏ラ泦缇ょ殑鑺傜偣閫�鍑洪泦缇ゅ悗閲嶆柊鍔犲叆锛屼細鎶d鍐茬獊 + var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')" + if err = tx.Exec(sql).Error; err != nil { + logger.Debug("add cur node err:", err) + return false + } + if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil { + logger.Debug("update isDelete err:", err) + return false + } + + //4.寮�鍚痳eference + tx.Exec("PRAGMA foreign_keys=ON") + tx.Commit() + serf.SyncSql([]string{sql}) + return true +} + +func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) { + bts, _ := json.Marshal(*pbMsg) + s.bk.Publish(ProcName, bts) +} + +func (s ClusterService) FindIpByNode(nodeId string) (string, error) { + var lc models.Node + return lc.FindIpByNode(nodeId) +} -- Gitblit v1.8.0