From 41069e00282aeb597af821127e55c1762758f6d8 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期五, 20 十二月 2019 16:43:50 +0800
Subject: [PATCH] update

---
 app/slave/sdkLoad.go    |   40 ++++
 app/master/master.go    |   97 ++++++++++
 app/master/dbfetcher.go |   45 +++++
 app/common.go           |   67 +++++++
 app/slave/slave.go      |   54 ++++++
 app/master/reaper.go    |  200 ++++++++++++++++++++++
 6 files changed, 503 insertions(+), 0 deletions(-)

diff --git a/app/common.go b/app/common.go
new file mode 100644
index 0000000..8d4a04a
--- /dev/null
+++ b/app/common.go
@@ -0,0 +1,67 @@
+package app
+
+import (
+	"analysis/logo"
+	"analysis/util"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"plugin"
+)
+
+// SdkConfig sdk
+type SdkConfig struct {
+	SoFile string            `json:"so_file_path"`
+	Env    string            `json:"runtime"`
+	Param  map[string]string `json:"param"`
+}
+
+// ReadConfig config json
+func ReadConfig(file string) (SdkConfig, error) {
+	data, err := ioutil.ReadFile(file)
+	if err != nil {
+		return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
+	}
+
+	//璇诲彇鐨勬暟鎹负json鏍煎紡锛岄渶瑕佽繘琛岃В鐮�
+	var v SdkConfig
+	err = json.Unmarshal(data, &v)
+
+	return v, err
+}
+
+// EnvNoValue env no
+const EnvNoValue = "env-no-value"
+
+// ReadEnv env
+func ReadEnv(file string) string {
+	c, err := ReadConfig(file)
+	if err != nil {
+		return EnvNoValue
+	}
+	return c.Env
+}
+
+// GetParams params
+func GetParams(rKey, rValue string) []string {
+	var params []string
+
+	for k, v := range util.MapParames {
+		param := "-" + k + "=" + v
+		if k == rKey {
+			param = "-" + k + "=" + rValue
+		}
+		params = append(params, param)
+	}
+
+	return params
+}
+
+// LoadFunc load plugin
+func LoadFunc(plug *plugin.Plugin, soFile, fnName string) (plugin.Symbol, error) {
+	fn, err := plug.Lookup(fnName)
+	if err != nil {
+		logo.Errorln("Lookup Func: ", fnName, " From: ", soFile, " Error: ", err)
+	}
+	return fn, err
+}
diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go
new file mode 100644
index 0000000..56c5633
--- /dev/null
+++ b/app/master/dbfetcher.go
@@ -0,0 +1,45 @@
+package master
+
+import (
+	"analysis/app"
+	"analysis/logo"
+	"plugin"
+
+	"basic.com/libgowrapper/sdkstruct.git"
+)
+
+// Fetcher db
+type Fetcher struct {
+	fnInitDBAPI func(string, int, int, int, func(...interface{}))
+	fnSDKInfo   func() []sdkstruct.SDKInfo
+}
+
+// NewFetcher new
+func NewFetcher(soFile string) *Fetcher {
+
+	plug, err := plugin.Open(soFile)
+	if err != nil {
+		logo.Errorln("Open: ", soFile, " error: ", err)
+		return nil
+	}
+
+	fn, err := app.LoadFunc(plug, soFile, "InitDBAPI")
+	if err != nil {
+		logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error")
+		return nil
+	}
+	fnInit := fn.(func(string, int, int, int, func(...interface{})))
+
+	fn, err = app.LoadFunc(plug, soFile, "SDKInfo")
+	if err != nil {
+		logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error")
+		return nil
+	}
+
+	fnSDKInfo := fn.(func() []sdkstruct.SDKInfo)
+
+	return &Fetcher{
+		fnInitDBAPI: fnInit,
+		fnSDKInfo:   fnSDKInfo,
+	}
+}
diff --git a/app/master/master.go b/app/master/master.go
new file mode 100644
index 0000000..d4dd0a6
--- /dev/null
+++ b/app/master/master.go
@@ -0,0 +1,97 @@
+package master
+
+import (
+	"analysis/app"
+	"analysis/logo"
+	"analysis/util"
+	"context"
+	"io/ioutil"
+
+	"basic.com/libgowrapper/sdkstruct.git"
+)
+
+func reaper(ctxt context.Context) {
+	pidChan := make(chan int, 1)
+	Reap(pidChan)
+	go waitForRestart(ctxt, pidChan)
+}
+
+// Run run
+func Run(ctx context.Context, configPath string) bool {
+	reaper(ctx)
+
+	rPath := configPath
+	configFile := configPath
+	var fetcher *Fetcher
+
+	fs, _ := ioutil.ReadDir(rPath)
+	for _, file := range fs {
+		if !file.IsDir() {
+			if rPath[len(rPath)-1] != '/' {
+				configFile = rPath + "/" + file.Name()
+			} else {
+				configFile = rPath + file.Name()
+			}
+
+			cfg, err := app.ReadConfig(configFile)
+			if err != nil {
+				logo.Errorln("Run Fetcher Master Read From File: ", configFile, " Config Error: ", err)
+				continue
+			}
+			fetcher = NewFetcher(cfg.SoFile)
+			if fetcher == nil {
+				logo.Errorln("New Fetcher Load so File Funcs Error From File: ", cfg.SoFile)
+				continue
+			}
+		}
+	}
+	if fetcher == nil {
+		logo.Errorln("!!!!!!Read All So File, But Can't Init DB Fetcher")
+		return false
+	}
+
+	logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
+
+	// fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
+	fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
+	sdks := fetcher.fnSDKInfo()
+
+	return manualStart(ctx, sdks, configPath)
+}
+
+func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
+	rPath := configPath
+
+	for k, v := range sdks {
+
+		file := rPath + v.SdkType + ".json"
+		if rPath[len(rPath)-1] != '/' {
+			file = rPath + "/" + v.SdkType + ".json"
+		}
+
+		cfg, err := app.ReadConfig(file)
+		if err != nil {
+			logo.Errorln("Master Read: ", file, " Config Error: ", err)
+			continue
+		}
+
+		logo.Infoln(file, " CONFIG: ", cfg)
+
+		args := []string{
+			`-role=slave`,
+			"-sdk=" + v.SdkType,
+			"-id=" + v.IpcID,
+			"-" + util.ConfigPath + "=" + file,
+		}
+
+		args = append(args, app.GetParams(util.ConfigPath, file)...)
+		pid, err := runProc(ctx, "./analysis", args, &cfg.Env)
+
+		if err != nil {
+			logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
+		}
+		logo.Infof("START %d PROC %d SDK %s ID %s\n", k, pid, v.IpcID, v.SdkType)
+	}
+	return true
+
+}
diff --git a/app/master/reaper.go b/app/master/reaper.go
new file mode 100644
index 0000000..d35a2d3
--- /dev/null
+++ b/app/master/reaper.go
@@ -0,0 +1,200 @@
+package master
+
+import (
+	"analysis/logo"
+	"context"
+	"os"
+	"os/exec"
+	"os/signal"
+	"syscall"
+	"time"
+)
+
+type procInfo struct {
+	cmd *exec.Cmd
+	env string
+}
+
+var (
+	procMap = make(map[int]*procInfo)
+)
+
+func restartProc(ctxt context.Context, pid int) {
+	info, ok := procMap[pid]
+	if ok {
+		err := info.cmd.Wait()
+
+		if err != nil {
+			logo.Errorln("pid : [", pid, "] quit error: ", err)
+		} else {
+			logo.Infoln("pid : [", pid, "] quit")
+		}
+		delete(procMap, pid)
+		runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], &info.env)
+	} else {
+		logo.Errorln(pid, " doesn't exist")
+	}
+}
+
+func quitProc(pid int) {
+	info, ok := procMap[pid]
+	if ok {
+		delete(procMap, pid)
+
+		syscall.Kill(pid, syscall.SIGINT)
+		info.cmd.Wait()
+	} else {
+		logo.Errorln(pid, " doesn't exist")
+	}
+}
+func runProc(ctxt context.Context, bin string, args []string, env *string) (int, error) {
+	cmd := exec.CommandContext(ctxt, bin, args...)
+	cmd.Env = os.Environ()
+	cmd.Env = append(cmd.Env, *env)
+
+	pid := -1
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+	err := cmd.Start()
+	if err == nil {
+		pid = cmd.Process.Pid
+		procMap[pid] = &procInfo{cmd, *env}
+	}
+	return pid, err
+}
+
+// Config conf
+type Config struct {
+	Pid              int
+	Options          int
+	DisablePid1Check bool
+}
+
+//  Handle death of child (SIGCHLD) messages. Pushes the signal onto the
+//  notifications channel if there is a waiter.
+func sigChildHandler(notifications chan os.Signal) {
+	sigs := make(chan os.Signal, 3)
+	signal.Notify(sigs, syscall.SIGCHLD)
+
+	for {
+		sig := <-sigs
+		select {
+		case notifications <- sig: /*  published it.  */
+		default:
+			/*
+			 *  Notifications channel full - drop it to the
+			 *  floor. This ensures we don't fill up the SIGCHLD
+			 *  queue. The reaper just waits for any child
+			 *  process (pid=-1), so we ain't loosing it!! ;^)
+			 */
+		}
+	}
+
+} /*  End of function  sigChildHandler.  */
+
+//  Be a good parent - clean up behind the children.
+func reapChildren(config Config, pidChan chan<- int) {
+	notifications := make(chan os.Signal, 1)
+
+	go sigChildHandler(notifications)
+
+	pid := config.Pid
+	opts := config.Options
+
+	for {
+		sig := <-notifications
+		logo.Infof(" - Received signal %v\n", sig)
+		for {
+			var wstatus syscall.WaitStatus
+
+			/*
+			 *  Reap 'em, so that zombies don't accumulate.
+			 *  Plants vs. Zombies!!
+			 */
+			pid, err := syscall.Wait4(pid, &wstatus, opts, nil)
+			pidChan <- pid
+			for syscall.EINTR == err {
+				pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
+				pidChan <- pid
+			}
+
+			if syscall.ECHILD == err {
+				break
+			}
+
+			logo.Infof(" - Grim reaper cleanup: pid=%d, wstatus=%+v\n",
+				pid, wstatus)
+
+		}
+	}
+
+} /*   End of function  reapChildren.  */
+
+/*
+ *  ======================================================================
+ *  Section: Exported functions
+ *  ======================================================================
+ */
+
+//  Normal entry point for the reaper code. Start reaping children in the
+//  background inside a goroutine.
+
+// Reap reap
+func Reap(pidChan chan<- int) {
+	/*
+	 *  Only reap processes if we are taking over init's duties aka
+	 *  we are running as pid 1 inside a docker container. The default
+	 *  is to reap all processes.
+	 */
+	Start(Config{
+		Pid:              -1,
+		Options:          0,
+		DisablePid1Check: true,
+	}, pidChan)
+
+} /*  End of [exported] function  Reap.  */
+
+//  Entry point for invoking the reaper code with a specific configuration.
+//  The config allows you to bypass the pid 1 checks, so handle with care.
+//  The child processes are reaped in the background inside a goroutine.
+
+// Start start
+func Start(config Config, pidChan chan<- int) {
+	/*
+	 *  Start the Reaper with configuration options. This allows you to
+	 *  reap processes even if the current pid isn't running as pid 1.
+	 *  So ... use with caution!!
+	 *
+	 *  In most cases, you are better off just using Reap() as that
+	 *  checks if we are running as Pid 1.
+	 */
+	if !config.DisablePid1Check {
+		mypid := os.Getpid()
+		if 1 != mypid {
+			logo.Errorln(" - Grim reaper disabled, pid not 1\n")
+			return
+		}
+	}
+
+	/*
+	 *  Ok, so either pid 1 checks are disabled or we are the grandma
+	 *  of 'em all, either way we get to play the grim reaper.
+	 *  You will be missed, Terry Pratchett!! RIP
+	 */
+	go reapChildren(config, pidChan)
+
+} /*  End of [exported] function  Start.  */
+
+func waitForRestart(ctxt context.Context, pidChan <-chan int) {
+
+	for {
+		select {
+		case <-ctxt.Done():
+			return
+		case pid := <-pidChan:
+			restartProc(ctxt, pid)
+		default:
+			time.Sleep(3 * time.Second)
+		}
+	}
+}
diff --git a/app/slave/sdkLoad.go b/app/slave/sdkLoad.go
new file mode 100644
index 0000000..ceb4f4b
--- /dev/null
+++ b/app/slave/sdkLoad.go
@@ -0,0 +1,40 @@
+package slave
+
+import (
+	"analysis/app"
+	"analysis/logo"
+	"context"
+	"plugin"
+)
+
+// func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{}
+// func Run(ctx context.Context, i interface{}) {
+
+type sdk struct {
+	fnCreate func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{}
+	fnRun    func(context.Context, interface{})
+}
+
+func loadSDK(soFile string) *sdk {
+	plug, err := plugin.Open(soFile)
+	if err != nil {
+		logo.Errorln("Slave Open so File: ", soFile, " Error: ", err)
+		return nil
+	}
+
+	fnC, err := app.LoadFunc(plug, soFile, "Create")
+	if err != nil {
+		logo.Errorln("Load Func Create From: ", soFile, " Error: ", err)
+		return nil
+	}
+
+	fnR, err := app.LoadFunc(plug, soFile, "Run")
+	if err != nil {
+		logo.Errorln("Load Func Run From: ", soFile, " Error: ", err)
+		return nil
+	}
+	return &sdk{
+		fnCreate: fnC.(func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{}),
+		fnRun:    fnR.(func(context.Context, interface{})),
+	}
+}
diff --git a/app/slave/slave.go b/app/slave/slave.go
new file mode 100644
index 0000000..6c77ec2
--- /dev/null
+++ b/app/slave/slave.go
@@ -0,0 +1,54 @@
+package slave
+
+import (
+	"analysis/app"
+	"analysis/logo"
+	"analysis/util"
+	"context"
+	"plugin"
+)
+
+// TwoPluginConflict test
+func TwoPluginConflict(commSoFile, config string) bool {
+	cfg, err := app.ReadConfig(config)
+	if err != nil {
+		logo.Errorln("Slave Read Config Error: ", err)
+		return false
+	}
+	sdk := loadSDK(cfg.SoFile)
+	if sdk == nil {
+	}
+
+	plug, err := plugin.Open(commSoFile)
+	if err != nil || plug == nil {
+		logo.Errorln("Slave Open so File: ", commSoFile, " Error: ", err)
+		return false
+	}
+	return true
+}
+
+// Run run
+func Run(ctx context.Context, config, typ, id string, gpu int, shm bool) bool {
+	cfg, err := app.ReadConfig(config)
+	if err != nil {
+		logo.Errorln("Slave Read Config Error: ", err)
+		return false
+	}
+	sdk := loadSDK(cfg.SoFile)
+	if sdk == nil {
+		return false
+	}
+
+	// func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{}
+	// func Run(ctx context.Context, i interface{}) {
+
+	handle := sdk.fnCreate(config, typ, id, gpu, shm, util.ToRuleIPC, 30, logo.Infoln, nil)
+	if handle == nil {
+		logo.Errorln("Create SDK: ", typ, " ID: ", id, " Error")
+		return false
+	}
+
+	sdk.fnRun(ctx, handle)
+
+	return true
+}

--
Gitblit v1.8.0