package main import ( "basic.com/pubsub/protomsg.git" "basic.com/valib/bhomeclient.git" "basic.com/valib/bhomedbapi.git" "basic.com/valib/logger.git" "basic.com/valib/version.git" "context" "encoding/json" "flag" "os" "os/signal" "syscall" "vamicro/chanmanage-service/cache" "vamicro/chanmanage-service/controllers" "vamicro/chanmanage-service/models" "vamicro/chanmanage-service/service" "vamicro/config" ) //算力配置模块 var ( procName = "chanmanage-service" proc = &bhomeclient.ProcInfo{ Name: procName, //进程名称 ID: procName, //进程id Info: "", //进程的描述信息,用于区分同一进程名称下多个进程 } env = flag.String("e", "pro", "") ) func init() { flag.Parse() vaversion.Usage() config.Init(*env) // 日志初始化 var logFile = config.LogConf.Path + "vamicro-" + procName + ".log" logger.InitLogger(logFile, config.LogConf.Level, config.LogConf.MaxSize, config.LogConf.MaxBackups, config.LogConf.MaxAge) logger.Info("log init success !") } func main() { models.Init() defer models.CloseDB() ctx, cancel := context.WithCancel(context.Background()) fm, pubTopics := initFuncMap() var reg = &bhomeclient.RegisterInfo{ Proc: *proc, Channel: nil, PubTopic: pubTopics, SubTopic: []string{ bhomeclient.Proc_Camera_Service, bhomeclient.Proc_Gb28181_Service, bhomeclient.Proc_Scene_Service, bhomeclient.Proc_System_Service}, } q := make(chan os.Signal, 1) signal.Notify(q, os.Interrupt, os.Kill, syscall.SIGTERM) ms, err := bhomeclient.NewMicroNode(ctx, q, config.Server.AnalyServerId, reg, logger.Debug) if err != nil { return } bhomedbapi.InitLog(logger.Debug) bhomedbapi.InitGetNetNode(ms.GetLocalNetNodeByTopic) bhomedbapi.InitDoReq(ms.RequestOnly) go dealSubMsg(ctx, ms) // 断流监控数据接收服务 go service.WatchDuanliu() go ms.StartServer(fm) cache.InitDataCache() <-q ms.DeRegister() cancel() ms.Free() } //处理订阅消息 func dealSubMsg(ctx context.Context, ms *bhomeclient.MicroNode) { pollSv := service.NewPollSetService(ms) for { select { case <-ctx.Done(): return case msg := <-ms.SubCh: logger.Debug("recv sub msg:", msg) cache.UpdateBySub(msg) var protoMsg protomsg.DbChangeMessage if err := json.Unmarshal(msg.Data, &protoMsg); err == nil { if protoMsg.Table == protomsg.TableChanged_T_Camera || protoMsg.Table == protomsg.TableChanged_T_CameraRule || protoMsg.Table == protomsg.TableChanged_T_Server { pollSv.ResetChannelCount() } } else { logger.Debug("unmarshal sub msg to DbChangeMsg err:", err) } } } } const urlPrefix = "/data/api-v" func initFuncMap() (map[string]bhomeclient.MicroFunc, []string) { funcMap := make(map[string]bhomeclient.MicroFunc) psc := new(controllers.PollSetController) funcMap[urlPrefix+"/camera/statisticRunInfo"] = psc.StatisticRunInfo funcMap[urlPrefix+"/camera/getAllCamerasByServer"] = psc.GetAllCamerasByServer funcMap[urlPrefix+"/pollConfig/savePollPeriod"] = psc.SavePollPeriod funcMap[urlPrefix+"/pollConfig/savePollDelay"] = psc.SavePollDelay funcMap[urlPrefix+"/pollConfig/updateEnable"] = psc.UpdateEnable funcMap[urlPrefix+"/pollConfig/updateChannelCount"] = psc.UpdateChannelCount funcMap[urlPrefix+"/pollConfig/getPollConfig"] = psc.GetPollConfig var pubTopics []string for key, _ := range funcMap { pubTopics = append(pubTopics, key) } return funcMap, pubTopics }