From 9d9cd1d3b93613071d1dffc1c82c4515d2a65af6 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 21 一月 2020 17:22:39 +0800
Subject: [PATCH] bug fixed  change real fetcher ip

---
 app/master/master.go |  213 +++++++++++++++++++++++++++++++++++++++++------------
 1 files changed, 164 insertions(+), 49 deletions(-)

diff --git a/app/master/master.go b/app/master/master.go
index 728e3ca..a87c676 100644
--- a/app/master/master.go
+++ b/app/master/master.go
@@ -5,68 +5,183 @@
 	"analysis/logo"
 	"analysis/util"
 	"context"
-
-	"basic.com/libgowrapper/sdkstruct.git"
+	"encoding/json"
+	"os"
+	"plugin"
+	"strconv"
+	"strings"
+	"time"
 )
 
-func reaper(ctxt context.Context) {
-	pidChan := make(chan int, 1)
-	Reap(pidChan)
-	go waitForRestart(ctxt, pidChan)
+var (
+	soLoaded bool
+	fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
+	fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
+)
+
+func soLoad(soFile string) bool {
+	if fnFetch != nil && fnNotify != nil {
+		return true
+	}
+
+	plug, err := plugin.Open(soFile)
+	if err != nil {
+		logo.Errorln("Open: ", soFile, " error: ", err)
+		return false
+	}
+
+	var fn plugin.Symbol
+
+	fn, err = app.LoadFunc(plug, soFile, "Fetch")
+	if err != nil {
+		logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
+		return false
+	}
+	fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
+
+	fn, err = app.LoadFunc(plug, soFile, "Notify")
+	if err != nil {
+		logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
+		return false
+	}
+	fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
+	return true
 }
 
-// Run run
-func Run(ctx context.Context, soFile, configPath string) bool {
-	reaper(ctx)
-
-	fetcher := NewFetcher(soFile)
-	if fetcher == nil {
+func initFetcher(ctx context.Context, soFile string) <-chan []byte {
+	if !soLoad(soFile) {
 		logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
-		return false
+		return nil
 	}
 
 	logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
 
-	fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
-	// fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
-	sdks := fetcher.fnSDKInfo()
+	// ip := "tcp://192.168.5.22"
+	ip := "tcp://" + util.FSI.IP
+	url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
+	hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
 
-	return manualStart(ctx, sdks, configPath)
+	ch := make(chan []byte, 3)
+
+	fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
+
+	logo.Infoln("~~~~~~Start Recv SDK Infos")
+	return ch
 }
 
-func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
-	rPath := configPath
+// Run run
+func Run(ctx context.Context, soFile, configPath string) bool {
+	daemon := NewDaemon()
+	chProc := make(chan []TypeProc, 32)
+	go daemon.Watch(ctx, chProc)
 
-	for _, v := range sdks {
-
-		file := rPath + v.SdkType + ".json"
-		if rPath[len(rPath)-1] != '/' {
-			file = rPath + "/" + v.SdkType + ".json"
-		}
-
-		cfg, err := app.ReadConfig(file)
-		if err != nil {
-			logo.Errorln("Master Read: ", file, " Config Error: ", err)
-			continue
-		}
-
-		logo.Infoln(file, " CONFIG: ", cfg)
-
-		args := []string{
-			`-role=slave`,
-			"-sdk=" + v.SdkType,
-			"-id=" + v.IpcID,
-			"-" + util.ConfigPath + "=" + file,
-		}
-
-		args = append(args, app.GetParams(util.ConfigPath, file)...)
-		pid, err := runProc(ctx, "./analysis", args, cfg.Env)
-
-		if err != nil {
-			logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
-		}
-		logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
+	chMsg := initFetcher(ctx, soFile)
+	if chMsg == nil {
+		logo.Infoln("Master Run initFetcher Failed")
+		return false
 	}
-	return true
+	params := app.GetParams()
 
+	for {
+		select {
+		case <-ctx.Done():
+			return true
+		case msg := <-chMsg:
+			//      	sdktype	   process_name   topic		null
+			//			yolo/face  yolo_0/yolo_1  channel
+			var sdk map[string](map[string](map[string]interface{}))
+
+			if err := json.Unmarshal(msg, &sdk); err != nil {
+				logo.Infoln("Fetcher SDK unmarshal err:", err)
+				continue
+			}
+
+			logo.Infoln("~~~~~~Before Recv New SDKInfos")
+
+			var typeProcs []TypeProc
+
+			for sdkType, mapSdkProc := range sdk {
+				config := findConfigFile(sdkType, configPath)
+				if config == nil {
+					logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
+					continue
+				}
+				env := checkConfig(sdkType, *config)
+				if env == nil {
+					continue
+				}
+
+				var channels []string
+				var namedProcs []NamedProc
+
+				for procName, mapProcChannels := range mapSdkProc {
+					for c := range mapProcChannels {
+						channels = append(channels, c)
+					}
+					p := NamedProc{
+						Name:     procName,
+						Channels: channels,
+						Env:      *env,
+						Config:   *config,
+						Param:    params,
+					}
+					namedProcs = append(namedProcs, p)
+				}
+				t := TypeProc{
+					Typ:       sdkType,
+					SNameProc: namedProcs,
+				}
+				typeProcs = append(typeProcs, t)
+			}
+			chProc <- typeProcs
+
+			logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs)
+
+		default:
+			time.Sleep(10 * time.Millisecond)
+		}
+	}
+
+}
+
+func findConfigFile(typ, configPath string) *string {
+	rPath := configPath
+	// default config file
+	file := rPath + typ + ".json"
+	// if configPath not end with '/'
+	if rPath[len(rPath)-1] != '/' {
+		file = rPath + "/" + typ + ".json"
+	}
+	// whether file exist
+	if util.IsFileExist(file) {
+		return &file
+	}
+	return nil
+}
+
+func checkConfig(typ, file string) *string {
+	cfg, err := app.ReadConfig(file)
+	if err != nil {
+		logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
+		return nil
+	}
+
+	// check config runtime exist if config this item
+	env := strings.TrimSpace(cfg.Env)
+	if len(env) > 0 {
+		envs := strings.Split(env, ":")
+		pathExist := true
+		for _, v := range envs {
+			if !util.IsFileExist(v) {
+				logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ)
+				pathExist = false
+				break
+			}
+		}
+		if !pathExist {
+
+			return nil
+		}
+	}
+	return &env
 }

--
Gitblit v1.8.0