zhangmeng
2020-01-21 e6a5be3714b70236d84f25be2ce858c3d7e379d8
app/master/master.go
@@ -7,64 +7,134 @@
   "context"
   "encoding/json"
   "os"
   "plugin"
   "strconv"
   "strings"
   "time"
   "basic.com/libgowrapper/sdkstruct.git"
   "basic.com/valib/pubsub.git"
)
func reaper(ctxt context.Context) {
   pidChan := make(chan int, 1)
   Reap(pidChan)
   go waitForRestart(ctxt, pidChan)
var (
   soLoaded bool
   fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
   fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
)
func soLoad(soFile string) bool {
   if fnFetch != nil && fnNotify != nil {
      return true
   }
   plug, err := plugin.Open(soFile)
   if err != nil {
      logo.Errorln("Open: ", soFile, " error: ", err)
      return false
   }
   var fn plugin.Symbol
   fn, err = app.LoadFunc(plug, soFile, "Fetch")
   if err != nil {
      logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
      return false
   }
   fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
   fn, err = app.LoadFunc(plug, soFile, "Notify")
   if err != nil {
      logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
      return false
   }
   fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
   return true
}
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
   reaper(ctx)
   fetcher := NewFetcher(soFile)
   if fetcher == nil {
func initFetcher(ctx context.Context, soFile string) <-chan []byte {
   if !soLoad(soFile) {
      logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
      return false
      return nil
   }
   logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
   ip := "tcp://" + util.FSI.IP
   ip := "tcp://192.168.5.22"
   // ip := "tcp://" + util.FSI.IP
   url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
   hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
   chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
   for {
      if err == nil {
         break
      }
      logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
      time.Sleep(time.Second)
      chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
   ch := make(chan []byte, 3)
   fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
   logo.Infoln("~~~~~~Start Recv SDK Infos")
   return ch
}
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
   daemon := NewDaemon()
   chProc := make(chan []TypeProc, 32)
   go daemon.Watch(ctx, chProc)
   chMsg := initFetcher(ctx, soFile)
   if chMsg == nil {
      logo.Infoln("Master Run initFetcher Failed")
      return false
   }
   params := app.GetParams()
   for {
      select {
      case <-ctx.Done():
         return true
      case msg := <-chMsg:
         //            sdktype      process_name   topic      null
         //            yolo/face  yolo_0/yolo_1  channel
         //         sdktype      process_name   topic      null
         //         yolo/face  yolo_0/yolo_1  channel
         var sdk map[string](map[string](map[string]interface{}))
         if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
         if err := json.Unmarshal(msg, &sdk); err != nil {
            logo.Infoln("Fetcher SDK unmarshal err:", err)
            continue
         }
         logo.Infoln("~~~~~~Recv New SDKInfos")
         chCameras <- CameraInfo{
            Cameras: cameras,
         var typeProcs []TypeProc
         for sdkType, mapSdkProc := range sdk {
            config := findConfigFile(sdkType, configPath)
            if config == nil {
               logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
               continue
            }
            env := checkConfig(sdkType, *config)
            if env == nil {
               continue
            }
            var channels []string
            var namedProcs []NamedProc
            for procName, mapProcChannels := range mapSdkProc {
               for c := range mapProcChannels {
                  channels = append(channels, c)
               }
               p := NamedProc{
                  Name:     procName,
                  Channels: channels,
                  Env:      *env,
                  Config:   *config,
                  Param:    params,
               }
               namedProcs = append(namedProcs, p)
            }
            t := TypeProc{
               Typ:       sdkType,
               SNameProc: namedProcs,
            }
            typeProcs = append(typeProcs, t)
         }
         chProc <- typeProcs
         logo.Infoln("~~~~~~Recv New SDKInfos Over")
      default:
@@ -74,56 +144,43 @@
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
func findConfigFile(typ, configPath string) *string {
   rPath := configPath
   params := app.GetParams()
   for _, v := range sdks {
      file := rPath + v.SdkType + ".json"
      if rPath[len(rPath)-1] != '/' {
         file = rPath + "/" + v.SdkType + ".json"
      }
      cfg, err := app.ReadConfig(file)
      if err != nil {
         logo.Errorln("Master Read: ", file, " Config Error: ", err)
         continue
      }
      if len(cfg.Env) > 0 {
         envs := strings.Split(cfg.Env, ":")
         normal := true
         for _, v := range envs {
            if !util.IsFileExist(v) {
               normal = false
               break
            }
         }
         if !normal {
            logo.Infoln("Can't Find Runtime Path, Skip It: ", file)
            continue
         }
      }
      logo.Infoln(file, " CONFIG: ", cfg)
      args := []string{
         `-role=slave`,
         "-sdk=" + v.SdkType,
         "-id=" + v.IpcID,
         "-" + util.ConfigPath + "=" + file,
      }
      args = append(args, params...)
      pid, err := runProc(ctx, "./analysis", args, cfg.Env)
      if err != nil {
         logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
      }
      logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
   // default config file
   file := rPath + typ + ".json"
   // if configPath not end with '/'
   if rPath[len(rPath)-1] != '/' {
      file = rPath + "/" + typ + ".json"
   }
   return true
   // whether file exist
   if util.IsFileExist(file) {
      return &file
   }
   return nil
}
func checkConfig(typ, file string) *string {
   cfg, err := app.ReadConfig(file)
   if err != nil {
      logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
      return nil
   }
   // check config runtime exist if config this item
   env := strings.TrimSpace(cfg.Env)
   if len(env) > 0 {
      envs := strings.Split(env, ":")
      pathExist := true
      for _, v := range envs {
         if !util.IsFileExist(v) {
            pathExist = false
            break
         }
      }
      if !pathExist {
         logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ)
         return nil
      }
   }
   return &env
}