zhangmeng
2020-01-21 9d9cd1d3b93613071d1dffc1c82c4515d2a65af6
app/master/master.go
@@ -5,93 +5,183 @@
   "analysis/logo"
   "analysis/util"
   "context"
   "io/ioutil"
   "basic.com/libgowrapper/sdkstruct.git"
   "encoding/json"
   "os"
   "plugin"
   "strconv"
   "strings"
   "time"
)
func reaper(ctxt context.Context) {
   pidChan := make(chan int, 1)
   Reap(pidChan)
   go waitForRestart(ctxt, pidChan)
var (
   soLoaded bool
   fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
   fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
)
func soLoad(soFile string) bool {
   if fnFetch != nil && fnNotify != nil {
      return true
   }
   plug, err := plugin.Open(soFile)
   if err != nil {
      logo.Errorln("Open: ", soFile, " error: ", err)
      return false
   }
   var fn plugin.Symbol
   fn, err = app.LoadFunc(plug, soFile, "Fetch")
   if err != nil {
      logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
      return false
   }
   fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
   fn, err = app.LoadFunc(plug, soFile, "Notify")
   if err != nil {
      logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
      return false
   }
   fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
   return true
}
// Run run
func Run(ctx context.Context, configPath string) bool {
   reaper(ctx)
   rPath := configPath
   configFile := configPath
   var fetcher *Fetcher
   fs, _ := ioutil.ReadDir(rPath)
   for _, file := range fs {
      if !file.IsDir() {
         if rPath[len(rPath)-1] != '/' {
            configFile = rPath + "/" + file.Name()
         } else {
            configFile = rPath + file.Name()
         }
         cfg, err := app.ReadConfig(configFile)
         if err != nil {
            logo.Errorln("Run Fetcher Master Read From File: ", configFile, " Config Error: ", err)
            continue
         }
         fetcher = NewFetcher(cfg.SoFile)
         if fetcher == nil {
            logo.Errorln("New Fetcher Load so File Funcs Error From File: ", cfg.SoFile)
            continue
         }
      }
   }
   if fetcher == nil {
      logo.Errorln("!!!!!!Read All So File, But Can't Init DB Fetcher")
      return false
func initFetcher(ctx context.Context, soFile string) <-chan []byte {
   if !soLoad(soFile) {
      logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
      return nil
   }
   logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
   // fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
   fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
   sdks := fetcher.fnSDKInfo()
   // ip := "tcp://192.168.5.22"
   ip := "tcp://" + util.FSI.IP
   url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
   hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
   return manualStart(ctx, sdks, configPath)
   ch := make(chan []byte, 3)
   fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
   logo.Infoln("~~~~~~Start Recv SDK Infos")
   return ch
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
   rPath := configPath
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
   daemon := NewDaemon()
   chProc := make(chan []TypeProc, 32)
   go daemon.Watch(ctx, chProc)
   for _, v := range sdks {
      file := rPath + v.SdkType + ".json"
      if rPath[len(rPath)-1] != '/' {
         file = rPath + "/" + v.SdkType + ".json"
      }
      cfg, err := app.ReadConfig(file)
      if err != nil {
         logo.Errorln("Master Read: ", file, " Config Error: ", err)
         continue
      }
      logo.Infoln(file, " CONFIG: ", cfg)
      args := []string{
         `-role=slave`,
         "-sdk=" + v.SdkType,
         "-id=" + v.IpcID,
         "-" + util.ConfigPath + "=" + file,
      }
      args = append(args, app.GetParams(util.ConfigPath, file)...)
      pid, err := runProc(ctx, "./analysis", args, &cfg.Env)
      if err != nil {
         logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
      }
      logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
   chMsg := initFetcher(ctx, soFile)
   if chMsg == nil {
      logo.Infoln("Master Run initFetcher Failed")
      return false
   }
   return true
   params := app.GetParams()
   for {
      select {
      case <-ctx.Done():
         return true
      case msg := <-chMsg:
         //         sdktype      process_name   topic      null
         //         yolo/face  yolo_0/yolo_1  channel
         var sdk map[string](map[string](map[string]interface{}))
         if err := json.Unmarshal(msg, &sdk); err != nil {
            logo.Infoln("Fetcher SDK unmarshal err:", err)
            continue
         }
         logo.Infoln("~~~~~~Before Recv New SDKInfos")
         var typeProcs []TypeProc
         for sdkType, mapSdkProc := range sdk {
            config := findConfigFile(sdkType, configPath)
            if config == nil {
               logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
               continue
            }
            env := checkConfig(sdkType, *config)
            if env == nil {
               continue
            }
            var channels []string
            var namedProcs []NamedProc
            for procName, mapProcChannels := range mapSdkProc {
               for c := range mapProcChannels {
                  channels = append(channels, c)
               }
               p := NamedProc{
                  Name:     procName,
                  Channels: channels,
                  Env:      *env,
                  Config:   *config,
                  Param:    params,
               }
               namedProcs = append(namedProcs, p)
            }
            t := TypeProc{
               Typ:       sdkType,
               SNameProc: namedProcs,
            }
            typeProcs = append(typeProcs, t)
         }
         chProc <- typeProcs
         logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs)
      default:
         time.Sleep(10 * time.Millisecond)
      }
   }
}
func findConfigFile(typ, configPath string) *string {
   rPath := configPath
   // default config file
   file := rPath + typ + ".json"
   // if configPath not end with '/'
   if rPath[len(rPath)-1] != '/' {
      file = rPath + "/" + typ + ".json"
   }
   // whether file exist
   if util.IsFileExist(file) {
      return &file
   }
   return nil
}
func checkConfig(typ, file string) *string {
   cfg, err := app.ReadConfig(file)
   if err != nil {
      logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
      return nil
   }
   // check config runtime exist if config this item
   env := strings.TrimSpace(cfg.Env)
   if len(env) > 0 {
      envs := strings.Split(env, ":")
      pathExist := true
      for _, v := range envs {
         if !util.IsFileExist(v) {
            logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ)
            pathExist = false
            break
         }
      }
      if !pathExist {
         return nil
      }
   }
   return &env
}