From 9d9cd1d3b93613071d1dffc1c82c4515d2a65af6 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 21 一月 2020 17:22:39 +0800 Subject: [PATCH] bug fixed change real fetcher ip --- app/master/master.go | 213 +++++++++++++++++++++++++++++++++++++++++------------ 1 files changed, 164 insertions(+), 49 deletions(-) diff --git a/app/master/master.go b/app/master/master.go index 728e3ca..a87c676 100644 --- a/app/master/master.go +++ b/app/master/master.go @@ -5,68 +5,183 @@ "analysis/logo" "analysis/util" "context" - - "basic.com/libgowrapper/sdkstruct.git" + "encoding/json" + "os" + "plugin" + "strconv" + "strings" + "time" ) -func reaper(ctxt context.Context) { - pidChan := make(chan int, 1) - Reap(pidChan) - go waitForRestart(ctxt, pidChan) +var ( + soLoaded bool + fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) + fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc +) + +func soLoad(soFile string) bool { + if fnFetch != nil && fnNotify != nil { + return true + } + + plug, err := plugin.Open(soFile) + if err != nil { + logo.Errorln("Open: ", soFile, " error: ", err) + return false + } + + var fn plugin.Symbol + + fn, err = app.LoadFunc(plug, soFile, "Fetch") + if err != nil { + logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") + return false + } + fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) + + fn, err = app.LoadFunc(plug, soFile, "Notify") + if err != nil { + logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") + return false + } + fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) + return true } -// Run run -func Run(ctx context.Context, soFile, configPath string) bool { - reaper(ctx) - - fetcher := NewFetcher(soFile) - if fetcher == nil { +func initFetcher(ctx context.Context, soFile string) <-chan []byte { + if !soLoad(soFile) { logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) - return false + return nil } logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") - fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) - // fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) - sdks := fetcher.fnSDKInfo() + // ip := "tcp://192.168.5.22" + ip := "tcp://" + util.FSI.IP + url := ip + ":" + strconv.Itoa(util.FSI.DataPort) + hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) - return manualStart(ctx, sdks, configPath) + ch := make(chan []byte, 3) + + fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) + + logo.Infoln("~~~~~~Start Recv SDK Infos") + return ch } -func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { - rPath := configPath +// Run run +func Run(ctx context.Context, soFile, configPath string) bool { + daemon := NewDaemon() + chProc := make(chan []TypeProc, 32) + go daemon.Watch(ctx, chProc) - for _, v := range sdks { - - file := rPath + v.SdkType + ".json" - if rPath[len(rPath)-1] != '/' { - file = rPath + "/" + v.SdkType + ".json" - } - - cfg, err := app.ReadConfig(file) - if err != nil { - logo.Errorln("Master Read: ", file, " Config Error: ", err) - continue - } - - logo.Infoln(file, " CONFIG: ", cfg) - - args := []string{ - `-role=slave`, - "-sdk=" + v.SdkType, - "-id=" + v.IpcID, - "-" + util.ConfigPath + "=" + file, - } - - args = append(args, app.GetParams(util.ConfigPath, file)...) - pid, err := runProc(ctx, "./analysis", args, cfg.Env) - - if err != nil { - logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) - } - logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) + chMsg := initFetcher(ctx, soFile) + if chMsg == nil { + logo.Infoln("Master Run initFetcher Failed") + return false } - return true + params := app.GetParams() + for { + select { + case <-ctx.Done(): + return true + case msg := <-chMsg: + // sdktype process_name topic null + // yolo/face yolo_0/yolo_1 channel + var sdk map[string](map[string](map[string]interface{})) + + if err := json.Unmarshal(msg, &sdk); err != nil { + logo.Infoln("Fetcher SDK unmarshal err:", err) + continue + } + + logo.Infoln("~~~~~~Before Recv New SDKInfos") + + var typeProcs []TypeProc + + for sdkType, mapSdkProc := range sdk { + config := findConfigFile(sdkType, configPath) + if config == nil { + logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") + continue + } + env := checkConfig(sdkType, *config) + if env == nil { + continue + } + + var channels []string + var namedProcs []NamedProc + + for procName, mapProcChannels := range mapSdkProc { + for c := range mapProcChannels { + channels = append(channels, c) + } + p := NamedProc{ + Name: procName, + Channels: channels, + Env: *env, + Config: *config, + Param: params, + } + namedProcs = append(namedProcs, p) + } + t := TypeProc{ + Typ: sdkType, + SNameProc: namedProcs, + } + typeProcs = append(typeProcs, t) + } + chProc <- typeProcs + + logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs) + + default: + time.Sleep(10 * time.Millisecond) + } + } + +} + +func findConfigFile(typ, configPath string) *string { + rPath := configPath + // default config file + file := rPath + typ + ".json" + // if configPath not end with '/' + if rPath[len(rPath)-1] != '/' { + file = rPath + "/" + typ + ".json" + } + // whether file exist + if util.IsFileExist(file) { + return &file + } + return nil +} + +func checkConfig(typ, file string) *string { + cfg, err := app.ReadConfig(file) + if err != nil { + logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) + return nil + } + + // check config runtime exist if config this item + env := strings.TrimSpace(cfg.Env) + if len(env) > 0 { + envs := strings.Split(env, ":") + pathExist := true + for _, v := range envs { + if !util.IsFileExist(v) { + logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ) + pathExist = false + break + } + } + if !pathExist { + + return nil + } + } + return &env } -- Gitblit v1.8.0