From 63645d248c765244488cd34dbc1bb6528ca6b7c7 Mon Sep 17 00:00:00 2001
From: zhangzengfei <zhangzengfei@smartai.com>
Date: 星期二, 05 九月 2023 09:58:13 +0800
Subject: [PATCH] 修复编译

---
 system-service/service/clusterService.go | 1014 +++++++++++++++++++++++++++++-----------------------------
 1 files changed, 507 insertions(+), 507 deletions(-)

diff --git a/system-service/service/clusterService.go b/system-service/service/clusterService.go
index 07a9928..18af954 100644
--- a/system-service/service/clusterService.go
+++ b/system-service/service/clusterService.go
@@ -1,507 +1,507 @@
-package service
-
-import (
-	"context"
-	"encoding/json"
-	"strconv"
-	sysSync "sync"
-	"time"
-	"vamicro/config"
-	"vamicro/extend/util"
-	"vamicro/system-service/models"
-	"vamicro/system-service/serf"
-	"vamicro/system-service/sys"
-	"vamicro/system-service/vo"
-
-	"basic.com/pubsub/protomsg.git"
-	dbSync "basic.com/syncdb.git"
-	"basic.com/valib/bhomeclient.git"
-	"basic.com/valib/logger.git"
-	"github.com/hashicorp/memberlist"
-	"github.com/pkg/errors"
-	uuid "github.com/satori/go.uuid"
-)
-
-type ClusterService struct {
-	bk bhomeclient.Broker
-}
-
-func NewClusterService(broker bhomeclient.Broker) *ClusterService {
-	return &ClusterService{
-		bk: broker,
-	}
-}
-
-var clusterSearchKey = "clusterSearchKey"
-var searchMap = make(map[string]*memberlist.Memberlist, 0)
-var lock sysSync.Mutex
-
-func set2SearchMap(memList *memberlist.Memberlist) {
-	lock.Lock()
-	defer lock.Unlock()
-	searchMap[clusterSearchKey] = memList
-}
-
-func getFromSearchMap() (*memberlist.Memberlist, bool) {
-	lock.Lock()
-	defer lock.Unlock()
-	if v, ok := searchMap[clusterSearchKey]; ok {
-		return v, true
-	} else {
-		return nil, false
-	}
-}
-
-func clearSearchResult(ml *memberlist.Memberlist) {
-	time.Sleep(10 * time.Second)
-	lock.Lock()
-	defer lock.Unlock()
-	if _, ok := searchMap[clusterSearchKey]; ok {
-		dbSync.CloseSearchNode(ml)
-		delete(searchMap, clusterSearchKey)
-	}
-}
-
-func deleteFromSearchMap() {
-	lock.Lock()
-	defer lock.Unlock()
-	if _, ok := searchMap[clusterSearchKey]; ok {
-		delete(searchMap, clusterSearchKey)
-	}
-}
-
-func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
-	var clusterE models.Cluster
-	return clusterE.FindAll()
-}
-
-func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
-	var nodeE models.Node
-	return nodeE.FindNodesByClusterId(clusterId)
-}
-
-func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
-	m := make(map[string]models.Node, 0)
-	var clusterE models.Cluster
-	var nodeE models.Node
-	arr, err := clusterE.FindAll()
-	if err == nil && arr != nil {
-		for _, clu := range arr {
-			nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
-			if e == nil && nodes != nil {
-				for _, n := range nodes {
-					m[n.Id] = n
-				}
-			}
-		}
-	}
-	return m
-}
-
-//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
-func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
-	devTyp := GetDevType(config.Server.DeviceType)
-	if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
-		return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�"
-	}
-	clusterId := uuid.NewV4().String()
-	pwd = config.ClusterSet.PwdPre + pwd
-	var clusterE = models.Cluster{
-		ClusterId:   clusterId,
-		ClusterName: clusterName,
-		Password:    pwd,
-		VirtualIp:   virtualIp,
-	}
-
-	arr, err := clusterE.FindAll()
-	if err == nil && (arr == nil || len(arr) == 0) {
-
-		b := clusterE.Create()
-		if b {
-			serf.InitAgent(context.Background())
-
-			//鍒涘缓es闆嗙兢
-			if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
-				//鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭
-				s.Leave(false)
-				return false, err.Error()
-			}
-
-			//鍒涘缓weedfs闆嗙兢
-			if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
-				s.Leave(false)
-				return false, err.Error()
-			}
-
-			chMsg := protomsg.DbChangeMessage{
-				Id:     clusterE.ClusterId,
-				Table:  protomsg.TableChanged_T_Cluster,
-				Action: protomsg.DbAction_Insert,
-				Info:   "",
-			}
-			s.AddDbMessage(&chMsg)
-
-			return true, clusterId
-		}
-	} else {
-		if s.UpdateClusterName(clusterName, virtualIp) {
-			return true, ""
-		}
-	}
-
-	return false, ""
-}
-
-func (s ClusterService) SearchByPwd(pwd string) (err error) {
-	_, isSearching := getFromSearchMap()
-	if isSearching {
-		return errors.New("other is searching,please wait")
-	}
-	pwd = config.ClusterSet.PwdPre + pwd
-	ml, e := dbSync.CreateSearchNode(pwd)
-	if e != nil {
-		logger.Debug("CreateSearchNode err:", e)
-		return errors.New("createSearchNode err")
-	}
-
-	set2SearchMap(ml)
-
-	go clearSearchResult(ml)
-
-	return nil
-}
-
-func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
-	return dbSync.GetSearchNodes()
-}
-
-func (s ClusterService) StopSearching() bool {
-	ml, _ := getFromSearchMap()
-	if ml != nil {
-		dbSync.CloseSearchNode(ml)
-		deleteFromSearchMap()
-		return true
-	} else {
-		return true
-	}
-}
-
-//鍔犲叆闆嗙兢
-func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
-	start := time.Now()
-	if config.Server.AnalyServerId == "" {
-		logger.Debug("AddCluster config serverId is nil")
-		return false, errors.New("serverId閰嶇疆鏈夎")
-	}
-	targetIp := ""
-	var joinIps []string
-	for _, ipStr := range joinArg.NodeIps { //杈撳叆ip鍔犲叆鐨勬儏鍐碉紝NodeIps閲岄潰鍏冪礌鍙槸ip
-		if ip, b := util.IpCheck(ipStr); b {
-			targetIp = ip
-			joinIps = append(joinIps, ip+":30190")
-		}
-	}
-	if len(joinIps) == 0 {
-		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
-		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
-	}
-	logger.Debug("AddCluster joinIps:", joinIps)
-	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
-
-	conf := dbSync.DefaultConfig()
-	localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
-
-	if localIp != "" {
-		conf.BindAddr = localIp
-	}
-
-	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
-	if err == nil && agent != nil { //鍔犲叆鎴愬姛
-		logger.Debug("AddCluster dbSync.Init success")
-		serf.Agent = agent
-
-		t := time.Now()
-		syncB := syncTableDataFromCluster(joinArg)
-		logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
-		if syncB {
-			devTyp := GetDevType(config.Server.DeviceType)
-			if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S
-				go func() {
-					//娣诲姞ES鑺傜偣
-					if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
-						s.Leave(false)
-						return
-					}
-
-					//娣诲姞weedfs鑺傜偣
-					if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
-						s.Leave(false)
-						return
-					}
-				}()
-			}
-
-			//闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼
-			go serf.ReInitDbPersonCompareData()
-			chMsg := protomsg.DbChangeMessage{
-				Id:     joinArg.ClusterId,
-				Table:  protomsg.TableChanged_T_Cluster,
-				Action: protomsg.DbAction_Insert,
-				Info:   "",
-			}
-			s.AddDbMessage(&chMsg)
-			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
-			return true, nil
-		} else {
-			logger.Debug("AddCluster syncTableDataFromCluster fail")
-			if agent != nil {
-				agent.Leave()
-				err = agent.Shutdown()
-				if err != nil {
-					logger.Debug("AddCluster agent shutdown err:", err)
-				}
-			}
-		}
-	} else {
-		logger.Debug("AddCluster dbSync.Init err:", err)
-		if agent != nil {
-			agent.Leave()
-			err = agent.Shutdown()
-			logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
-
-		}
-	}
-	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
-	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
-}
-
-const (
-	DevType_Analysis         = "analysis"         //鍙仛鍒嗘瀽
-	DevType_Storage          = "storage"          //鍙仛瀛樺偍
-	DevType_Analysis_Storage = "analysis_storage" //鍒嗘瀽鍔犲瓨鍌�
-	DevType_Other            = "other"            //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢�
-)
-
-//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘��
-func GetDevType(dt string) string {
-	if dt != "" {
-		if len(dt) >= 4 {
-			s := dt[2:4]
-			if s == "01" {
-				return DevType_Analysis
-			} else if s == "02" {
-				return DevType_Storage
-			} else if s == "03" {
-				return DevType_Analysis_Storage
-			}
-		}
-	}
-	return DevType_Other
-}
-
-func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
-	var cE models.Cluster
-	arr, e := cE.FindAll()
-	if e == nil && arr != nil && len(arr) > 0 {
-		if cE.UpdateClusterName(clusterName, virtualIp) {
-			chMsg := protomsg.DbChangeMessage{
-				Id:     "",
-				Table:  protomsg.TableChanged_T_Cluster,
-				Action: protomsg.DbAction_Update,
-				Info:   virtualIp,
-			}
-			s.AddDbMessage(&chMsg)
-
-			if arr[0].VirtualIp != virtualIp { //婕傜Щip鏈夊彉鍖�
-				if serf.Agent != nil {
-					b, _ := json.Marshal(&protomsg.DbChangeMessage{
-						Id:     "",
-						Table:  protomsg.TableChanged_T_Cluster,
-						Action: protomsg.DbAction_Update,
-						Info:   virtualIp,
-					})
-					err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false)
-					if err != nil {
-						logger.Error("UserEventSyncVirtualIp err:", err)
-					}
-				}
-			}
-
-			return true
-		}
-	}
-
-	return false
-}
-
-func (s ClusterService) Leave(isDel bool) (bool, error) {
-	start := time.Now()
-	devType := GetDevType(config.Server.DeviceType)
-	logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
-
-	if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢
-		go func() {
-			//閫�鍑簑eedfs鑺傜偣
-			if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
-				logger.Error("Leave ExitWeedfsServer err:", err)
-				return
-			}
-			//閫�鍑篹s鑺傜偣
-			if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
-				logger.Error("Leave ExitCluster es cluster err:", err)
-				return
-			}
-		}()
-	}
-
-	var err error
-	tx := models.GetDB().Begin()
-	defer func() {
-		if err != nil && tx != nil {
-			tx.Rollback()
-		}
-	}()
-
-	if serf.Agent != nil {
-		t := time.Now()
-		serf.Agent.Stop()
-		serf.Agent = nil
-		err = tx.Exec("delete from cluster_node").Error
-		if err != nil {
-			logger.Error("Leave delete from cluster_node err:", err)
-			return false, err
-		}
-		err = tx.Exec("delete from cluster").Error
-		if err != nil {
-			logger.Error("Leave delete from cluster err:", err)
-			return false, err
-		}
-		tx.Commit()
-		logger.Debug("Leave delete cluster_node and cluster from db")
-		chMsg := protomsg.DbChangeMessage{
-			Id:     "",
-			Table:  protomsg.TableChanged_T_Cluster,
-			Action: protomsg.DbAction_Delete,
-			Info:   "",
-		}
-		logger.Debugf("Leave delete db time=%v", time.Since(t))
-		tm := time.Now()
-		s.AddDbMessage(&chMsg)
-		logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))
-	}
-
-	logger.Debugf("Leave success time=%v", time.Since(start))
-	return true, nil
-}
-
-func (s ClusterService) TestSyncSql() bool {
-	var lc models.LocalConfig
-	lc.Select()
-
-	timeUnix := time.Now().Unix()
-	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
-	serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
-	if e == nil && serverIp != "" {
-		sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
-		if err := models.GetDB().Exec(sql).Error; err != nil {
-			return false
-		}
-		return true
-	}
-	return false
-}
-
-//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹�
-func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
-	var lc models.LocalConfig
-	e := lc.Select()
-	nodeName := ""
-	if e == nil && lc.ServerName != "" {
-		nodeName = lc.ServerName
-	}
-
-	var err error
-	db := models.GetDB()
-	db.LogMode(false)
-	defer db.LogMode(true)
-	tx := db.Begin()
-	defer func() {
-		if err != nil && tx != nil {
-			tx.Rollback()
-		}
-	}()
-	//0.鍏抽棴reference
-	tx.Exec("PRAGMA foreign_keys=OFF")
-	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
-	for _, t := range serf.SyncTables {
-		delSql := ""
-		if t == "dbtables" {
-			delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
-		} else if t == "dbtablepersons" {
-			delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
-		} else {
-			delSql = "delete from " + t + ""
-		}
-		err = tx.Exec(delSql).Error
-		if err != nil {
-			return false
-		}
-	}
-	//2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓�
-	var dumpSqls *[]string
-	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
-	if dumpSqls != nil && len(*dumpSqls) > 0 {
-		for _, sqlStr := range *dumpSqls {
-			logger.Debug("gorm exec dumpSql:", sqlStr)
-			if err = tx.Exec(sqlStr).Error; err != nil {
-				return false
-			}
-		}
-	} else {
-		logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
-		err = errors.New("dumpSqls is nil")
-		return false
-	}
-
-	logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑")
-
-	//3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑
-	timeUnix := time.Now().Unix()
-	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
-	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
-	if e1 != nil || serverIp == "" {
-		err = errors.New("get serverIp err")
-		return false
-	}
-	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
-	if nodeName == "" {
-		nodeName = serverIp
-	}
-
-	//娣诲姞鏈韩鑺傜偣,姝ゅ淇bug锛屽姞鍏ラ泦缇ょ殑鑺傜偣閫�鍑洪泦缇ゅ悗閲嶆柊鍔犲叆锛屼細鎶d鍐茬獊
-	var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
-	if err = tx.Exec(sql).Error; err != nil {
-		logger.Debug("add cur node err:", err)
-		return false
-	}
-	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
-		logger.Debug("update isDelete err:", err)
-		return false
-	}
-
-	//4.寮�鍚痳eference
-	tx.Exec("PRAGMA foreign_keys=ON")
-	tx.Commit()
-	serf.SyncSql([]string{sql})
-	return true
-}
-
-func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
-	bts, _ := json.Marshal(*pbMsg)
-	s.bk.Publish(ProcName, bts)
-}
-
-func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
-	var lc models.Node
-	return lc.FindIpByNode(nodeId)
-}
+package service
+
+import (
+	"context"
+	"encoding/json"
+	"strconv"
+	sysSync "sync"
+	"time"
+	"vamicro/config"
+	"vamicro/extend/util"
+	"vamicro/system-service/models"
+	"vamicro/system-service/serf"
+	"vamicro/system-service/sys"
+	"vamicro/system-service/vo"
+
+	"basic.com/pubsub/protomsg.git"
+	dbSync "basic.com/syncdb.git"
+	"basic.com/valib/bhomeclient.git"
+	"basic.com/valib/logger.git"
+	"github.com/hashicorp/memberlist"
+	"github.com/pkg/errors"
+	uuid "github.com/satori/go.uuid"
+)
+
+type ClusterService struct {
+	bk bhomeclient.Broker
+}
+
+func NewClusterService(broker bhomeclient.Broker) *ClusterService {
+	return &ClusterService{
+		bk: broker,
+	}
+}
+
+var clusterSearchKey = "clusterSearchKey"
+var searchMap = make(map[string]*memberlist.Memberlist, 0)
+var lock sysSync.Mutex
+
+func set2SearchMap(memList *memberlist.Memberlist) {
+	lock.Lock()
+	defer lock.Unlock()
+	searchMap[clusterSearchKey] = memList
+}
+
+func getFromSearchMap() (*memberlist.Memberlist, bool) {
+	lock.Lock()
+	defer lock.Unlock()
+	if v, ok := searchMap[clusterSearchKey]; ok {
+		return v, true
+	} else {
+		return nil, false
+	}
+}
+
+func clearSearchResult(ml *memberlist.Memberlist) {
+	time.Sleep(10 * time.Second)
+	lock.Lock()
+	defer lock.Unlock()
+	if _, ok := searchMap[clusterSearchKey]; ok {
+		dbSync.CloseSearchNode(ml)
+		delete(searchMap, clusterSearchKey)
+	}
+}
+
+func deleteFromSearchMap() {
+	lock.Lock()
+	defer lock.Unlock()
+	if _, ok := searchMap[clusterSearchKey]; ok {
+		delete(searchMap, clusterSearchKey)
+	}
+}
+
+func (s ClusterService) FindAll() (arr []models.Cluster, err error) {
+	var clusterE models.Cluster
+	return clusterE.FindAll()
+}
+
+func (s ClusterService) FindNodesByClusterId(clusterId string) (nodes []models.Node, err error) {
+	var nodeE models.Node
+	return nodeE.FindNodesByClusterId(clusterId)
+}
+
+func (s ClusterService) FindAllClusterNodes() map[string]models.Node {
+	m := make(map[string]models.Node, 0)
+	var clusterE models.Cluster
+	var nodeE models.Node
+	arr, err := clusterE.FindAll()
+	if err == nil && arr != nil {
+		for _, clu := range arr {
+			nodes, e := nodeE.FindNodesByClusterId(clu.ClusterId)
+			if e == nil && nodes != nil {
+				for _, n := range nodes {
+					m[n.Id] = n
+				}
+			}
+		}
+	}
+	return m
+}
+
+//鏍规嵁闆嗙兢鍚嶇О鍜屽瘑鐮佸垱寤洪泦缇�
+func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
+	devTyp := GetDevType(config.Server.DeviceType)
+	if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
+		return false, "鍙湁鍒嗘瀽鍜屽瓨鍌ㄨ妭鐐规墠鑳藉垱寤洪泦缇�"
+	}
+	clusterId := uuid.NewV4().String()
+	pwd = config.ClusterSet.PwdPre + pwd
+	var clusterE = models.Cluster{
+		ClusterId:   clusterId,
+		ClusterName: clusterName,
+		Password:    pwd,
+		VirtualIp:   virtualIp,
+	}
+
+	arr, err := clusterE.FindAll()
+	if err == nil && (arr == nil || len(arr) == 0) {
+
+		b := clusterE.Create()
+		if b {
+			serf.InitAgent(context.Background())
+
+			//鍒涘缓es闆嗙兢
+			if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
+				//鍒涘缓澶辫触锛屽垯serf闆嗙兢涔熻鎺ㄥ嚭
+				s.Leave(false)
+				return false, err.Error()
+			}
+
+			//鍒涘缓weedfs闆嗙兢
+			if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
+				s.Leave(false)
+				return false, err.Error()
+			}
+
+			chMsg := protomsg.DbChangeMessage{
+				Id:     clusterE.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "",
+			}
+			s.AddDbMessage(&chMsg)
+
+			return true, clusterId
+		}
+	} else {
+		if s.UpdateClusterName(clusterName, virtualIp) {
+			return true, ""
+		}
+	}
+
+	return false, ""
+}
+
+func (s ClusterService) SearchByPwd(pwd string) (err error) {
+	_, isSearching := getFromSearchMap()
+	if isSearching {
+		return errors.New("other is searching,please wait")
+	}
+	pwd = config.ClusterSet.PwdPre + pwd
+	ml, e := dbSync.CreateSearchNode(pwd)
+	if e != nil {
+		logger.Debug("CreateSearchNode err:", e)
+		return errors.New("createSearchNode err")
+	}
+
+	set2SearchMap(ml)
+
+	go clearSearchResult(ml)
+
+	return nil
+}
+
+func (s ClusterService) SearchNodes() map[string]dbSync.NodeInfo {
+	return dbSync.GetSearchNodes()
+}
+
+func (s ClusterService) StopSearching() bool {
+	ml, _ := getFromSearchMap()
+	if ml != nil {
+		dbSync.CloseSearchNode(ml)
+		deleteFromSearchMap()
+		return true
+	} else {
+		return true
+	}
+}
+
+//鍔犲叆闆嗙兢
+func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
+	start := time.Now()
+	if config.Server.AnalyServerId == "" {
+		logger.Debug("AddCluster config serverId is nil")
+		return false, errors.New("serverId閰嶇疆鏈夎")
+	}
+	targetIp := ""
+	var joinIps []string
+	for _, ipStr := range joinArg.NodeIps { //杈撳叆ip鍔犲叆鐨勬儏鍐碉紝NodeIps閲岄潰鍏冪礌鍙槸ip
+		if ip, b := util.IpCheck(ipStr); b {
+			targetIp = ip
+			joinIps = append(joinIps, ip+":30190")
+		}
+	}
+	if len(joinIps) == 0 {
+		logger.Debug("AddCluster JoinCluster len(joinIps)=0")
+		return false, errors.New("鍔犲叆鐨勭洰鏍噄p涓嶈兘涓虹┖")
+	}
+	logger.Debug("AddCluster joinIps:", joinIps)
+	joinArg.Password = config.ClusterSet.PwdPre + joinArg.Password
+
+	conf := dbSync.DefaultConfig()
+	localIp, _, _ := sys.GetLocalIP(config.Server.NetworkAdapter)
+
+	if localIp != "" {
+		conf.BindAddr = localIp
+	}
+
+	agent, err := dbSync.Init(joinArg.ClusterId, joinArg.Password, config.Server.AnalyServerId, joinIps, config.ClusterSet.SerfSnapShotPath, conf)
+	if err == nil && agent != nil { //鍔犲叆鎴愬姛
+		logger.Debug("AddCluster dbSync.Init success")
+		serf.Agent = agent
+
+		t := time.Now()
+		syncB := syncTableDataFromCluster(joinArg)
+		logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
+		if syncB {
+			devTyp := GetDevType(config.Server.DeviceType)
+			if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //鍙湁鑳藉惎鍔‥S鐨勮妭鐐规墠鑳藉姞鍏S
+				go func() {
+					//娣诲姞ES鑺傜偣
+					if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
+						s.Leave(false)
+						return
+					}
+
+					//娣诲姞weedfs鑺傜偣
+					if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
+						s.Leave(false)
+						return
+					}
+				}()
+			}
+
+			//闇�瑕侀噸鏂板垵濮嬪寲鏈湴姣斿杩涚▼
+			go serf.ReInitDbPersonCompareData()
+			chMsg := protomsg.DbChangeMessage{
+				Id:     joinArg.ClusterId,
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Insert,
+				Info:   "",
+			}
+			s.AddDbMessage(&chMsg)
+			logger.Debugf("AddCluster 鍔犲叆闆嗙兢鎴愬姛 time=%v", time.Since(start))
+			return true, nil
+		} else {
+			logger.Debug("AddCluster syncTableDataFromCluster fail")
+			if agent != nil {
+				agent.Leave()
+				err = agent.Shutdown()
+				if err != nil {
+					logger.Debug("AddCluster agent shutdown err:", err)
+				}
+			}
+		}
+	} else {
+		logger.Debug("AddCluster dbSync.Init err:", err)
+		if agent != nil {
+			agent.Leave()
+			err = agent.Shutdown()
+			logger.Debugf("AddCluster dbSync.Init err,shutdown, err=%v", err)
+
+		}
+	}
+	logger.Debugf("AddCluster 鍔犲叆闆嗙兢澶辫触, targetIp=%v, time=%v", targetIp, time.Since(start))
+	return false, errors.New("鍔犲叆闆嗙兢澶辫触")
+}
+
+const (
+	DevType_Analysis         = "analysis"         //鍙仛鍒嗘瀽
+	DevType_Storage          = "storage"          //鍙仛瀛樺偍
+	DevType_Analysis_Storage = "analysis_storage" //鍒嗘瀽鍔犲瓨鍌�
+	DevType_Other            = "other"            //鍏朵粬璁惧绫诲瀷锛宔g锛氬簲鐢�
+)
+
+//鑾峰彇鏈満绫诲瀷锛屽彧杩涜鍒嗘瀽銆佸垎鏋愬瓨鍌ㄤ竴浣撱�佸彧瀛樺偍銆佸叾浠栥�傘��
+func GetDevType(dt string) string {
+	if dt != "" {
+		if len(dt) >= 4 {
+			s := dt[2:4]
+			if s == "01" {
+				return DevType_Analysis
+			} else if s == "02" {
+				return DevType_Storage
+			} else if s == "03" {
+				return DevType_Analysis_Storage
+			}
+		}
+	}
+	return DevType_Other
+}
+
+func (s ClusterService) UpdateClusterName(clusterName string, virtualIp string) bool {
+	var cE models.Cluster
+	arr, e := cE.FindAll()
+	if e == nil && arr != nil && len(arr) > 0 {
+		if cE.UpdateClusterName(clusterName, virtualIp) {
+			chMsg := protomsg.DbChangeMessage{
+				Id:     "",
+				Table:  protomsg.TableChanged_T_Cluster,
+				Action: protomsg.DbAction_Update,
+				Info:   virtualIp,
+			}
+			s.AddDbMessage(&chMsg)
+
+			if arr[0].VirtualIp != virtualIp { //婕傜Щip鏈夊彉鍖�
+				if serf.Agent != nil {
+					b, _ := json.Marshal(&protomsg.DbChangeMessage{
+						Id:     "",
+						Table:  protomsg.TableChanged_T_Cluster,
+						Action: protomsg.DbAction_Update,
+						Info:   virtualIp,
+					})
+					err := serf.Agent.UserEvent(serf.UserEventSyncVirtualIp, b, false)
+					if err != nil {
+						logger.Error("UserEventSyncVirtualIp err:", err)
+					}
+				}
+			}
+
+			return true
+		}
+	}
+
+	return false
+}
+
+func (s ClusterService) Leave(isDel bool) (bool, error) {
+	start := time.Now()
+	devType := GetDevType(config.Server.DeviceType)
+	logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
+
+	if devType == DevType_Analysis_Storage || devType == DevType_Storage { //鍙湁鍒嗘瀽鍜屽瓨鍌ㄦ墠鏈塃S闆嗙兢
+		go func() {
+			//閫�鍑簑eedfs鑺傜偣
+			if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
+				logger.Error("Leave ExitWeedfsServer err:", err)
+				return
+			}
+			//閫�鍑篹s鑺傜偣
+			if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
+				logger.Error("Leave ExitCluster es cluster err:", err)
+				return
+			}
+		}()
+	}
+
+	var err error
+	tx := models.GetDB().Begin()
+	defer func() {
+		if err != nil && tx != nil {
+			tx.Rollback()
+		}
+	}()
+
+	if serf.Agent != nil {
+		t := time.Now()
+		serf.Agent.Stop()
+		serf.Agent = nil
+		err = tx.Exec("delete from cluster_node").Error
+		if err != nil {
+			logger.Error("Leave delete from cluster_node err:", err)
+			return false, err
+		}
+		err = tx.Exec("delete from cluster").Error
+		if err != nil {
+			logger.Error("Leave delete from cluster err:", err)
+			return false, err
+		}
+		tx.Commit()
+		logger.Debug("Leave delete cluster_node and cluster from db")
+		chMsg := protomsg.DbChangeMessage{
+			Id:     "",
+			Table:  protomsg.TableChanged_T_Cluster,
+			Action: protomsg.DbAction_Delete,
+			Info:   "",
+		}
+		logger.Debugf("Leave delete db time=%v", time.Since(t))
+		tm := time.Now()
+		s.AddDbMessage(&chMsg)
+		logger.Debugf("Leave AddDbMessage time=%v", time.Since(tm))
+	}
+
+	logger.Debugf("Leave success time=%v", time.Since(start))
+	return true, nil
+}
+
+func (s ClusterService) TestSyncSql() bool {
+	var lc models.LocalConfig
+	lc.Select()
+
+	timeUnix := time.Now().Unix()
+	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
+	serverIp, _, e := util.GetLocalIP(config.Server.NetworkAdapter)
+	if e == nil && serverIp != "" {
+		sql := "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time) values ('" + uuid.NewV4().String() + "','" + uuid.NewV4().String() + "','" + lc.ServerName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "')"
+		if err := models.GetDB().Exec(sql).Error; err != nil {
+			return false
+		}
+		return true
+	}
+	return false
+}
+
+//鍔犲叆闆嗙兢鍚庢竻绌烘湰鍦扮殑鍚屾搴撴暟鎹�
+func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
+	var lc models.LocalConfig
+	e := lc.Select()
+	nodeName := ""
+	if e == nil && lc.ServerName != "" {
+		nodeName = lc.ServerName
+	}
+
+	var err error
+	db := models.GetDB()
+	db.LogMode(false)
+	defer db.LogMode(true)
+	tx := db.Begin()
+	defer func() {
+		if err != nil && tx != nil {
+			tx.Rollback()
+		}
+	}()
+	//0.鍏抽棴reference
+	tx.Exec("PRAGMA foreign_keys=OFF")
+	//1.鍒犻櫎鏈湴鐨勫悓姝ュ簱鏁版嵁
+	for _, t := range serf.SyncTables {
+		delSql := ""
+		if t == "dbtables" {
+			delSql = "delete from " + t + " where (analyServerId='' or analyServerId=NULL)"
+		} else if t == "dbtablepersons" {
+			delSql = "delete from " + t + " where tableId in (select id from dbtables where (analyServerId='' or analyServerId=NULL))"
+		} else {
+			delSql = "delete from " + t + ""
+		}
+		err = tx.Exec(delSql).Error
+		if err != nil {
+			return false
+		}
+	}
+	//2.鎷夊彇闆嗙兢鍐呯殑鍚屾搴撴暟鎹埌鏈湴鏁版嵁搴撹〃涓�
+	var dumpSqls *[]string
+	dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)
+	if dumpSqls != nil && len(*dumpSqls) > 0 {
+		for _, sqlStr := range *dumpSqls {
+			logger.Debug("gorm exec dumpSql:", sqlStr)
+			if err = tx.Exec(sqlStr).Error; err != nil {
+				return false
+			}
+		}
+	} else {
+		logger.Debug("get cluster db data err, dumpSqls is nil,err:", err)
+		err = errors.New("dumpSqls is nil")
+		return false
+	}
+
+	logger.Debug("鎴愬姛娣诲姞褰撳墠鑺傜偣鍒伴泦缇よ妭鐐逛腑")
+
+	//3.灏嗘湰鑺傜偣鍔犲叆鍒拌妭鐐瑰垪琛ㄤ腑
+	timeUnix := time.Now().Unix()
+	fmtTimeStr := time.Unix(timeUnix, 0).Format("2006-01-02 15:04:05")
+	serverIp, _, e1 := util.GetLocalIP(config.Server.NetworkAdapter)
+	if e1 != nil || serverIp == "" {
+		err = errors.New("get serverIp err")
+		return false
+	}
+	logger.Debug("cur Node serverIp:", serverIp, "nodeName:", nodeName)
+	if nodeName == "" {
+		nodeName = serverIp
+	}
+
+	//娣诲姞鏈韩鑺傜偣,姝ゅ淇bug锛屽姞鍏ラ泦缇ょ殑鑺傜偣閫�鍑洪泦缇ゅ悗閲嶆柊鍔犲叆锛屼細鎶d鍐茬獊
+	var sql = "insert into cluster_node (id,cluster_id,node_name,node_id,node_ip,create_time,isDelete,device_type) select '" + config.Server.AnalyServerId + "','" + joinArg.ClusterId + "','" + nodeName + "','" + config.Server.AnalyServerId + "','" + (serverIp + ":" + strconv.Itoa(dbSync.DefaultBindPort)) + "','" + fmtTimeStr + "',0,'" + config.Server.DeviceType + "' where not exists (select 1 from cluster_node where id='" + config.Server.AnalyServerId + "')"
+	if err = tx.Exec(sql).Error; err != nil {
+		logger.Debug("add cur node err:", err)
+		return false
+	}
+	if err = tx.Exec("update cluster_node set isDelete=0 where id='" + config.Server.AnalyServerId + "'").Error; err != nil {
+		logger.Debug("update isDelete err:", err)
+		return false
+	}
+
+	//4.寮�鍚痳eference
+	tx.Exec("PRAGMA foreign_keys=ON")
+	tx.Commit()
+	serf.SyncSql([]string{sql})
+	return true
+}
+
+func (s ClusterService) AddDbMessage(pbMsg *protomsg.DbChangeMessage) {
+	bts, _ := json.Marshal(*pbMsg)
+	s.bk.Publish(ProcName, bts)
+}
+
+func (s ClusterService) FindIpByNode(nodeId string) (string, error) {
+	var lc models.Node
+	return lc.FindIpByNode(nodeId)
+}

--
Gitblit v1.8.0