From e6a5be3714b70236d84f25be2ce858c3d7e379d8 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 21 一月 2020 14:16:01 +0800
Subject: [PATCH] add recv sdk proc info from dispath

---
 app/master/master.go |  211 +++++++++++++++++++++++++++++++++-------------------
 1 files changed, 134 insertions(+), 77 deletions(-)

diff --git a/app/master/master.go b/app/master/master.go
index 1199713..797d4d1 100644
--- a/app/master/master.go
+++ b/app/master/master.go
@@ -7,64 +7,134 @@
 	"context"
 	"encoding/json"
 	"os"
+	"plugin"
 	"strconv"
 	"strings"
 	"time"
-
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/valib/pubsub.git"
 )
 
-func reaper(ctxt context.Context) {
-	pidChan := make(chan int, 1)
-	Reap(pidChan)
-	go waitForRestart(ctxt, pidChan)
+var (
+	soLoaded bool
+	fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
+	fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
+)
+
+func soLoad(soFile string) bool {
+	if fnFetch != nil && fnNotify != nil {
+		return true
+	}
+
+	plug, err := plugin.Open(soFile)
+	if err != nil {
+		logo.Errorln("Open: ", soFile, " error: ", err)
+		return false
+	}
+
+	var fn plugin.Symbol
+
+	fn, err = app.LoadFunc(plug, soFile, "Fetch")
+	if err != nil {
+		logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
+		return false
+	}
+	fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
+
+	fn, err = app.LoadFunc(plug, soFile, "Notify")
+	if err != nil {
+		logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
+		return false
+	}
+	fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
+	return true
 }
 
-// Run run
-func Run(ctx context.Context, soFile, configPath string) bool {
-	reaper(ctx)
-
-	fetcher := NewFetcher(soFile)
-	if fetcher == nil {
+func initFetcher(ctx context.Context, soFile string) <-chan []byte {
+	if !soLoad(soFile) {
 		logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
-		return false
+		return nil
 	}
 
 	logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
 
-	ip := "tcp://" + util.FSI.IP
+	ip := "tcp://192.168.5.22"
+	// ip := "tcp://" + util.FSI.IP
 	url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
 	hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
 
-	chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
-	for {
-		if err == nil {
-			break
-		}
-		logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
-		time.Sleep(time.Second)
-		chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
+	ch := make(chan []byte, 3)
+
+	fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
+
+	logo.Infoln("~~~~~~Start Recv SDK Infos")
+	return ch
+}
+
+// Run run
+func Run(ctx context.Context, soFile, configPath string) bool {
+	daemon := NewDaemon()
+	chProc := make(chan []TypeProc, 32)
+	go daemon.Watch(ctx, chProc)
+
+	chMsg := initFetcher(ctx, soFile)
+	if chMsg == nil {
+		logo.Infoln("Master Run initFetcher Failed")
+		return false
 	}
+	params := app.GetParams()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return true
 		case msg := <-chMsg:
-			//      		sdktype	   process_name   topic		null
-			//				yolo/face  yolo_0/yolo_1  channel
+			//      	sdktype	   process_name   topic		null
+			//			yolo/face  yolo_0/yolo_1  channel
 			var sdk map[string](map[string](map[string]interface{}))
 
-			if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
+			if err := json.Unmarshal(msg, &sdk); err != nil {
 				logo.Infoln("Fetcher SDK unmarshal err:", err)
 				continue
 			}
 
 			logo.Infoln("~~~~~~Recv New SDKInfos")
-			chCameras <- CameraInfo{
-				Cameras: cameras,
+
+			var typeProcs []TypeProc
+
+			for sdkType, mapSdkProc := range sdk {
+				config := findConfigFile(sdkType, configPath)
+				if config == nil {
+					logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
+					continue
+				}
+				env := checkConfig(sdkType, *config)
+				if env == nil {
+					continue
+				}
+
+				var channels []string
+				var namedProcs []NamedProc
+
+				for procName, mapProcChannels := range mapSdkProc {
+					for c := range mapProcChannels {
+						channels = append(channels, c)
+					}
+					p := NamedProc{
+						Name:     procName,
+						Channels: channels,
+						Env:      *env,
+						Config:   *config,
+						Param:    params,
+					}
+					namedProcs = append(namedProcs, p)
+				}
+				t := TypeProc{
+					Typ:       sdkType,
+					SNameProc: namedProcs,
+				}
+				typeProcs = append(typeProcs, t)
 			}
+			chProc <- typeProcs
+
 			logo.Infoln("~~~~~~Recv New SDKInfos Over")
 
 		default:
@@ -74,56 +144,43 @@
 
 }
 
-func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
+func findConfigFile(typ, configPath string) *string {
 	rPath := configPath
-
-	params := app.GetParams()
-
-	for _, v := range sdks {
-
-		file := rPath + v.SdkType + ".json"
-		if rPath[len(rPath)-1] != '/' {
-			file = rPath + "/" + v.SdkType + ".json"
-		}
-
-		cfg, err := app.ReadConfig(file)
-		if err != nil {
-			logo.Errorln("Master Read: ", file, " Config Error: ", err)
-			continue
-		}
-
-		if len(cfg.Env) > 0 {
-			envs := strings.Split(cfg.Env, ":")
-			normal := true
-			for _, v := range envs {
-				if !util.IsFileExist(v) {
-					normal = false
-					break
-				}
-			}
-			if !normal {
-				logo.Infoln("Can't Find Runtime Path, Skip It: ", file)
-				continue
-			}
-		}
-
-		logo.Infoln(file, " CONFIG: ", cfg)
-
-		args := []string{
-			`-role=slave`,
-			"-sdk=" + v.SdkType,
-			"-id=" + v.IpcID,
-			"-" + util.ConfigPath + "=" + file,
-		}
-
-		args = append(args, params...)
-		pid, err := runProc(ctx, "./analysis", args, cfg.Env)
-
-		if err != nil {
-			logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
-		}
-		logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
+	// default config file
+	file := rPath + typ + ".json"
+	// if configPath not end with '/'
+	if rPath[len(rPath)-1] != '/' {
+		file = rPath + "/" + typ + ".json"
 	}
-	return true
+	// whether file exist
+	if util.IsFileExist(file) {
+		return &file
+	}
+	return nil
+}
 
+func checkConfig(typ, file string) *string {
+	cfg, err := app.ReadConfig(file)
+	if err != nil {
+		logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
+		return nil
+	}
+
+	// check config runtime exist if config this item
+	env := strings.TrimSpace(cfg.Env)
+	if len(env) > 0 {
+		envs := strings.Split(env, ":")
+		pathExist := true
+		for _, v := range envs {
+			if !util.IsFileExist(v) {
+				pathExist = false
+				break
+			}
+		}
+		if !pathExist {
+			logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ)
+			return nil
+		}
+	}
+	return &env
 }

--
Gitblit v1.8.0