add recv sdk proc info from dispatch
New file |
| | |
| | | package master |
| | | |
| | | import ( |
| | | "context" |
| | | "encoding/json" |
| | | "os" |
| | | "os/exec" |
| | | "syscall" |
| | | "time" |
| | | |
| | | "analysis/logo" |
| | | "analysis/util" |
| | | ) |
| | | |
// NamedProc holds the name of a single process together with the service
// channels and launch settings that belong to it.
type NamedProc struct {
	Name     string   // process name
	Channels []string // channels served by the process
	Env      string   // process runtime
	Config   string   // path of the process config file
	Param    []string // process parameters
}
| | | |
| | | // TypeProc 每个Type进程的参数 |
| | | type TypeProc struct { |
| | | // 进程类型FaceDetect/Yolo |
| | | Typ string |
| | | // 具名进程 |
| | | SNameProc []NamedProc |
| | | } |
| | | |
// Notice is the message transmitted to a slave; it is serialized as JSON
// with the keys "Op" and "Content".
type Notice struct {
	Op      string   `json:"Op"`      // operation name
	Content []string `json:"Content"` // operands of the operation
}
// transit bundles the send-only channel used to hand notices to a worker's
// notifier with the function that cancels that notifier.
type transit struct {
	chNotify chan<- []byte      // serialized notices are written here
	cancel   context.CancelFunc // stops the notifier
}
| | | |
| | | // Worker 单个进程服务 |
| | | type Worker struct { |
| | | pid int |
| | | cmd *exec.Cmd |
| | | info *NamedProc |
| | | trans *transit |
| | | } |
| | | |
| | | // Daemon 监控的所有子进程 |
| | | type Daemon struct { |
| | | // 每个sdk类型启动的进程数量 |
| | | workers map[string][]*Worker |
| | | } |
| | | |
| | | // NewDaemon 监控 |
| | | func NewDaemon() *Daemon { |
| | | return &Daemon{ |
| | | workers: make(map[string][]*Worker), |
| | | } |
| | | } |
| | | |
// intersect returns the set intersection of slice1 and slice2.
//
// Every element present in both slices is reported exactly once, in the
// order of its first occurrence in slice2. Duplicates in either input do not
// affect membership (the previous count-based check dropped any element that
// occurred more than once in slice1). The result is never nil.
func intersect(slice1, slice2 []string) []string {
	pending := make(map[string]struct{}, len(slice1))
	for _, v := range slice1 {
		pending[v] = struct{}{}
	}

	common := make([]string, 0)
	for _, v := range slice2 {
		if _, ok := pending[v]; !ok {
			continue
		}
		common = append(common, v)
		// Remove the key so a repeated value in slice2 is not emitted twice.
		delete(pending, v)
	}
	return common
}
| | | |
// difference returns the set difference slice1 - slice2: the elements of
// slice1 that do not occur in slice2.
//
// Elements are reported once each, in the order of their first occurrence in
// slice1. Membership is decided directly against slice2, so an element that
// is repeated in slice1 and also present in slice2 is correctly excluded.
// The result is never nil.
func difference(slice1, slice2 []string) []string {
	seen := make(map[string]struct{}, len(slice2))
	for _, v := range slice2 {
		seen[v] = struct{}{}
	}

	rest := make([]string, 0)
	for _, v := range slice1 {
		if _, ok := seen[v]; ok {
			continue
		}
		// Mark the value so a repeated entry of slice1 is reported only once.
		seen[v] = struct{}{}
		rest = append(rest, v)
	}
	return rest
}
| | | |
| | | func removeWorker(w *Worker) { |
| | | syscall.Kill(w.pid, syscall.SIGTERM) |
| | | w.cmd.Wait() |
| | | w.trans.cancel() |
| | | } |
| | | |
| | | func (d *Daemon) rmWorkerWith(typ string) { |
| | | if workers, ok := d.workers[typ]; ok { |
| | | delete(d.workers, typ) |
| | | |
| | | for _, w := range workers { |
| | | removeWorker(w) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) rmWorkerNoType(childs []TypeProc) { |
| | | var newTypes []string |
| | | for _, v := range childs { |
| | | newTypes = append(newTypes, v.Typ) |
| | | } |
| | | |
| | | var runTypes []string |
| | | for k := range d.workers { |
| | | runTypes = append(runTypes, k) |
| | | } |
| | | |
| | | // 不存在于新信息中的type, remove |
| | | rmTypes := difference(runTypes, newTypes) |
| | | if len(rmTypes) > 0 { |
| | | for _, v := range rmTypes { |
| | | d.rmWorkerWith(v) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker { |
| | | var newNames []string |
| | | for _, v := range procs { |
| | | newNames = append(newNames, v.Name) |
| | | } |
| | | |
| | | var runNames []string |
| | | for _, v := range workers { |
| | | runNames = append(runNames, v.info.Name) |
| | | } |
| | | |
| | | // 已经不需要存在进程,remove |
| | | rmWorkers := difference(runNames, newNames) |
| | | for _, v := range rmWorkers { |
| | | for _, w := range workers { |
| | | if v == w.info.Name { |
| | | removeWorker(w) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // 保留已存在的进程 |
| | | stillWorks := intersect(runNames, newNames) |
| | | var ret []*Worker |
| | | for _, v := range stillWorks { |
| | | for _, w := range workers { |
| | | if v == w.info.Name { |
| | | ret = append(ret, w) |
| | | } |
| | | } |
| | | } |
| | | |
| | | return ret |
| | | } |
| | | |
| | | ////////////////////////////////////////////////////////// |
| | | |
| | | func getNamedProc(typ string, childs []TypeProc) []NamedProc { |
| | | for _, v := range childs { |
| | | if v.Typ == typ { |
| | | return v.SNameProc |
| | | } |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func getNamedProcInfo(name string, procs []NamedProc) *NamedProc { |
| | | for _, v := range procs { |
| | | if name == v.Name { |
| | | return &v |
| | | } |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) { |
| | | for _, s := range typs { |
| | | // 存在这个类型的进程 |
| | | if workers, ok := d.workers[s]; ok { |
| | | child := getNamedProc(s, childs) |
| | | if child == nil { |
| | | continue |
| | | } |
| | | var newNames []string |
| | | for _, v := range child { |
| | | newNames = append(newNames, v.Name) |
| | | } |
| | | |
| | | var runNames []string |
| | | for _, v := range workers { |
| | | runNames = append(runNames, v.info.Name) |
| | | } |
| | | |
| | | add := difference(newNames, runNames) |
| | | for _, c := range child { |
| | | for _, v := range add { |
| | | if c.Name == v { |
| | | d.startWorker(ctx, s, &c) |
| | | } |
| | | } |
| | | } |
| | | |
| | | adjust := intersect(runNames, newNames) |
| | | for _, v := range adjust { |
| | | proc := getNamedProcInfo(v, child) |
| | | if proc == nil { |
| | | continue |
| | | } |
| | | |
| | | for _, w := range workers { |
| | | if v == w.info.Name { |
| | | // 找到了对应名字的进程,首先求不需要再运行的通道 |
| | | var notice *Notice |
| | | removes := difference(w.info.Channels, proc.Channels) |
| | | if len(removes) > 0 { |
| | | // 通知子进程关闭通道 |
| | | notice = &Notice{ |
| | | Op: "remove", |
| | | Content: removes, |
| | | } |
| | | |
| | | } |
| | | |
| | | // 其次求出新增的通道 |
| | | adds := difference(proc.Channels, w.info.Channels) |
| | | if len(adds) > 0 { |
| | | // 通知子进程打开通道 |
| | | notice = &Notice{ |
| | | Op: "add", |
| | | Content: adds, |
| | | } |
| | | } |
| | | if notice != nil { |
| | | if d, err := json.Marshal(*notice); err == nil { |
| | | w.trans.chNotify <- d |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) { |
| | | for _, v := range child.SNameProc { |
| | | d.startWorker(ctx, child.Typ, &v) |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) { |
| | | var newTypes []string |
| | | for _, v := range childs { |
| | | newTypes = append(newTypes, v.Typ) |
| | | } |
| | | |
| | | var runTypes []string |
| | | for k := range d.workers { |
| | | runTypes = append(runTypes, k) |
| | | } |
| | | |
| | | // 新类型添加 |
| | | addWorkers := difference(newTypes, runTypes) |
| | | for _, a := range addWorkers { |
| | | for _, v := range childs { |
| | | if a == v.Typ { |
| | | // start new type proc |
| | | d.startNewWorker(ctx, v) |
| | | } |
| | | } |
| | | } |
| | | |
| | | stillWorkers := intersect(newTypes, runTypes) |
| | | // 调整已存在的进程的通道 |
| | | d.channelChanged(ctx, stillWorkers, childs) |
| | | } |
| | | |
| | | func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) { |
| | | // 新的进程信息,首先删除掉不再运行的类型 |
| | | d.rmWorkerNoType(childs) |
| | | // 按名字删除特定类型中不再运行的进程 |
| | | for _, v := range childs { |
| | | if workers, ok := d.workers[v.Typ]; ok { |
| | | nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc) |
| | | d.workers[v.Typ] = nWorkers |
| | | } |
| | | } |
| | | |
| | | d.adjustWorker(ctx, childs) |
| | | } |
| | | |
| | | // Watch watch |
| | | func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) { |
| | | chExit := make(chan ExitInfo, 32) |
| | | go Reap(chExit) |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case i := <-chExit: |
| | | d.reWorker(ctx, &i) |
| | | case childs := <-ch: |
| | | d.updateWorker(ctx, childs) |
| | | default: |
| | | time.Sleep(time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) { |
| | | // 有退出的进程,查看是否在运行进程中,拉起 |
| | | for i, workers := range d.workers { |
| | | found := false |
| | | for j, w := range workers { |
| | | if w.pid == info.Pid { |
| | | w = d.restartWorker(ctx, w) |
| | | d.workers[i][j] = w |
| | | found = true |
| | | break |
| | | } |
| | | } |
| | | if found { |
| | | break |
| | | } |
| | | } |
| | | } |
| | | |
| | | func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker { |
| | | w.cmd.Wait() |
| | | w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:]) |
| | | w.pid = w.cmd.Process.Pid |
| | | return w |
| | | } |
| | | |
| | | func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) { |
| | | |
| | | ipcID := "analysis-" + typ + "-" + info.Name |
| | | |
| | | args := []string{ |
| | | `-role=slave`, |
| | | "-sdk=" + typ, |
| | | "-id=" + ipcID, |
| | | "-" + util.ConfigPath + "=" + info.Config, |
| | | } |
| | | |
| | | args = append(args, info.Param...) |
| | | cmd := runProc(ctx, "./analysis", info.Env, args) |
| | | |
| | | if cmd == nil { |
| | | logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID) |
| | | } |
| | | logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env) |
| | | |
| | | ch := make(chan []byte, 3) |
| | | cancel := fnNotify(ctx, ipcID, ch, logo.Infoln) |
| | | |
| | | w := &Worker{ |
| | | pid: cmd.Process.Pid, |
| | | cmd: cmd, |
| | | info: info, |
| | | trans: &transit{ |
| | | chNotify: ch, |
| | | cancel: cancel, |
| | | }, |
| | | } |
| | | d.workers[typ] = append(d.workers[typ], w) |
| | | } |
| | | |
| | | func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd { |
| | | cmd := exec.CommandContext(ctxt, bin, args...) |
| | | rEnv := "" |
| | | if len(env) != 0 { |
| | | runtime := "LD_LIBRARY_PATH" |
| | | rEnv = runtime + "=" + env |
| | | logo.Infoln("Env String: ", rEnv) |
| | | |
| | | // remove os environ ld |
| | | old := os.Getenv(runtime) |
| | | os.Unsetenv(runtime) |
| | | cmd.Env = os.Environ() |
| | | cmd.Env = append(cmd.Env, rEnv) |
| | | os.Setenv(runtime, old) |
| | | } |
| | | |
| | | // //debug |
| | | // cmd.Stdout = os.Stdout |
| | | // cmd.Stderr = os.Stderr |
| | | cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} |
| | | |
| | | if err := cmd.Start(); err == nil { |
| | | return cmd |
| | | } |
| | | return nil |
| | | } |
| | |
| | | "context" |
| | | "encoding/json" |
| | | "os" |
| | | "plugin" |
| | | "strconv" |
| | | "strings" |
| | | "time" |
| | | |
| | | "basic.com/libgowrapper/sdkstruct.git" |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | func reaper(ctxt context.Context) { |
| | | pidChan := make(chan int, 1) |
| | | Reap(pidChan) |
| | | go waitForRestart(ctxt, pidChan) |
| | | var ( |
| | | soLoaded bool |
| | | fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) |
| | | fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc |
| | | ) |
| | | |
| | | func soLoad(soFile string) bool { |
| | | if fnFetch != nil && fnNotify != nil { |
| | | return true |
| | | } |
| | | |
| | | plug, err := plugin.Open(soFile) |
| | | if err != nil { |
| | | logo.Errorln("Open: ", soFile, " error: ", err) |
| | | return false |
| | | } |
| | | |
| | | var fn plugin.Symbol |
| | | |
| | | fn, err = app.LoadFunc(plug, soFile, "Fetch") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") |
| | | return false |
| | | } |
| | | fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) |
| | | |
| | | fn, err = app.LoadFunc(plug, soFile, "Notify") |
| | | if err != nil { |
| | | logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") |
| | | return false |
| | | } |
| | | fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) |
| | | return true |
| | | } |
| | | |
| | | // Run run |
| | | func Run(ctx context.Context, soFile, configPath string) bool { |
| | | reaper(ctx) |
| | | |
| | | fetcher := NewFetcher(soFile) |
| | | if fetcher == nil { |
| | | func initFetcher(ctx context.Context, soFile string) <-chan []byte { |
| | | if !soLoad(soFile) { |
| | | logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) |
| | | return false |
| | | return nil |
| | | } |
| | | |
| | | logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") |
| | | |
| | | ip := "tcp://" + util.FSI.IP |
| | | ip := "tcp://192.168.5.22" |
| | | // ip := "tcp://" + util.FSI.IP |
| | | url := ip + ":" + strconv.Itoa(util.FSI.DataPort) |
| | | hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) |
| | | |
| | | chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | logo.Infoln("Analysis Fetcher INIT Error! URL:", url) |
| | | time.Sleep(time.Second) |
| | | chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) |
| | | ch := make(chan []byte, 3) |
| | | |
| | | fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) |
| | | |
| | | logo.Infoln("~~~~~~Start Recv SDK Infos") |
| | | return ch |
| | | } |
| | | |
| | | // Run run |
| | | func Run(ctx context.Context, soFile, configPath string) bool { |
| | | daemon := NewDaemon() |
| | | chProc := make(chan []TypeProc, 32) |
| | | go daemon.Watch(ctx, chProc) |
| | | |
| | | chMsg := initFetcher(ctx, soFile) |
| | | if chMsg == nil { |
| | | logo.Infoln("Master Run initFetcher Failed") |
| | | return false |
| | | } |
| | | params := app.GetParams() |
| | | |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return true |
| | | case msg := <-chMsg: |
| | | // sdktype process_name topic null |
| | | // yolo/face yolo_0/yolo_1 channel |
| | | // sdktype process_name topic null |
| | | // yolo/face yolo_0/yolo_1 channel |
| | | var sdk map[string](map[string](map[string]interface{})) |
| | | |
| | | if err := json.Unmarshal(msg.Msg, &sdk); err != nil { |
| | | if err := json.Unmarshal(msg, &sdk); err != nil { |
| | | logo.Infoln("Fetcher SDK unmarshal err:", err) |
| | | continue |
| | | } |
| | | |
| | | logo.Infoln("~~~~~~Recv New SDKInfos") |
| | | chCameras <- CameraInfo{ |
| | | Cameras: cameras, |
| | | |
| | | var typeProcs []TypeProc |
| | | |
| | | for sdkType, mapSdkProc := range sdk { |
| | | config := findConfigFile(sdkType, configPath) |
| | | if config == nil { |
| | | logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") |
| | | continue |
| | | } |
| | | env := checkConfig(sdkType, *config) |
| | | if env == nil { |
| | | continue |
| | | } |
| | | |
| | | var channels []string |
| | | var namedProcs []NamedProc |
| | | |
| | | for procName, mapProcChannels := range mapSdkProc { |
| | | for c := range mapProcChannels { |
| | | channels = append(channels, c) |
| | | } |
| | | p := NamedProc{ |
| | | Name: procName, |
| | | Channels: channels, |
| | | Env: *env, |
| | | Config: *config, |
| | | Param: params, |
| | | } |
| | | namedProcs = append(namedProcs, p) |
| | | } |
| | | t := TypeProc{ |
| | | Typ: sdkType, |
| | | SNameProc: namedProcs, |
| | | } |
| | | typeProcs = append(typeProcs, t) |
| | | } |
| | | chProc <- typeProcs |
| | | |
| | | logo.Infoln("~~~~~~Recv New SDKInfos Over") |
| | | |
| | | default: |
| | |
| | | |
| | | } |
| | | |
| | | func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { |
| | | func findConfigFile(typ, configPath string) *string { |
| | | rPath := configPath |
| | | |
| | | params := app.GetParams() |
| | | |
| | | for _, v := range sdks { |
| | | |
| | | file := rPath + v.SdkType + ".json" |
| | | if rPath[len(rPath)-1] != '/' { |
| | | file = rPath + "/" + v.SdkType + ".json" |
| | | } |
| | | |
| | | cfg, err := app.ReadConfig(file) |
| | | if err != nil { |
| | | logo.Errorln("Master Read: ", file, " Config Error: ", err) |
| | | continue |
| | | } |
| | | |
| | | if len(cfg.Env) > 0 { |
| | | envs := strings.Split(cfg.Env, ":") |
| | | normal := true |
| | | for _, v := range envs { |
| | | if !util.IsFileExist(v) { |
| | | normal = false |
| | | break |
| | | } |
| | | } |
| | | if !normal { |
| | | logo.Infoln("Can't Find Runtime Path, Skip It: ", file) |
| | | continue |
| | | } |
| | | } |
| | | |
| | | logo.Infoln(file, " CONFIG: ", cfg) |
| | | |
| | | args := []string{ |
| | | `-role=slave`, |
| | | "-sdk=" + v.SdkType, |
| | | "-id=" + v.IpcID, |
| | | "-" + util.ConfigPath + "=" + file, |
| | | } |
| | | |
| | | args = append(args, params...) |
| | | pid, err := runProc(ctx, "./analysis", args, cfg.Env) |
| | | |
| | | if err != nil { |
| | | logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) |
| | | } |
| | | logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) |
| | | // default config file |
| | | file := rPath + typ + ".json" |
| | | // if configPath not end with '/' |
| | | if rPath[len(rPath)-1] != '/' { |
| | | file = rPath + "/" + typ + ".json" |
| | | } |
| | | return true |
| | | // whether file exist |
| | | if util.IsFileExist(file) { |
| | | return &file |
| | | } |
| | | return nil |
| | | } |
| | | |
| | | func checkConfig(typ, file string) *string { |
| | | cfg, err := app.ReadConfig(file) |
| | | if err != nil { |
| | | logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) |
| | | return nil |
| | | } |
| | | |
| | | // check config runtime exist if config this item |
| | | env := strings.TrimSpace(cfg.Env) |
| | | if len(env) > 0 { |
| | | envs := strings.Split(env, ":") |
| | | pathExist := true |
| | | for _, v := range envs { |
| | | if !util.IsFileExist(v) { |
| | | pathExist = false |
| | | break |
| | | } |
| | | } |
| | | if !pathExist { |
| | | logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ) |
| | | return nil |
| | | } |
| | | } |
| | | return &env |
| | | } |
| | |
| | | |
| | | import ( |
| | | "analysis/logo" |
| | | "context" |
| | | "os" |
| | | "os/exec" |
| | | "os/signal" |
| | | "syscall" |
| | | "time" |
| | | ) |
| | | |
| | | type procInfo struct { |
| | | cmd *exec.Cmd |
| | | env string |
// ExitInfo reports a child process that has exited.
type ExitInfo struct {
	Pid      int // pid of the exited process
	ExitCode int // exit status of the exited process
}
| | | |
| | | var ( |
| | | procMap = make(map[int]*procInfo) |
| | | ) |
| | | |
| | | func restartProc(ctxt context.Context, pid int) { |
| | | info, ok := procMap[pid] |
| | | if ok { |
| | | err := info.cmd.Wait() |
| | | |
| | | if err != nil { |
| | | logo.Errorln("pid : [", pid, "] quit error: ", err) |
| | | } else { |
| | | logo.Infoln("pid : [", pid, "] quit") |
| | | } |
| | | delete(procMap, pid) |
| | | runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], info.env) |
| | | } else { |
| | | logo.Errorln(pid, " doesn't exist") |
| | | } |
| | | } |
| | | |
| | | func quitProc(pid int) { |
| | | info, ok := procMap[pid] |
| | | if ok { |
| | | delete(procMap, pid) |
| | | |
| | | syscall.Kill(pid, syscall.SIGINT) |
| | | info.cmd.Wait() |
| | | } else { |
| | | logo.Errorln(pid, " doesn't exist") |
| | | } |
| | | } |
| | | func runProc(ctxt context.Context, bin string, args []string, env string) (int, error) { |
| | | cmd := exec.CommandContext(ctxt, bin, args...) |
| | | rEnv := "" |
| | | if len(env) != 0 { |
| | | runtime := "LD_LIBRARY_PATH" |
| | | rEnv = runtime + "=" + env |
| | | logo.Infoln("Env String: ", rEnv) |
| | | |
| | | // remove os environ ld |
| | | old := os.Getenv(runtime) |
| | | os.Unsetenv(runtime) |
| | | cmd.Env = os.Environ() |
| | | cmd.Env = append(cmd.Env, rEnv) |
| | | os.Setenv(runtime, old) |
| | | } |
| | | |
| | | pid := -1 |
| | | cmd.Stdout = os.Stdout |
| | | cmd.Stderr = os.Stderr |
| | | cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} |
| | | |
| | | err := cmd.Start() |
| | | if err == nil { |
| | | pid = cmd.Process.Pid |
| | | procMap[pid] = &procInfo{cmd, env} |
| | | } |
| | | return pid, err |
| | | } |
| | | |
| | | // Config conf |
| | | // Config config |
| | | type Config struct { |
| | | Pid int |
| | | Options int |
| | |
| | | func sigChildHandler(notifications chan os.Signal) { |
| | | sigs := make(chan os.Signal, 3) |
| | | signal.Notify(sigs, syscall.SIGCHLD) |
| | | signal.Ignore(syscall.SIGPIPE) |
| | | |
| | | for { |
| | | sig := <-sigs |
| | |
| | | } /* End of function sigChildHandler. */ |
| | | |
| | | // Be a good parent - clean up behind the children. |
| | | func reapChildren(config Config, pidChan chan<- int) { |
| | | func reapChildren(config Config, info chan<- ExitInfo) { |
| | | notifications := make(chan os.Signal, 1) |
| | | |
| | | go sigChildHandler(notifications) |
| | |
| | | * Plants vs. Zombies!! |
| | | */ |
| | | pid, err := syscall.Wait4(pid, &wstatus, opts, nil) |
| | | pidChan <- pid |
| | | info <- ExitInfo{pid, wstatus.ExitStatus()} |
| | | for syscall.EINTR == err { |
| | | pid, err = syscall.Wait4(pid, &wstatus, opts, nil) |
| | | pidChan <- pid |
| | | info <- ExitInfo{pid, wstatus.ExitStatus()} |
| | | } |
| | | |
| | | if syscall.ECHILD == err { |
| | |
| | | // background inside a goroutine. |
| | | |
| | | // Reap reap |
| | | func Reap(pidChan chan<- int) { |
| | | func Reap(info chan<- ExitInfo) { |
| | | /* |
| | | * Only reap processes if we are taking over init's duties aka |
| | | * we are running as pid 1 inside a docker container. The default |
| | |
| | | Pid: -1, |
| | | Options: 0, |
| | | DisablePid1Check: true, |
| | | }, pidChan) |
| | | }, info) |
| | | |
| | | } /* End of [exported] function Reap. */ |
| | | |
| | |
| | | // The child processes are reaped in the background inside a goroutine. |
| | | |
| | | // Start start |
| | | func Start(config Config, pidChan chan<- int) { |
| | | func Start(config Config, info chan<- ExitInfo) { |
| | | /* |
| | | * Start the Reaper with configuration options. This allows you to |
| | | * reap processes even if the current pid isn't running as pid 1. |
| | |
| | | * of 'em all, either way we get to play the grim reaper. |
| | | * You will be missed, Terry Pratchett!! RIP |
| | | */ |
| | | go reapChildren(config, pidChan) |
| | | go reapChildren(config, info) |
| | | |
| | | } /* End of [exported] function Start. */ |
| | | |
| | | func waitForRestart(ctxt context.Context, pidChan <-chan int) { |
| | | |
| | | for { |
| | | select { |
| | | case <-ctxt.Done(): |
| | | return |
| | | case pid := <-pidChan: |
| | | restartProc(ctxt, pid) |
| | | default: |
| | | time.Sleep(3 * time.Second) |
| | | } |
| | | } |
| | | } |
| | |
| | | go 1.12 |
| | | |
| | | require ( |
| | | basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c |
| | | basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865 |
| | | basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect |
| | | basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e |
| | | github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717 |
| | | github.com/natefinch/lumberjack v2.0.0+incompatible |
| | | github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4 |
New file |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "context" |
| | | "time" |
| | | |
| | | "basic.com/valib/pubsub.git" |
| | | ) |
| | | |
| | | func wait(ctx context.Context, c chan pubsub.Message, out chan<- []byte) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | return |
| | | case msg := <-c: |
| | | out <- msg.Msg |
| | | default: |
| | | time.Sleep(time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Fetch Fetch from tcp://192.168.5.22:4005 |
| | | func Fetch(ctx context.Context, url, heartBeatURL string, mode int, processID string, out chan<- []byte, fn func(...interface{})) { |
| | | topics := []string{pubsub.Topic_Sdk} |
| | | p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | fn("libcomm.so, pubsub subscribe error: ", err) |
| | | time.Sleep(time.Second) |
| | | p, err = pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) |
| | | } |
| | | |
| | | c := p.Recv() |
| | | |
| | | go wait(ctx, c, out) |
| | | } |
| | |
| | | basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect |
| | | basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e |
| | | golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect |
| | | nanomsg.org/go-mangos v1.4.0 // indirect |
| | | nanomsg.org/go-mangos v1.4.0 |
| | | ) |
New file |
| | |
| | | package main |
| | | |
| | | import ( |
| | | "context" |
| | | "os" |
| | | "strings" |
| | | "time" |
| | | |
| | | "nanomsg.org/go-mangos" |
| | | "nanomsg.org/go-mangos/protocol/rep" |
| | | "nanomsg.org/go-mangos/protocol/req" |
| | | "nanomsg.org/go-mangos/transport/ipc" |
| | | "nanomsg.org/go-mangos/transport/tcp" |
| | | ) |
| | | |
| | | func request(url string, timeout int, fn func(...interface{})) mangos.Socket { |
| | | |
| | | var sock mangos.Socket |
| | | var err error |
| | | |
| | | for { |
| | | if sock, err = req.NewSocket(); err != nil { |
| | | fn("!!!!!!Notify can't get new request socket: ", err) |
| | | time.Sleep(time.Second) |
| | | } else { |
| | | break |
| | | } |
| | | } |
| | | |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second) |
| | | sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second) |
| | | |
| | | for { |
| | | if err = sock.Dial(url); err != nil { |
| | | fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url) |
| | | time.Sleep(time.Second) |
| | | } else { |
| | | break |
| | | } |
| | | } |
| | | |
| | | return sock |
| | | } |
| | | |
| | | func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | sock.Close() |
| | | return |
| | | case data := <-ch: |
| | | var ret []byte |
| | | var err error |
| | | |
| | | err = sock.Send(data) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | fn("!!!!!!Notify Send To Slave ERROR: ", err) |
| | | time.Sleep(500 * time.Millisecond) |
| | | continue |
| | | } |
| | | |
| | | ret, err = sock.Recv() |
| | | for { |
| | | if err == nil { |
| | | fn("~~~~~Notify Recv From Slave: ", string(ret)) |
| | | break |
| | | } |
| | | fn("!!!!!!Notify Recv From Slave Error: ", err) |
| | | time.Sleep(500 * time.Microsecond) |
| | | continue |
| | | } |
| | | |
| | | default: |
| | | time.Sleep(time.Second) |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Notify master sync notify to slave |
| | | func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc { |
| | | rctx, cancel := context.WithCancel(ctx) |
| | | |
| | | sock := request(url, 2, fn) |
| | | |
| | | go notify(rctx, sock, ch, fn) |
| | | return cancel |
| | | } |
| | | |
| | | ////////////////////////////////////////////////////////////////// |
| | | |
// rmExistedIpcName removes the file behind an "ipc://<path>" url, if there is
// one. URLs with another scheme, or without a path, are left alone.
//
// The path is everything after the "ipc://" prefix. The earlier split-based
// parsing indexed past the end of the slice for the input "ipc" and cut the
// path short when it contained "://" itself.
func rmExistedIpcName(url string) {
	const scheme = "ipc://"
	if !strings.HasPrefix(url, scheme) {
		return
	}

	path := strings.TrimPrefix(url, scheme)
	if path == "" {
		return
	}

	// Best effort: a file that does not exist, or cannot be removed, simply
	// stays as it is.
	_ = os.Remove(path)
}
| | | |
| | | func reply(url string, timeout int, fn func(...interface{})) mangos.Socket { |
| | | rmExistedIpcName(url) |
| | | |
| | | var sock mangos.Socket |
| | | var err error |
| | | |
| | | for { |
| | | if sock, err = rep.NewSocket(); err != nil { |
| | | rmExistedIpcName(url) |
| | | fn("!!!!!!Notify can't get new reply socket: ", err) |
| | | time.Sleep(time.Second) |
| | | } else { |
| | | break |
| | | } |
| | | } |
| | | |
| | | sock.AddTransport(ipc.NewTransport()) |
| | | sock.AddTransport(tcp.NewTransport()) |
| | | sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second) |
| | | sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second) |
| | | |
| | | for { |
| | | if err = sock.Listen(url); err != nil { |
| | | rmExistedIpcName(url) |
| | | |
| | | fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url) |
| | | time.Sleep(time.Second) |
| | | } else { |
| | | break |
| | | } |
| | | } |
| | | |
| | | return sock |
| | | } |
| | | |
| | | func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) { |
| | | for { |
| | | select { |
| | | case <-ctx.Done(): |
| | | sock.Close() |
| | | return |
| | | default: |
| | | |
| | | msg, err := sock.Recv() |
| | | for { |
| | | if err == nil { |
| | | fn("~~~~~Notifiee Recv From Master: ", string(msg)) |
| | | break |
| | | } |
| | | fn("!!!!!!Notify Recv From Master Error: ", err) |
| | | time.Sleep(500 * time.Microsecond) |
| | | continue |
| | | } |
| | | |
| | | err = sock.Send([]byte("ok")) |
| | | for { |
| | | if err == nil { |
| | | break |
| | | } |
| | | fn("!!!!!!Notify Send To Master ERROR: ", err) |
| | | time.Sleep(500 * time.Millisecond) |
| | | continue |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | // Notifiee slave sync recv notice from master |
| | | func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc { |
| | | rctx, cancel := context.WithCancel(ctx) |
| | | |
| | | sock := reply(url, 2, fn) |
| | | |
| | | go notifiee(rctx, sock, ch, fn) |
| | | return cancel |
| | | } |
| | |
| | | } |
| | | } |
| | | if file == nil { |
| | | fmt.Println(`Read All Log Config Files Failed, Use Default, "./log/analysis-[type]"`) |
| | | fmt.Println(`Read All Log Config Files Failed, If -logit Use Default, "./log/analysis-[type]"`) |
| | | return |
| | | } |
| | | yamlString := string(file) |