From 41069e00282aeb597af821127e55c1762758f6d8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期五, 20 十二月 2019 16:43:50 +0800 Subject: [PATCH] update --- app/slave/sdkLoad.go | 40 ++++ app/master/master.go | 97 ++++++++++ app/master/dbfetcher.go | 45 +++++ app/common.go | 67 +++++++ app/slave/slave.go | 54 ++++++ app/master/reaper.go | 200 ++++++++++++++++++++++ 6 files changed, 503 insertions(+), 0 deletions(-) diff --git a/app/common.go b/app/common.go new file mode 100644 index 0000000..8d4a04a --- /dev/null +++ b/app/common.go @@ -0,0 +1,67 @@ +package app + +import ( + "analysis/logo" + "analysis/util" + "encoding/json" + "fmt" + "io/ioutil" + "plugin" +) + +// SdkConfig sdk +type SdkConfig struct { + SoFile string `json:"so_file_path"` + Env string `json:"runtime"` + Param map[string]string `json:"param"` +} + +// ReadConfig config json +func ReadConfig(file string) (SdkConfig, error) { + data, err := ioutil.ReadFile(file) + if err != nil { + return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file) + } + + //璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮� + var v SdkConfig + err = json.Unmarshal(data, &v) + + return v, err +} + +// EnvNoValue env no +const EnvNoValue = "env-no-value" + +// ReadEnv env +func ReadEnv(file string) string { + c, err := ReadConfig(file) + if err != nil { + return EnvNoValue + } + return c.Env +} + +// GetParams params +func GetParams(rKey, rValue string) []string { + var params []string + + for k, v := range util.MapParames { + param := "-" + k + "=" + v + if k == rKey { + param = "-" + k + "=" + rValue + } + params = append(params, param) + } + + return params +} + +// LoadFunc load plugin +func LoadFunc(plug *plugin.Plugin, soFile, fnName string) (plugin.Symbol, error) { + fn, err := plug.Lookup(fnName) + if err != nil { + logo.Errorln("Lookup Func: ", fnName, " From: ", soFile, " Error: ", err) + } + return fn, err +} diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go new file mode 100644 index 0000000..56c5633 --- /dev/null +++ b/app/master/dbfetcher.go @@ -0,0 +1,45 @@ +package master + +import ( + "analysis/app" + "analysis/logo" + "plugin" + + "basic.com/libgowrapper/sdkstruct.git" +) + +// Fetcher db +type Fetcher struct { + fnInitDBAPI func(string, int, int, int, func(...interface{})) + fnSDKInfo func() []sdkstruct.SDKInfo +} + +// NewFetcher new +func NewFetcher(soFile string) *Fetcher { + + plug, err := plugin.Open(soFile) + if err != nil { + logo.Errorln("Open: ", soFile, " error: ", err) + return nil + } + + fn, err := app.LoadFunc(plug, soFile, "InitDBAPI") + if err != nil { + logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error") + return nil + } + fnInit := fn.(func(string, int, int, int, func(...interface{}))) + + fn, err = app.LoadFunc(plug, soFile, "SDKInfo") + if err != nil { + logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error") + return nil + } + + fnSDKInfo := fn.(func() []sdkstruct.SDKInfo) + + return &Fetcher{ + fnInitDBAPI: fnInit, + fnSDKInfo: fnSDKInfo, + } +} diff --git a/app/master/master.go b/app/master/master.go new file mode 100644 index 0000000..d4dd0a6 --- /dev/null +++ b/app/master/master.go @@ -0,0 +1,97 @@ +package master + +import ( + "analysis/app" + "analysis/logo" + "analysis/util" + "context" + "io/ioutil" + + "basic.com/libgowrapper/sdkstruct.git" +) + +func reaper(ctxt context.Context) { + pidChan := make(chan int, 1) + Reap(pidChan) + go waitForRestart(ctxt, pidChan) +} + +// Run run +func Run(ctx context.Context, configPath string) bool { + reaper(ctx) + + rPath := configPath + configFile := configPath + var fetcher *Fetcher + + fs, _ := ioutil.ReadDir(rPath) + for _, file := range fs { + if !file.IsDir() { + if rPath[len(rPath)-1] != '/' { + configFile = rPath + "/" + file.Name() + } else { + configFile = rPath + file.Name() + } + + cfg, err := app.ReadConfig(configFile) + if err != nil { + logo.Errorln("Run Fetcher Master Read From File: ", configFile, " Config Error: ", err) + continue + } + fetcher = NewFetcher(cfg.SoFile) + if fetcher == nil { + logo.Errorln("New Fetcher Load so File Funcs Error From File: ", cfg.SoFile) + continue + } + } + } + if fetcher == nil { + logo.Errorln("!!!!!!Read All So File, But Can't Init DB Fetcher") + return false + } + + logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") + + // fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) + fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln) + sdks := fetcher.fnSDKInfo() + + return manualStart(ctx, sdks, configPath) +} + +func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { + rPath := configPath + + for k, v := range sdks { + + file := rPath + v.SdkType + ".json" + if rPath[len(rPath)-1] != '/' { + file = rPath + "/" + v.SdkType + ".json" + } + + cfg, err := app.ReadConfig(file) + if err != nil { + logo.Errorln("Master Read: ", file, " Config Error: ", err) + continue + } + + logo.Infoln(file, " CONFIG: ", cfg) + + args := []string{ + `-role=slave`, + "-sdk=" + v.SdkType, + "-id=" + v.IpcID, + "-" + util.ConfigPath + "=" + file, + } + + args = append(args, app.GetParams(util.ConfigPath, file)...) + pid, err := runProc(ctx, "./analysis", args, &cfg.Env) + + if err != nil { + logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) + } + logo.Infof("START %d PROC %d SDK %s ID %s\n", k, pid, v.IpcID, v.SdkType) + } + return true + +} diff --git a/app/master/reaper.go b/app/master/reaper.go new file mode 100644 index 0000000..d35a2d3 --- /dev/null +++ b/app/master/reaper.go @@ -0,0 +1,200 @@ +package master + +import ( + "analysis/logo" + "context" + "os" + "os/exec" + "os/signal" + "syscall" + "time" +) + +type procInfo struct { + cmd *exec.Cmd + env string +} + +var ( + procMap = make(map[int]*procInfo) +) + +func restartProc(ctxt context.Context, pid int) { + info, ok := procMap[pid] + if ok { + err := info.cmd.Wait() + + if err != nil { + logo.Errorln("pid : [", pid, "] quit error: ", err) + } else { + logo.Infoln("pid : [", pid, "] quit") + } + delete(procMap, pid) + runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], &info.env) + } else { + logo.Errorln(pid, " doesn't exist") + } +} + +func quitProc(pid int) { + info, ok := procMap[pid] + if ok { + delete(procMap, pid) + + syscall.Kill(pid, syscall.SIGINT) + info.cmd.Wait() + } else { + logo.Errorln(pid, " doesn't exist") + } +} +func runProc(ctxt context.Context, bin string, args []string, env *string) (int, error) { + cmd := exec.CommandContext(ctxt, bin, args...) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, *env) + + pid := -1 + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + err := cmd.Start() + if err == nil { + pid = cmd.Process.Pid + procMap[pid] = &procInfo{cmd, *env} + } + return pid, err +} + +// Config conf +type Config struct { + Pid int + Options int + DisablePid1Check bool +} + +// Handle death of child (SIGCHLD) messages. Pushes the signal onto the +// notifications channel if there is a waiter. +func sigChildHandler(notifications chan os.Signal) { + sigs := make(chan os.Signal, 3) + signal.Notify(sigs, syscall.SIGCHLD) + + for { + sig := <-sigs + select { + case notifications <- sig: /* published it. */ + default: + /* + * Notifications channel full - drop it to the + * floor. This ensures we don't fill up the SIGCHLD + * queue. The reaper just waits for any child + * process (pid=-1), so we ain't loosing it!! ;^) + */ + } + } + +} /* End of function sigChildHandler. */ + +// Be a good parent - clean up behind the children. +func reapChildren(config Config, pidChan chan<- int) { + notifications := make(chan os.Signal, 1) + + go sigChildHandler(notifications) + + pid := config.Pid + opts := config.Options + + for { + sig := <-notifications + logo.Infof(" - Received signal %v\n", sig) + for { + var wstatus syscall.WaitStatus + + /* + * Reap 'em, so that zombies don't accumulate. + * Plants vs. Zombies!! + */ + pid, err := syscall.Wait4(pid, &wstatus, opts, nil) + pidChan <- pid + for syscall.EINTR == err { + pid, err = syscall.Wait4(pid, &wstatus, opts, nil) + pidChan <- pid + } + + if syscall.ECHILD == err { + break + } + + logo.Infof(" - Grim reaper cleanup: pid=%d, wstatus=%+v\n", + pid, wstatus) + + } + } + +} /* End of function reapChildren. */ + +/* + * ====================================================================== + * Section: Exported functions + * ====================================================================== + */ + +// Normal entry point for the reaper code. Start reaping children in the +// background inside a goroutine. + +// Reap reap +func Reap(pidChan chan<- int) { + /* + * Only reap processes if we are taking over init's duties aka + * we are running as pid 1 inside a docker container. The default + * is to reap all processes. + */ + Start(Config{ + Pid: -1, + Options: 0, + DisablePid1Check: true, + }, pidChan) + +} /* End of [exported] function Reap. */ + +// Entry point for invoking the reaper code with a specific configuration. +// The config allows you to bypass the pid 1 checks, so handle with care. +// The child processes are reaped in the background inside a goroutine. + +// Start start +func Start(config Config, pidChan chan<- int) { + /* + * Start the Reaper with configuration options. This allows you to + * reap processes even if the current pid isn't running as pid 1. + * So ... use with caution!! + * + * In most cases, you are better off just using Reap() as that + * checks if we are running as Pid 1. + */ + if !config.DisablePid1Check { + mypid := os.Getpid() + if 1 != mypid { + logo.Errorln(" - Grim reaper disabled, pid not 1\n") + return + } + } + + /* + * Ok, so either pid 1 checks are disabled or we are the grandma + * of 'em all, either way we get to play the grim reaper. + * You will be missed, Terry Pratchett!! RIP + */ + go reapChildren(config, pidChan) + +} /* End of [exported] function Start. */ + +func waitForRestart(ctxt context.Context, pidChan <-chan int) { + + for { + select { + case <-ctxt.Done(): + return + case pid := <-pidChan: + restartProc(ctxt, pid) + default: + time.Sleep(3 * time.Second) + } + } +} diff --git a/app/slave/sdkLoad.go b/app/slave/sdkLoad.go new file mode 100644 index 0000000..ceb4f4b --- /dev/null +++ b/app/slave/sdkLoad.go @@ -0,0 +1,40 @@ +package slave + +import ( + "analysis/app" + "analysis/logo" + "context" + "plugin" +) + +// func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{} +// func Run(ctx context.Context, i interface{}) { + +type sdk struct { + fnCreate func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{} + fnRun func(context.Context, interface{}) +} + +func loadSDK(soFile string) *sdk { + plug, err := plugin.Open(soFile) + if err != nil { + logo.Errorln("Slave Open so File: ", soFile, " Error: ", err) + return nil + } + + fnC, err := app.LoadFunc(plug, soFile, "Create") + if err != nil { + logo.Errorln("Load Func Create From: ", soFile, " Error: ", err) + return nil + } + + fnR, err := app.LoadFunc(plug, soFile, "Run") + if err != nil { + logo.Errorln("Load Func Run From: ", soFile, " Error: ", err) + return nil + } + return &sdk{ + fnCreate: fnC.(func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{}), + fnRun: fnR.(func(context.Context, interface{})), + } +} diff --git a/app/slave/slave.go b/app/slave/slave.go new file mode 100644 index 0000000..6c77ec2 --- /dev/null +++ b/app/slave/slave.go @@ -0,0 +1,54 @@ +package slave + +import ( + "analysis/app" + "analysis/logo" + "analysis/util" + "context" + "plugin" +) + +// TwoPluginConflict test +func TwoPluginConflict(commSoFile, config string) bool { + cfg, err := app.ReadConfig(config) + if err != nil { + logo.Errorln("Slave Read Config Error: ", err) + return false + } + sdk := loadSDK(cfg.SoFile) + if sdk == nil { + } + + plug, err := plugin.Open(commSoFile) + if err != nil || plug == nil { + logo.Errorln("Slave Open so File: ", commSoFile, " Error: ", err) + return false + } + return true +} + +// Run run +func Run(ctx context.Context, config, typ, id string, gpu int, shm bool) bool { + cfg, err := app.ReadConfig(config) + if err != nil { + logo.Errorln("Slave Read Config Error: ", err) + return false + } + sdk := loadSDK(cfg.SoFile) + if sdk == nil { + return false + } + + // func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{} + // func Run(ctx context.Context, i interface{}) { + + handle := sdk.fnCreate(config, typ, id, gpu, shm, util.ToRuleIPC, 30, logo.Infoln, nil) + if handle == nil { + logo.Errorln("Create SDK: ", typ, " ID: ", id, " Error") + return false + } + + sdk.fnRun(ctx, handle) + + return true +} -- Gitblit v1.8.0