zhangmeng
2020-01-20 6fa3dbf51db482430271979840734859200bf392
app/master/master.go
@@ -5,8 +5,14 @@
   "analysis/logo"
   "analysis/util"
   "context"
   "encoding/json"
   "os"
   "strconv"
   "strings"
   "time"
   "basic.com/libgowrapper/sdkstruct.git"
   "basic.com/valib/pubsub.git"
)
func reaper(ctxt context.Context) {
@@ -27,15 +33,51 @@
   logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
   fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
   // fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
   sdks := fetcher.fnSDKInfo()
   ip := "tcp://" + util.FSI.IP
   url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
   hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
   return manualStart(ctx, sdks, configPath)
   chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
   for {
      if err == nil {
         break
      }
      logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
      time.Sleep(time.Second)
      chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
   }
   for {
      select {
      case <-ctx.Done():
         return true
      case msg := <-chMsg:
         //            sdktype      process_name   topic      null
         //            yolo/face  yolo_0/yolo_1  channel
         var sdk map[string](map[string](map[string]interface{}))
         if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
            logo.Infoln("Fetcher SDK unmarshal err:", err)
            continue
         }
         logo.Infoln("~~~~~~Recv New SDKInfos")
         chCameras <- CameraInfo{
            Cameras: cameras,
         }
         logo.Infoln("~~~~~~Recv New SDKInfos Over")
      default:
         time.Sleep(10 * time.Millisecond)
      }
   }
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
   rPath := configPath
   params := app.GetParams()
   for _, v := range sdks {
@@ -50,6 +92,21 @@
         continue
      }
      if len(cfg.Env) > 0 {
         envs := strings.Split(cfg.Env, ":")
         normal := true
         for _, v := range envs {
            if !util.IsFileExist(v) {
               normal = false
               break
            }
         }
         if !normal {
            logo.Infoln("Can't Find Runtime Path, Skip It: ", file)
            continue
         }
      }
      logo.Infoln(file, " CONFIG: ", cfg)
      args := []string{
@@ -59,7 +116,7 @@
         "-" + util.ConfigPath + "=" + file,
      }
      args = append(args, app.GetParams(util.ConfigPath, file)...)
      args = append(args, params...)
      pid, err := runProc(ctx, "./analysis", args, cfg.Env)
      if err != nil {