zhangzengfei
2023-09-07 55aa27a6ad0e012d62dcea2db37528a1b18836fb
裁剪集群操作的无关动作
5个文件已修改
140 ■■■■■ 已修改文件
system-service/controllers/cluster.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/main.go 19 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/models/db.go 5 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/serf/serf.go 4 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/service/clusterService.go 110 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
system-service/controllers/cluster.go
@@ -27,7 +27,7 @@
        if arr != nil && len(arr) > 0 {
            var nodeE models.Node
            nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId)
            logger.Debugf("FindCluster nodes=%v", nodes)
            //logger.Debugf("FindCluster nodes=%v", nodes)
            return &bhomeclient.Reply{Success: true, Data: map[string]interface{}{
                "clusterId":   arr[0].ClusterId,
                "clusterName": arr[0].ClusterName,
system-service/main.go
@@ -3,13 +3,12 @@
import (
    "context"
    "flag"
    "net/http"
    _ "net/http/pprof"
    "os"
    "os/signal"
    "syscall"
    "vamicro/config"
    "vamicro/extend/util"
    //"vamicro/extend/util"
    "vamicro/system-service/broadcast"
    "vamicro/system-service/controllers"
    "vamicro/system-service/models"
@@ -68,29 +67,33 @@
    q := make(chan os.Signal, 1)
    signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM)
    ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug)
    ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, nil)
    if err != nil {
        return
    }
    bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic)
    bhomedbapi.InitDoReq(ms.RequestOnly)
    bhomedbapi.InitLog(logger.Debug)
    //bhomedbapi.InitLog(logger.Debug)
    util.AuthCheck(ctx) //授权检查
    //util.AuthCheck(ctx) //授权检查
    go ms.StartServer(fm)
    go dealSubMsg(ctx, ms)
    serf.InitBusH(ms)
    serf.InitAgent(ctx)
    go service.WatchEsAndWeedfsIp(ms)
    //go service.WatchEsAndWeedfsIp(ms)
    go serf.StartSyncSqlToSerf()
    go service.StartSyncDev()
    //go service.StartSyncDev()
    go broadcast.StartServer() //设备可以被广播搜索
    //go service.CollectDeviceInfo(ctx, ms)
    go service.WatchAuthSetChange(ms) //根据授权文件监视通道数量变化
    //go service.WatchAuthSetChange(ms) //根据授权文件监视通道数量变化
    //统计系统运行状态
    go sys.GatherStat()
system-service/models/db.go
@@ -37,6 +37,7 @@
        &SysUserRole{},
        &SysUserMenu{},
        &SysSetting{},
        &SqlSyncHis{},
        &DefHeadPic{},
        &AuthConfig{},
        &AuthDevice{},
@@ -54,7 +55,7 @@
    setFaceTrackDic()
}
//GetDB ...
// GetDB ...
func GetDB() *gorm.DB {
    return db
}
@@ -96,7 +97,7 @@
    db.Exec(strings.Join(sqls, ""))
}
//添加一些是否的字典值
// 添加一些是否的字典值
func setFaceTrackDic() {
    var sqls []string
    sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '51739a24-20bd-4440-8ef5-47fe93200536', '1', '是', 'bForceSend', '是', 1, '0' where not exists (select 1 from dictionary where id='51739a24-20bd-4440-8ef5-47fe93200536');")
system-service/serf/serf.go
@@ -267,7 +267,7 @@
        return
    }
    err = Agent.UserEvent(UserEventSyncSql, ueB, false)
    if err == nil || !strings.Contains(err.Error(), "cannot contain") {
        logger.Error("err: ", err)
    if err != nil {
        logger.Error("sending sync sql event err: ", err)
    }
}
system-service/service/clusterService.go
@@ -98,12 +98,8 @@
    return m
}
//根据集群名称和密码创建集群
// 根据集群名称和密码创建集群
func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) {
    devTyp := GetDevType(config.Server.DeviceType)
    if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage {
        return false, "只有分析和存储节点才能创建集群"
    }
    clusterId := uuid.NewV4().String()
    pwd = config.ClusterSet.PwdPre + pwd
    var clusterE = models.Cluster{
@@ -119,19 +115,6 @@
        b := clusterE.Create()
        if b {
            serf.InitAgent(context.Background())
            //创建es集群
            if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil {
                //创建失败,则serf集群也要推出
                s.Leave(false)
                return false, err.Error()
            }
            //创建weedfs集群
            if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil {
                s.Leave(false)
                return false, err.Error()
            }
            chMsg := protomsg.DbChangeMessage{
                Id:     clusterE.ClusterId,
@@ -186,7 +169,7 @@
    }
}
//加入集群
// 加入集群
func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) {
    start := time.Now()
    if config.Server.AnalyServerId == "" {
@@ -221,47 +204,31 @@
        serf.Agent = agent
        t := time.Now()
        syncB := syncTableDataFromCluster(joinArg)
        logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB)
        if syncB {
            devTyp := GetDevType(config.Server.DeviceType)
            if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES
                go func() {
                    //添加ES节点
                    if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil {
                        s.Leave(false)
                        return
                    }
                    //添加weedfs节点
                    if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil {
                        s.Leave(false)
                        return
                    }
                }()
            }
            //需要重新初始化本地比对进程
            go serf.ReInitDbPersonCompareData()
            chMsg := protomsg.DbChangeMessage{
                Id:     joinArg.ClusterId,
                Table:  protomsg.TableChanged_T_Cluster,
                Action: protomsg.DbAction_Insert,
                Info:   "",
            }
            s.AddDbMessage(&chMsg)
            logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
            return true, nil
        } else {
            logger.Debug("AddCluster syncTableDataFromCluster fail")
            if agent != nil {
                agent.Leave()
                err = agent.Shutdown()
                if err != nil {
                    logger.Debug("AddCluster agent shutdown err:", err)
                }
            }
        }
        syncTableDataFromCluster(joinArg)
        logger.Debugf("AddCluster  time=%v", time.Since(t))
        //if syncB {
        //    //需要重新初始化本地比对进程
        //    go serf.ReInitDbPersonCompareData()
        //    chMsg := protomsg.DbChangeMessage{
        //        Id:     joinArg.ClusterId,
        //        Table:  protomsg.TableChanged_T_Cluster,
        //        Action: protomsg.DbAction_Insert,
        //        Info:   "",
        //    }
        //
        //    s.AddDbMessage(&chMsg)
        logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start))
        return true, nil
        //} else {
        //    logger.Debug("AddCluster syncTableDataFromCluster fail")
        //    if agent != nil {
        //        agent.Leave()
        //        err = agent.Shutdown()
        //        if err != nil {
        //            logger.Debug("AddCluster agent shutdown err:", err)
        //        }
        //    }
        //}
    } else {
        logger.Debug("AddCluster dbSync.Init err:", err)
        if agent != nil {
@@ -282,7 +249,7 @@
    DevType_Other            = "other"            //其他设备类型,eg:应用
)
//获取本机类型,只进行分析、分析存储一体、只存储、其他。。
// 获取本机类型,只进行分析、分析存储一体、只存储、其他。。
func GetDevType(dt string) string {
    if dt != "" {
        if len(dt) >= 4 {
@@ -336,23 +303,6 @@
func (s ClusterService) Leave(isDel bool) (bool, error) {
    start := time.Now()
    devType := GetDevType(config.Server.DeviceType)
    logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel)
    if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群
        go func() {
            //退出weedfs节点
            if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil {
                logger.Error("Leave ExitWeedfsServer err:", err)
                return
            }
            //退出es节点
            if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil {
                logger.Error("Leave ExitCluster es cluster err:", err)
                return
            }
        }()
    }
    var err error
    tx := models.GetDB().Begin()
@@ -411,7 +361,7 @@
    return false
}
//加入集群后清空本地的同步库数据
// 加入集群后清空本地的同步库数据
func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool {
    var lc models.LocalConfig
    e := lc.Select()
@@ -430,6 +380,7 @@
            tx.Rollback()
        }
    }()
    //0.关闭reference
    tx.Exec("PRAGMA foreign_keys=OFF")
    //1.删除本地的同步库数据
@@ -447,6 +398,7 @@
            return false
        }
    }
    //2.拉取集群内的同步库数据到本地数据库表中
    var dumpSqls *[]string
    dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second)