From e6a5be3714b70236d84f25be2ce858c3d7e379d8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 21 一月 2020 14:16:01 +0800 Subject: [PATCH] add recv sdk proc info from dispath --- app/master/master.go | 211 +++++++++++++++++++++++++++++++++------------------- 1 files changed, 134 insertions(+), 77 deletions(-) diff --git a/app/master/master.go b/app/master/master.go index 1199713..797d4d1 100644 --- a/app/master/master.go +++ b/app/master/master.go @@ -7,64 +7,134 @@ "context" "encoding/json" "os" + "plugin" "strconv" "strings" "time" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/valib/pubsub.git" ) -func reaper(ctxt context.Context) { - pidChan := make(chan int, 1) - Reap(pidChan) - go waitForRestart(ctxt, pidChan) +var ( + soLoaded bool + fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) + fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc +) + +func soLoad(soFile string) bool { + if fnFetch != nil && fnNotify != nil { + return true + } + + plug, err := plugin.Open(soFile) + if err != nil { + logo.Errorln("Open: ", soFile, " error: ", err) + return false + } + + var fn plugin.Symbol + + fn, err = app.LoadFunc(plug, soFile, "Fetch") + if err != nil { + logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") + return false + } + fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) + + fn, err = app.LoadFunc(plug, soFile, "Notify") + if err != nil { + logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") + return false + } + fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) + return true } -// Run run -func Run(ctx context.Context, soFile, configPath string) bool { - reaper(ctx) - - fetcher := NewFetcher(soFile) - if fetcher == nil { +func initFetcher(ctx context.Context, soFile string) <-chan []byte { + if !soLoad(soFile) { logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) - return false + return nil } logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") - ip := "tcp://" + util.FSI.IP + ip := "tcp://192.168.5.22" + // ip := "tcp://" + util.FSI.IP url := ip + ":" + strconv.Itoa(util.FSI.DataPort) hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) - chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) - for { - if err == nil { - break - } - logo.Infoln("Analysis Fetcher INIT Error! URL:", url) - time.Sleep(time.Second) - chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) + ch := make(chan []byte, 3) + + fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) + + logo.Infoln("~~~~~~Start Recv SDK Infos") + return ch +} + +// Run run +func Run(ctx context.Context, soFile, configPath string) bool { + daemon := NewDaemon() + chProc := make(chan []TypeProc, 32) + go daemon.Watch(ctx, chProc) + + chMsg := initFetcher(ctx, soFile) + if chMsg == nil { + logo.Infoln("Master Run initFetcher Failed") + return false } + params := app.GetParams() for { select { case <-ctx.Done(): return true case msg := <-chMsg: - // sdktype process_name topic null - // yolo/face yolo_0/yolo_1 channel + // sdktype process_name topic null + // yolo/face yolo_0/yolo_1 channel var sdk map[string](map[string](map[string]interface{})) - if err := json.Unmarshal(msg.Msg, &sdk); err != nil { + if err := json.Unmarshal(msg, &sdk); err != nil { logo.Infoln("Fetcher SDK unmarshal err:", err) continue } logo.Infoln("~~~~~~Recv New SDKInfos") - chCameras <- CameraInfo{ - Cameras: cameras, + + var typeProcs []TypeProc + + for sdkType, mapSdkProc := range sdk { + config := findConfigFile(sdkType, configPath) + if config == nil { + logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") + continue + } + env := checkConfig(sdkType, *config) + if env == nil { + continue + } + + var channels []string + var namedProcs []NamedProc + + for procName, mapProcChannels := range mapSdkProc { + for c := range mapProcChannels { + channels = append(channels, c) + } + p := NamedProc{ + Name: procName, + Channels: channels, + Env: *env, + Config: *config, + Param: params, + } + namedProcs = append(namedProcs, p) + } + t := TypeProc{ + Typ: sdkType, + SNameProc: namedProcs, + } + typeProcs = append(typeProcs, t) } + chProc <- typeProcs + logo.Infoln("~~~~~~Recv New SDKInfos Over") default: @@ -74,56 +144,43 @@ } -func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { +func findConfigFile(typ, configPath string) *string { rPath := configPath - - params := app.GetParams() - - for _, v := range sdks { - - file := rPath + v.SdkType + ".json" - if rPath[len(rPath)-1] != '/' { - file = rPath + "/" + v.SdkType + ".json" - } - - cfg, err := app.ReadConfig(file) - if err != nil { - logo.Errorln("Master Read: ", file, " Config Error: ", err) - continue - } - - if len(cfg.Env) > 0 { - envs := strings.Split(cfg.Env, ":") - normal := true - for _, v := range envs { - if !util.IsFileExist(v) { - normal = false - break - } - } - if !normal { - logo.Infoln("Can't Find Runtime Path, Skip It: ", file) - continue - } - } - - logo.Infoln(file, " CONFIG: ", cfg) - - args := []string{ - `-role=slave`, - "-sdk=" + v.SdkType, - "-id=" + v.IpcID, - "-" + util.ConfigPath + "=" + file, - } - - args = append(args, params...) - pid, err := runProc(ctx, "./analysis", args, cfg.Env) - - if err != nil { - logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) - } - logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) + // default config file + file := rPath + typ + ".json" + // if configPath not end with '/' + if rPath[len(rPath)-1] != '/' { + file = rPath + "/" + typ + ".json" } - return true + // whether file exist + if util.IsFileExist(file) { + return &file + } + return nil +} +func checkConfig(typ, file string) *string { + cfg, err := app.ReadConfig(file) + if err != nil { + logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) + return nil + } + + // check config runtime exist if config this item + env := strings.TrimSpace(cfg.Env) + if len(env) > 0 { + envs := strings.Split(env, ":") + pathExist := true + for _, v := range envs { + if !util.IsFileExist(v) { + pathExist = false + break + } + } + if !pathExist { + logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ) + return nil + } + } + return &env } -- Gitblit v1.8.0