package master
|
|
import (
|
"analysis/app"
|
"analysis/logo"
|
"analysis/util"
|
"context"
|
"encoding/json"
|
"os"
|
"plugin"
|
"strconv"
|
"strings"
|
"time"
|
)
|
|
// Plugin state shared by the functions in this file.
var (
	// soLoaded is declared here but is not read or written anywhere in the
	// visible part of this file.
	soLoaded bool

	// fnFetch holds the "Fetch" symbol resolved from the plugin by soLoad.
	// It is nil until soLoad has succeeded.
	fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))

	// fnNotify holds the "Notify" symbol resolved from the plugin by soLoad.
	// It is nil until soLoad has succeeded.
	fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
)
|
|
func soLoad(soFile string) bool {
|
if fnFetch != nil && fnNotify != nil {
|
return true
|
}
|
|
plug, err := plugin.Open(soFile)
|
if err != nil {
|
logo.Errorln("Open: ", soFile, " error: ", err)
|
return false
|
}
|
|
var fn plugin.Symbol
|
|
fn, err = app.LoadFunc(plug, soFile, "Fetch")
|
if err != nil {
|
logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
|
return false
|
}
|
fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
|
|
fn, err = app.LoadFunc(plug, soFile, "Notify")
|
if err != nil {
|
logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
|
return false
|
}
|
fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
|
return true
|
}
|
|
func initFetcher(ctx context.Context, soFile string) <-chan []byte {
|
if !soLoad(soFile) {
|
logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
|
return nil
|
}
|
|
logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
|
|
// ip := "tcp://192.168.5.22"
|
ip := "tcp://" + util.FSI.IP
|
url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
|
hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
|
|
ch := make(chan []byte, 3)
|
|
fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
|
|
logo.Infoln("~~~~~~Start Recv SDK Infos")
|
return ch
|
}
|
|
// Run run
|
func Run(ctx context.Context, soFile, configPath string) bool {
|
daemon := NewDaemon()
|
chProc := make(chan []TypeProc, 32)
|
go daemon.Watch(ctx, chProc)
|
|
chMsg := initFetcher(ctx, soFile)
|
if chMsg == nil {
|
logo.Infoln("Master Run initFetcher Failed")
|
return false
|
}
|
params := app.GetParams()
|
|
for {
|
select {
|
case <-ctx.Done():
|
return true
|
case msg := <-chMsg:
|
// sdktype process_name topic null
|
// yolo/face yolo_0/yolo_1 channel
|
var sdk map[string](map[string](map[string]interface{}))
|
|
if err := json.Unmarshal(msg, &sdk); err != nil {
|
logo.Infoln("Fetcher SDK unmarshal err:", err)
|
continue
|
}
|
|
logo.Infoln("~~~~~~Before Recv New SDKInfos")
|
|
var typeProcs []TypeProc
|
|
for sdkType, mapSdkProc := range sdk {
|
config := findConfigFile(sdkType, configPath)
|
if config == nil {
|
logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
|
continue
|
}
|
env := checkConfig(sdkType, *config)
|
if env == nil {
|
continue
|
}
|
|
var channels []string
|
var namedProcs []NamedProc
|
|
for procName, mapProcChannels := range mapSdkProc {
|
for c := range mapProcChannels {
|
channels = append(channels, c)
|
}
|
p := NamedProc{
|
Name: procName,
|
Channels: channels,
|
Env: *env,
|
Config: *config,
|
Param: params,
|
}
|
namedProcs = append(namedProcs, p)
|
}
|
t := TypeProc{
|
Typ: sdkType,
|
SNameProc: namedProcs,
|
}
|
typeProcs = append(typeProcs, t)
|
}
|
chProc <- typeProcs
|
|
logo.Infof("~~~~~~Recv New SDKInfos %+v\n", typeProcs)
|
|
default:
|
time.Sleep(10 * time.Millisecond)
|
}
|
}
|
|
}
|
|
func findConfigFile(typ, configPath string) *string {
|
rPath := configPath
|
// default config file
|
file := rPath + typ + ".json"
|
// if configPath not end with '/'
|
if rPath[len(rPath)-1] != '/' {
|
file = rPath + "/" + typ + ".json"
|
}
|
// whether file exist
|
if util.IsFileExist(file) {
|
return &file
|
}
|
return nil
|
}
|
|
func checkConfig(typ, file string) *string {
|
cfg, err := app.ReadConfig(file)
|
if err != nil {
|
logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
|
return nil
|
}
|
|
// check config runtime exist if config this item
|
env := strings.TrimSpace(cfg.Env)
|
if len(env) > 0 {
|
envs := strings.Split(env, ":")
|
pathExist := true
|
for _, v := range envs {
|
if !util.IsFileExist(v) {
|
logo.Infoln("Can't Find Runtime Path:", v, "Skip SDK: ", typ)
|
pathExist = false
|
break
|
}
|
}
|
if !pathExist {
|
|
return nil
|
}
|
}
|
return &env
|
}
|