| | |
| | | "context" |
| | | "encoding/json" |
| | | "os" |
| | | "plugin" |
| | | "strconv" |
| | | "strings" |
| | | "time" |
| | | |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | func reaper(ctxt context.Context) { |
| | | pidChan := make(chan int, 1) |
| | | Reap(pidChan) |
| | | go waitForRestart(ctxt, pidChan) |
| | | var ( |
| | | soLoaded bool |
| | | fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) |
| | | fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc |
| | | ) |
| | | |
| | | func soLoad(soFile string) bool { |
| | | if fnFetch != nil && fnNotify != nil { |
| | | return true |
| | | } |
| | | |
| | | plug, err := plugin.Open(soFile) |
| | | if err != nil { |
| | | logo.Errorln("Open: ", soFile, " error: ", err) |
| | | return false |
| | | } |
| | | |
| | | var fn plugin.Symbol |
| | | |
| | | fn, err = app.LoadFunc(plug, soFile, "Fetch") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") |
| | | return false |
| | | } |
| | | fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) |
| | | |
| | | fn, err = app.LoadFunc(plug, soFile, "Notify") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") |
| | | return false |
| | | } |
| | | fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) |
| | | return true |
| | | } |
| | | |
| | | // Run run |
| | | func Run(ctx context.Context, soFile, configPath string) bool { |
| | | reaper(ctx) |
| | | |
| | | fetcher := NewFetcher(soFile) |
| | | if fetcher == nil { |
| | | func initFetcher(ctx context.Context, soFile string) <-chan []byte { |
| | | if !soLoad(soFile) { |
| | | logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) |
| | | return false |
| | | return nil |
| | | } |
| | | |
| | | logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") |
| | | |
| | | ip := "tcp://" + util.FSI.IP |
| | | ip := "tcp://192.168.5.22" |
| | | // ip := "tcp://" + util.FSI.IP |
| | | url := ip + ":" + strconv.Itoa(util.FSI.DataPort) |
| | | hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) |
| | | |
| | | chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | logo.Infoln("Analysis Fetcher INIT Error! URL:", url) |
| | | time.Sleep(time.Second) |
| | | chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | ch := make(chan []byte, 3) |
| | | |
| | | fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) |
| | | |
| | | logo.Infoln("~~~~~~Start Recv SDK Infos") |
| | | return ch |
| | | } |
| | | |
| | | // Run run |
| | | func Run(ctx context.Context, soFile, configPath string) bool { |
| | | daemon := NewDaemon() |
| | | chProc := make(chan []TypeProc, 32) |
| | | go daemon.Watch(ctx, chProc) |
| | | |
| | | chMsg := initFetcher(ctx, soFile) |
| | | if chMsg == nil { |
| | | logo.Infoln("Master Run initFetcher Failed") |
| | | return false |
| | | } |
| | | params := app.GetParams() |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return true |
| | | case msg := <-chMsg: |
| | | // sdktype process_name topic null |
| | | // yolo/face yolo_0/yolo_1 channel |
| | | // sdktype process_name topic null |
| | | // yolo/face yolo_0/yolo_1 channel |
| | | var sdk map[string](map[string](map[string]interface{})) |
| | | |
| | | if err := json.Unmarshal(msg.Msg, &sdk); err != nil { |
| | | if err := json.Unmarshal(msg, &sdk); err != nil { |
| | | logo.Infoln("Fetcher SDK unmarshal err:", err) |
| | | continue |
| | | } |
| | | |
| | | logo.Infoln("~~~~~~Recv New SDKInfos") |
| | | chCameras <- CameraInfo{ |
| | | Cameras: cameras, |
| | | |
| | | var typeProcs []TypeProc |
| | | |
| | | for sdkType, mapSdkProc := range sdk { |
| | | config := findConfigFile(sdkType, configPath) |
| | | if config == nil { |
| | | logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") |
| | | continue |
| | | } |
| | | env := checkConfig(sdkType, *config) |
| | | if env == nil { |
| | | continue |
| | | } |
| | | |
| | | var channels []string |
| | | var namedProcs []NamedProc |
| | | |
| | | for procName, mapProcChannels := range mapSdkProc { |
| | | for c := range mapProcChannels { |
| | | channels = append(channels, c) |
| | | } |
| | | p := NamedProc{ |
| | | Name: procName, |
| | | Channels: channels, |
| | | Env: *env, |
| | | Config: *config, |
| | | Param: params, |
| | | } |
| | | namedProcs = append(namedProcs, p) |
| | | } |
| | | t := TypeProc{ |
| | | Typ: sdkType, |
| | | SNameProc: namedProcs, |
| | | } |
| | | typeProcs = append(typeProcs, t) |
| | | } |
| | | chProc <- typeProcs |
| | | |
| | | logo.Infoln("~~~~~~Recv New SDKInfos Over") |
| | | |
| | | default: |
| | |
| | | |
| | | } |
| | | |
| | | func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { |
| | | func findConfigFile(typ, configPath string) *string { |
| | | rPath := configPath |
| | | |
| | | params := app.GetParams() |
| | | |
| | | for _, v := range sdks { |
| | | |
| | | file := rPath + v.SdkType + ".json" |
| | | if rPath[len(rPath)-1] != '/' { |
| | | file = rPath + "/" + v.SdkType + ".json" |
| | | } |
| | | |
| | | cfg, err := app.ReadConfig(file) |
| | | if err != nil { |
| | | logo.Errorln("Master Read: ", file, " Config Error: ", err) |
| | | continue |
| | | } |
| | | |
| | | if len(cfg.Env) > 0 { |
| | | envs := strings.Split(cfg.Env, ":") |
| | | normal := true |
| | | for _, v := range envs { |
| | | if !util.IsFileExist(v) { |
| | | normal = false |
| | | break |
| | | } |
| | | } |
| | | if !normal { |
| | | logo.Infoln("Can't Find Runtime Path, Skip It: ", file) |
| | | continue |
| | | } |
| | | } |
| | | |
| | | logo.Infoln(file, " CONFIG: ", cfg) |
| | | |
| | | args := []string{ |
| | | `-role=slave`, |
| | | "-sdk=" + v.SdkType, |
| | | "-id=" + v.IpcID, |
| | | "-" + util.ConfigPath + "=" + file, |
| | | } |
| | | |
| | | args = append(args, params...) |
| | | pid, err := runProc(ctx, "./analysis", args, cfg.Env) |
| | | |
| | | if err != nil { |
| | | logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) |
| | | } |
| | | logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) |
| | | // default config file |
| | | file := rPath + typ + ".json" |
| | | // if configPath not end with '/' |
| | | if rPath[len(rPath)-1] != '/' { |
| | | file = rPath + "/" + typ + ".json" |
| | | } |
| | | return true |
| | | // whether file exist |
| | | if util.IsFileExist(file) { |
| | | return &file |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func checkConfig(typ, file string) *string { |
| | | cfg, err := app.ReadConfig(file) |
| | | if err != nil { |
| | | logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) |
| | | return nil |
| | | } |
| | | |
| | | // check config runtime exist if config this item |
| | | env := strings.TrimSpace(cfg.Env) |
| | | if len(env) > 0 { |
| | | envs := strings.Split(env, ":") |
| | | pathExist := true |
| | | for _, v := range envs { |
| | | if !util.IsFileExist(v) { |
| | | pathExist = false |
| | | break |
| | | } |
| | | } |
| | | if !pathExist { |
| | | logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ) |
| | | return nil |
| | | } |
| | | } |
| | | return &env |
| | | } |