package master import ( "analysis/app" "analysis/logo" "analysis/util" "context" "encoding/json" "os" "strconv" "strings" "time" "basic.com/libgowrapper/sdkstruct.git" "basic.com/valib/pubsub.git" ) func reaper(ctxt context.Context) { pidChan := make(chan int, 1) Reap(pidChan) go waitForRestart(ctxt, pidChan) } // Run run func Run(ctx context.Context, soFile, configPath string) bool { reaper(ctx) fetcher := NewFetcher(soFile) if fetcher == nil { logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) return false } logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") ip := "tcp://" + util.FSI.IP url := ip + ":" + strconv.Itoa(util.FSI.DataPort) hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) for { if err == nil { break } logo.Infoln("Analysis Fetcher INIT Error! URL:", url) time.Sleep(time.Second) chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) } for { select { case <-ctx.Done(): return true case msg := <-chMsg: // sdktype process_name topic null // yolo/face yolo_0/yolo_1 channel var sdk map[string](map[string](map[string]interface{})) if err := json.Unmarshal(msg.Msg, &sdk); err != nil { logo.Infoln("Fetcher SDK unmarshal err:", err) continue } logo.Infoln("~~~~~~Recv New SDKInfos") chCameras <- CameraInfo{ Cameras: cameras, } logo.Infoln("~~~~~~Recv New SDKInfos Over") default: time.Sleep(10 * time.Millisecond) } } } func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { rPath := configPath params := app.GetParams() for _, v := range sdks { file := rPath + v.SdkType + ".json" if rPath[len(rPath)-1] != '/' { file = rPath + "/" + v.SdkType + ".json" } cfg, err := app.ReadConfig(file) if err != nil { logo.Errorln("Master Read: ", file, " Config Error: ", err) continue } if len(cfg.Env) > 0 { envs := strings.Split(cfg.Env, ":") normal := true for _, v := range envs { if !util.IsFileExist(v) { normal = false break } } if !normal { logo.Infoln("Can't Find Runtime Path, Skip It: ", file) continue } } logo.Infoln(file, " CONFIG: ", cfg) args := []string{ `-role=slave`, "-sdk=" + v.SdkType, "-id=" + v.IpcID, "-" + util.ConfigPath + "=" + file, } args = append(args, params...) pid, err := runProc(ctx, "./analysis", args, cfg.Env) if err != nil { logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) } logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) } return true }