zhangmeng
2019-12-20 41069e00282aeb597af821127e55c1762758f6d8
update
6个文件已添加
503 ■■■■■ 已修改文件
app/common.go 67 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/dbfetcher.go 45 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/master.go 97 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/reaper.go 200 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/slave/sdkLoad.go 40 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/slave/slave.go 54 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/common.go
New file
@@ -0,0 +1,67 @@
package app
import (
    "analysis/logo"
    "analysis/util"
    "encoding/json"
    "fmt"
    "io/ioutil"
    "plugin"
)
// SdkConfig sdk
type SdkConfig struct {
    SoFile string            `json:"so_file_path"`
    Env    string            `json:"runtime"`
    Param  map[string]string `json:"param"`
}
// ReadConfig config json
func ReadConfig(file string) (SdkConfig, error) {
    data, err := ioutil.ReadFile(file)
    if err != nil {
        return SdkConfig{}, fmt.Errorf("READ SDK CONFIG FILE %s ERROR", file)
    }
    //读取的数据为json格式,需要进行解码
    var v SdkConfig
    err = json.Unmarshal(data, &v)
    return v, err
}
// EnvNoValue env no
const EnvNoValue = "env-no-value"
// ReadEnv env
func ReadEnv(file string) string {
    c, err := ReadConfig(file)
    if err != nil {
        return EnvNoValue
    }
    return c.Env
}
// GetParams params
func GetParams(rKey, rValue string) []string {
    var params []string
    for k, v := range util.MapParames {
        param := "-" + k + "=" + v
        if k == rKey {
            param = "-" + k + "=" + rValue
        }
        params = append(params, param)
    }
    return params
}
// LoadFunc load plugin
func LoadFunc(plug *plugin.Plugin, soFile, fnName string) (plugin.Symbol, error) {
    fn, err := plug.Lookup(fnName)
    if err != nil {
        logo.Errorln("Lookup Func: ", fnName, " From: ", soFile, " Error: ", err)
    }
    return fn, err
}
app/master/dbfetcher.go
New file
@@ -0,0 +1,45 @@
package master
import (
    "analysis/app"
    "analysis/logo"
    "plugin"
    "basic.com/libgowrapper/sdkstruct.git"
)
// Fetcher db
type Fetcher struct {
    fnInitDBAPI func(string, int, int, int, func(...interface{}))
    fnSDKInfo   func() []sdkstruct.SDKInfo
}
// NewFetcher new
func NewFetcher(soFile string) *Fetcher {
    plug, err := plugin.Open(soFile)
    if err != nil {
        logo.Errorln("Open: ", soFile, " error: ", err)
        return nil
    }
    fn, err := app.LoadFunc(plug, soFile, "InitDBAPI")
    if err != nil {
        logo.Infoln("Lookup Func InitDBAPI From File: ", soFile, " Error")
        return nil
    }
    fnInit := fn.(func(string, int, int, int, func(...interface{})))
    fn, err = app.LoadFunc(plug, soFile, "SDKInfo")
    if err != nil {
        logo.Infoln("Lookup Func SDKInfo From File: ", soFile, " Error")
        return nil
    }
    fnSDKInfo := fn.(func() []sdkstruct.SDKInfo)
    return &Fetcher{
        fnInitDBAPI: fnInit,
        fnSDKInfo:   fnSDKInfo,
    }
}
app/master/master.go
New file
@@ -0,0 +1,97 @@
package master
import (
    "analysis/app"
    "analysis/logo"
    "analysis/util"
    "context"
    "io/ioutil"
    "basic.com/libgowrapper/sdkstruct.git"
)
func reaper(ctxt context.Context) {
    pidChan := make(chan int, 1)
    Reap(pidChan)
    go waitForRestart(ctxt, pidChan)
}
// Run run
func Run(ctx context.Context, configPath string) bool {
    reaper(ctx)
    rPath := configPath
    configFile := configPath
    var fetcher *Fetcher
    fs, _ := ioutil.ReadDir(rPath)
    for _, file := range fs {
        if !file.IsDir() {
            if rPath[len(rPath)-1] != '/' {
                configFile = rPath + "/" + file.Name()
            } else {
                configFile = rPath + file.Name()
            }
            cfg, err := app.ReadConfig(configFile)
            if err != nil {
                logo.Errorln("Run Fetcher Master Read From File: ", configFile, " Config Error: ", err)
                continue
            }
            fetcher = NewFetcher(cfg.SoFile)
            if fetcher == nil {
                logo.Errorln("New Fetcher Load so File Funcs Error From File: ", cfg.SoFile)
                continue
            }
        }
    }
    if fetcher == nil {
        logo.Errorln("!!!!!!Read All So File, But Can't Init DB Fetcher")
        return false
    }
    logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
    // fetcher.fnInitDBAPI(util.FSI.IP, util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
    fetcher.fnInitDBAPI("192.168.20.10", util.FSI.HTTPort, util.FSI.HBPort, util.FSI.DataPort, logo.Infoln)
    sdks := fetcher.fnSDKInfo()
    return manualStart(ctx, sdks, configPath)
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
    rPath := configPath
    for k, v := range sdks {
        file := rPath + v.SdkType + ".json"
        if rPath[len(rPath)-1] != '/' {
            file = rPath + "/" + v.SdkType + ".json"
        }
        cfg, err := app.ReadConfig(file)
        if err != nil {
            logo.Errorln("Master Read: ", file, " Config Error: ", err)
            continue
        }
        logo.Infoln(file, " CONFIG: ", cfg)
        args := []string{
            `-role=slave`,
            "-sdk=" + v.SdkType,
            "-id=" + v.IpcID,
            "-" + util.ConfigPath + "=" + file,
        }
        args = append(args, app.GetParams(util.ConfigPath, file)...)
        pid, err := runProc(ctx, "./analysis", args, &cfg.Env)
        if err != nil {
            logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
        }
        logo.Infof("START %d PROC %d SDK %s ID %s\n", k, pid, v.IpcID, v.SdkType)
    }
    return true
}
app/master/reaper.go
New file
@@ -0,0 +1,200 @@
package master
import (
    "analysis/logo"
    "context"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
    "time"
)
type procInfo struct {
    cmd *exec.Cmd
    env string
}
var (
    procMap = make(map[int]*procInfo)
)
func restartProc(ctxt context.Context, pid int) {
    info, ok := procMap[pid]
    if ok {
        err := info.cmd.Wait()
        if err != nil {
            logo.Errorln("pid : [", pid, "] quit error: ", err)
        } else {
            logo.Infoln("pid : [", pid, "] quit")
        }
        delete(procMap, pid)
        runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], &info.env)
    } else {
        logo.Errorln(pid, " doesn't exist")
    }
}
func quitProc(pid int) {
    info, ok := procMap[pid]
    if ok {
        delete(procMap, pid)
        syscall.Kill(pid, syscall.SIGINT)
        info.cmd.Wait()
    } else {
        logo.Errorln(pid, " doesn't exist")
    }
}
func runProc(ctxt context.Context, bin string, args []string, env *string) (int, error) {
    cmd := exec.CommandContext(ctxt, bin, args...)
    cmd.Env = os.Environ()
    cmd.Env = append(cmd.Env, *env)
    pid := -1
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    err := cmd.Start()
    if err == nil {
        pid = cmd.Process.Pid
        procMap[pid] = &procInfo{cmd, *env}
    }
    return pid, err
}
// Config conf
type Config struct {
    Pid              int
    Options          int
    DisablePid1Check bool
}
//  Handle death of child (SIGCHLD) messages. Pushes the signal onto the
//  notifications channel if there is a waiter.
func sigChildHandler(notifications chan os.Signal) {
    sigs := make(chan os.Signal, 3)
    signal.Notify(sigs, syscall.SIGCHLD)
    for {
        sig := <-sigs
        select {
        case notifications <- sig: /*  published it.  */
        default:
            /*
             *  Notifications channel full - drop it to the
             *  floor. This ensures we don't fill up the SIGCHLD
             *  queue. The reaper just waits for any child
             *  process (pid=-1), so we ain't loosing it!! ;^)
             */
        }
    }
} /*  End of function  sigChildHandler.  */
//  Be a good parent - clean up behind the children.
func reapChildren(config Config, pidChan chan<- int) {
    notifications := make(chan os.Signal, 1)
    go sigChildHandler(notifications)
    pid := config.Pid
    opts := config.Options
    for {
        sig := <-notifications
        logo.Infof(" - Received signal %v\n", sig)
        for {
            var wstatus syscall.WaitStatus
            /*
             *  Reap 'em, so that zombies don't accumulate.
             *  Plants vs. Zombies!!
             */
            pid, err := syscall.Wait4(pid, &wstatus, opts, nil)
            pidChan <- pid
            for syscall.EINTR == err {
                pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
                pidChan <- pid
            }
            if syscall.ECHILD == err {
                break
            }
            logo.Infof(" - Grim reaper cleanup: pid=%d, wstatus=%+v\n",
                pid, wstatus)
        }
    }
} /*   End of function  reapChildren.  */
/*
 *  ======================================================================
 *  Section: Exported functions
 *  ======================================================================
 */
//  Normal entry point for the reaper code. Start reaping children in the
//  background inside a goroutine.
// Reap reap
func Reap(pidChan chan<- int) {
    /*
     *  Only reap processes if we are taking over init's duties aka
     *  we are running as pid 1 inside a docker container. The default
     *  is to reap all processes.
     */
    Start(Config{
        Pid:              -1,
        Options:          0,
        DisablePid1Check: true,
    }, pidChan)
} /*  End of [exported] function  Reap.  */
//  Entry point for invoking the reaper code with a specific configuration.
//  The config allows you to bypass the pid 1 checks, so handle with care.
//  The child processes are reaped in the background inside a goroutine.
// Start start
func Start(config Config, pidChan chan<- int) {
    /*
     *  Start the Reaper with configuration options. This allows you to
     *  reap processes even if the current pid isn't running as pid 1.
     *  So ... use with caution!!
     *
     *  In most cases, you are better off just using Reap() as that
     *  checks if we are running as Pid 1.
     */
    if !config.DisablePid1Check {
        mypid := os.Getpid()
        if 1 != mypid {
            logo.Errorln(" - Grim reaper disabled, pid not 1\n")
            return
        }
    }
    /*
     *  Ok, so either pid 1 checks are disabled or we are the grandma
     *  of 'em all, either way we get to play the grim reaper.
     *  You will be missed, Terry Pratchett!! RIP
     */
    go reapChildren(config, pidChan)
} /*  End of [exported] function  Start.  */
func waitForRestart(ctxt context.Context, pidChan <-chan int) {
    for {
        select {
        case <-ctxt.Done():
            return
        case pid := <-pidChan:
            restartProc(ctxt, pid)
        default:
            time.Sleep(3 * time.Second)
        }
    }
}
app/slave/sdkLoad.go
New file
@@ -0,0 +1,40 @@
package slave
import (
    "analysis/app"
    "analysis/logo"
    "context"
    "plugin"
)
// func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{}
// func Run(ctx context.Context, i interface{}) {
type sdk struct {
    fnCreate func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{}
    fnRun    func(context.Context, interface{})
}
func loadSDK(soFile string) *sdk {
    plug, err := plugin.Open(soFile)
    if err != nil {
        logo.Errorln("Slave Open so File: ", soFile, " Error: ", err)
        return nil
    }
    fnC, err := app.LoadFunc(plug, soFile, "Create")
    if err != nil {
        logo.Errorln("Load Func Create From: ", soFile, " Error: ", err)
        return nil
    }
    fnR, err := app.LoadFunc(plug, soFile, "Run")
    if err != nil {
        logo.Errorln("Load Func Run From: ", soFile, " Error: ", err)
        return nil
    }
    return &sdk{
        fnCreate: fnC.(func(string, string, string, int, bool, string, int, func(...interface{}), map[string]string) interface{}),
        fnRun:    fnR.(func(context.Context, interface{})),
    }
}
app/slave/slave.go
New file
@@ -0,0 +1,54 @@
package slave
import (
    "analysis/app"
    "analysis/logo"
    "analysis/util"
    "context"
    "plugin"
)
// TwoPluginConflict test
func TwoPluginConflict(commSoFile, config string) bool {
    cfg, err := app.ReadConfig(config)
    if err != nil {
        logo.Errorln("Slave Read Config Error: ", err)
        return false
    }
    sdk := loadSDK(cfg.SoFile)
    if sdk == nil {
    }
    plug, err := plugin.Open(commSoFile)
    if err != nil || plug == nil {
        logo.Errorln("Slave Open so File: ", commSoFile, " Error: ", err)
        return false
    }
    return true
}
// Run run
func Run(ctx context.Context, config, typ, id string, gpu int, shm bool) bool {
    cfg, err := app.ReadConfig(config)
    if err != nil {
        logo.Errorln("Slave Read Config Error: ", err)
        return false
    }
    sdk := loadSDK(cfg.SoFile)
    if sdk == nil {
        return false
    }
    // func Create(config string, typ, id string, gpu int, shm bool, ipc2Rule string, ruleMaxSize int, fn func(...interface{}), reserved map[string]string) interface{}
    // func Run(ctx context.Context, i interface{}) {
    handle := sdk.fnCreate(config, typ, id, gpu, shm, util.ToRuleIPC, 30, logo.Infoln, nil)
    if handle == nil {
        logo.Errorln("Create SDK: ", typ, " ID: ", id, " Error")
        return false
    }
    sdk.fnRun(ctx, handle)
    return true
}