package master import ( "analysis/app" "analysis/logo" "analysis/util" "context" "encoding/json" "os" "plugin" "strconv" "strings" "time" ) var ( soLoaded bool fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc ) func soLoad(soFile string) bool { if fnFetch != nil && fnNotify != nil { return true } plug, err := plugin.Open(soFile) if err != nil { logo.Errorln("Open: ", soFile, " error: ", err) return false } var fn plugin.Symbol fn, err = app.LoadFunc(plug, soFile, "Fetch") if err != nil { logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") return false } fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) fn, err = app.LoadFunc(plug, soFile, "Notify") if err != nil { logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") return false } fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) return true } func initFetcher(ctx context.Context, soFile string) <-chan []byte { if !soLoad(soFile) { logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) return nil } logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") // ip := "tcp://192.168.5.22" ip := "tcp://" + util.FSI.IP url := ip + ":" + strconv.Itoa(util.FSI.DataPort) hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) ch := make(chan []byte, 3) fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) logo.Infoln("~~~~~~Start Recv SDK Infos") return ch } // Run run func Run(ctx context.Context, soFile, configPath string) bool { daemon := NewDaemon() chProc := make(chan []TypeProc, 32) go daemon.Watch(ctx, chProc) chMsg := initFetcher(ctx, soFile) if chMsg == nil { logo.Infoln("Master Run initFetcher Failed") return false } params := app.GetParams() for { select { case <-ctx.Done(): return true case msg := <-chMsg: // sdktype process_name topic null // yolo/face yolo_0/yolo_1 channel var sdk map[string](map[string](map[string]interface{})) if err := json.Unmarshal(msg, &sdk); err != nil { logo.Infoln("Fetcher SDK unmarshal err:", err) continue } logo.Infoln("~~~~~~Before Recv New SDKInfos") var typeProcs []TypeProc for sdkType, mapSdkProc := range sdk { config := findConfigFile(sdkType, configPath) if config == nil { logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") continue } env := checkConfig(sdkType, *config) if env == nil { continue } var channels []string var namedProcs []NamedProc for procName, mapProcChannels := range mapSdkProc { for c := range mapProcChannels { channels = append(channels, c) } p := NamedProc{ Name: procName, Channels: channels, Env: *env, Config: *config, Param: params, } namedProcs = append(namedProcs, p) } t := TypeProc{ Typ: sdkType, SNameProc: namedProcs, } typeProcs = append(typeProcs, t) } chProc <- typeProcs logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs) default: time.Sleep(10 * time.Millisecond) } } } func findConfigFile(typ, configPath string) *string { rPath := configPath // default config file file := rPath + typ + ".json" // if configPath not end with '/' if rPath[len(rPath)-1] != '/' { file = rPath + "/" + typ + ".json" } // whether file exist if util.IsFileExist(file) { return &file } return nil } func checkConfig(typ, file string) *string { cfg, err := app.ReadConfig(file) if err != nil { logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) return nil } // check config runtime exist if config this item env := strings.TrimSpace(cfg.Env) if len(env) > 0 { envs := strings.Split(env, ":") pathExist := true for _, v := range envs { if !util.IsFileExist(v) { logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ) pathExist = false break } } if !pathExist { return nil } } return &env }