| | |
| | | if arr != nil && len(arr) > 0 { |
| | | var nodeE models.Node |
| | | nodes, _ := nodeE.FindNodesByClusterId(arr[0].ClusterId) |
| | | logger.Debugf("FindCluster nodes=%v", nodes) |
| | | //logger.Debugf("FindCluster nodes=%v", nodes) |
| | | return &bhomeclient.Reply{Success: true, Data: map[string]interface{}{ |
| | | "clusterId": arr[0].ClusterId, |
| | | "clusterName": arr[0].ClusterName, |
| | |
| | | import ( |
| | | "context" |
| | | "flag" |
| | | "net/http" |
| | | _ "net/http/pprof" |
| | | "os" |
| | | "os/signal" |
| | | "syscall" |
| | | "vamicro/config" |
| | | "vamicro/extend/util" |
| | | //"vamicro/extend/util" |
| | | "vamicro/system-service/broadcast" |
| | | "vamicro/system-service/controllers" |
| | | "vamicro/system-service/models" |
| | |
| | | q := make(chan os.Signal, 1) |
| | | signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) |
| | | |
| | | ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug) |
| | | ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, nil) |
| | | if err != nil { |
| | | return |
| | | } |
| | | |
| | | bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic) |
| | | bhomedbapi.InitDoReq(ms.RequestOnly) |
| | | bhomedbapi.InitLog(logger.Debug) |
| | | //bhomedbapi.InitLog(logger.Debug) |
| | | |
| | | util.AuthCheck(ctx) //授权检查 |
| | | //util.AuthCheck(ctx) //授权检查 |
| | | |
| | | go ms.StartServer(fm) |
| | | go dealSubMsg(ctx, ms) |
| | | |
| | | serf.InitBusH(ms) |
| | | serf.InitAgent(ctx) |
| | | |
| | | go service.WatchEsAndWeedfsIp(ms) |
| | | //go service.WatchEsAndWeedfsIp(ms) |
| | | go serf.StartSyncSqlToSerf() |
| | | |
| | | go service.StartSyncDev() |
| | | //go service.StartSyncDev() |
| | | |
| | | go broadcast.StartServer() //设备可以被广播搜索 |
| | | |
| | | //go service.CollectDeviceInfo(ctx, ms) |
| | | |
| | | go service.WatchAuthSetChange(ms) //根据授权文件监视通道数量变化 |
| | | //go service.WatchAuthSetChange(ms) //根据授权文件监视通道数量变化 |
| | | |
| | | //统计系统运行状态 |
| | | go sys.GatherStat() |
| | |
| | | &SysUserRole{}, |
| | | &SysUserMenu{}, |
| | | &SysSetting{}, |
| | | &SqlSyncHis{}, |
| | | &DefHeadPic{}, |
| | | &AuthConfig{}, |
| | | &AuthDevice{}, |
| | |
| | | setFaceTrackDic() |
| | | } |
| | | |
| | | //GetDB ... |
| | | // GetDB ... |
| | | func GetDB() *gorm.DB { |
| | | return db |
| | | } |
| | |
| | | db.Exec(strings.Join(sqls, "")) |
| | | } |
| | | |
| | | //添加一些是否的字典值 |
| | | // 添加一些是否的字典值 |
| | | func setFaceTrackDic() { |
| | | var sqls []string |
| | | sqls = append(sqls, "INSERT INTO dictionary(`id`,`value`,`name`,`type`,`description`,`sort`,`parent_id`) select '51739a24-20bd-4440-8ef5-47fe93200536', '1', '是', 'bForceSend', '是', 1, '0' where not exists (select 1 from dictionary where id='51739a24-20bd-4440-8ef5-47fe93200536');") |
| | |
| | | return |
| | | } |
| | | err = Agent.UserEvent(UserEventSyncSql, ueB, false) |
| | | if err == nil || !strings.Contains(err.Error(), "cannot contain") { |
| | | logger.Error("err: ", err) |
| | | if err != nil { |
| | | logger.Error("sending sync sql event err: ", err) |
| | | } |
| | | } |
| | |
| | | return m |
| | | } |
| | | |
| | | //根据集群名称和密码创建集群 |
| | | // 根据集群名称和密码创建集群 |
| | | func (s ClusterService) Create(clusterName string, pwd string, virtualIp string) (bool, string) { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp != DevType_Storage && devTyp != DevType_Analysis_Storage { |
| | | return false, "只有分析和存储节点才能创建集群" |
| | | } |
| | | clusterId := uuid.NewV4().String() |
| | | pwd = config.ClusterSet.PwdPre + pwd |
| | | var clusterE = models.Cluster{ |
| | |
| | | b := clusterE.Create() |
| | | if b { |
| | | serf.InitAgent(context.Background()) |
| | | |
| | | //创建es集群 |
| | | if _, err := CreateOriginalCluster("/opt/elasticsearch/config", "/opt/vasystem/script", "/opt/vasystem/indexInit"); err != nil { |
| | | //创建失败,则serf集群也要推出 |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | //创建weedfs集群 |
| | | if _, err = CreateWeedfsServer("/opt/vasystem/script"); err != nil { |
| | | s.Leave(false) |
| | | return false, err.Error() |
| | | } |
| | | |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: clusterE.ClusterId, |
| | |
| | | } |
| | | } |
| | | |
| | | //加入集群 |
| | | // 加入集群 |
| | | func (s ClusterService) JoinCluster(joinArg *vo.ClusterJoinVo) (bool, error) { |
| | | start := time.Now() |
| | | if config.Server.AnalyServerId == "" { |
| | |
| | | serf.Agent = agent |
| | | |
| | | t := time.Now() |
| | | syncB := syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster syncTableDataFromCluster time=%v, syncB=%v", time.Since(t), syncB) |
| | | if syncB { |
| | | devTyp := GetDevType(config.Server.DeviceType) |
| | | if devTyp == DevType_Analysis_Storage || devTyp == DevType_Storage { //只有能启动ES的节点才能加入ES |
| | | go func() { |
| | | //添加ES节点 |
| | | if _, err := AddCluster("/opt/elasticsearch/config", "/opt/vasystem/script", targetIp, "9200", config.EsConfig.StorePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | |
| | | //添加weedfs节点 |
| | | if _, err := AddWeedfsServer("/opt/vasystem/script", targetIp, config.StorageConf.VolumePath); err != nil { |
| | | s.Leave(false) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | //需要重新初始化本地比对进程 |
| | | go serf.ReInitDbPersonCompareData() |
| | | chMsg := protomsg.DbChangeMessage{ |
| | | Id: joinArg.ClusterId, |
| | | Table: protomsg.TableChanged_T_Cluster, |
| | | Action: protomsg.DbAction_Insert, |
| | | Info: "", |
| | | } |
| | | s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | } else { |
| | | logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | if agent != nil { |
| | | agent.Leave() |
| | | err = agent.Shutdown() |
| | | if err != nil { |
| | | logger.Debug("AddCluster agent shutdown err:", err) |
| | | } |
| | | } |
| | | } |
| | | syncTableDataFromCluster(joinArg) |
| | | logger.Debugf("AddCluster time=%v", time.Since(t)) |
| | | //if syncB { |
| | | // //需要重新初始化本地比对进程 |
| | | // go serf.ReInitDbPersonCompareData() |
| | | // chMsg := protomsg.DbChangeMessage{ |
| | | // Id: joinArg.ClusterId, |
| | | // Table: protomsg.TableChanged_T_Cluster, |
| | | // Action: protomsg.DbAction_Insert, |
| | | // Info: "", |
| | | // } |
| | | // |
| | | // s.AddDbMessage(&chMsg) |
| | | logger.Debugf("AddCluster 加入集群成功 time=%v", time.Since(start)) |
| | | return true, nil |
| | | //} else { |
| | | // logger.Debug("AddCluster syncTableDataFromCluster fail") |
| | | // if agent != nil { |
| | | // agent.Leave() |
| | | // err = agent.Shutdown() |
| | | // if err != nil { |
| | | // logger.Debug("AddCluster agent shutdown err:", err) |
| | | // } |
| | | // } |
| | | //} |
| | | } else { |
| | | logger.Debug("AddCluster dbSync.Init err:", err) |
| | | if agent != nil { |
| | |
| | | DevType_Other = "other" //其他设备类型,eg:应用 |
| | | ) |
| | | |
| | | //获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | // 获取本机类型,只进行分析、分析存储一体、只存储、其他。。 |
| | | func GetDevType(dt string) string { |
| | | if dt != "" { |
| | | if len(dt) >= 4 { |
| | |
| | | |
| | | func (s ClusterService) Leave(isDel bool) (bool, error) { |
| | | start := time.Now() |
| | | devType := GetDevType(config.Server.DeviceType) |
| | | logger.Debugf("Leave GetDevType=%v, isDel=%v", devType, isDel) |
| | | |
| | | if devType == DevType_Analysis_Storage || devType == DevType_Storage { //只有分析和存储才有ES集群 |
| | | go func() { |
| | | //退出weedfs节点 |
| | | if _, err := ExitWeedfsServer("/opt/vasystem/script", isDel, config.StorageConf.VolumePath); err != nil { |
| | | logger.Error("Leave ExitWeedfsServer err:", err) |
| | | return |
| | | } |
| | | //退出es节点 |
| | | if _, err := ExitCluster("/opt/elasticsearch/config", "/opt/vasystem/script"); err != nil { |
| | | logger.Error("Leave ExitCluster es cluster err:", err) |
| | | return |
| | | } |
| | | }() |
| | | } |
| | | |
| | | var err error |
| | | tx := models.GetDB().Begin() |
| | |
| | | return false |
| | | } |
| | | |
| | | //加入集群后清空本地的同步库数据 |
| | | // 加入集群后清空本地的同步库数据 |
| | | func syncTableDataFromCluster(joinArg *vo.ClusterJoinVo) bool { |
| | | var lc models.LocalConfig |
| | | e := lc.Select() |
| | |
| | | tx.Rollback() |
| | | } |
| | | }() |
| | | |
| | | //0.关闭reference |
| | | tx.Exec("PRAGMA foreign_keys=OFF") |
| | | //1.删除本地的同步库数据 |
| | |
| | | return false |
| | | } |
| | | } |
| | | |
| | | //2.拉取集群内的同步库数据到本地数据库表中 |
| | | var dumpSqls *[]string |
| | | dumpSqls, err = serf.GetTableDataFromCluster(serf.Agent, joinArg.ClusterId, serf.SyncTables, 20*time.Second) |