package master import ( "context" "encoding/json" "os" "os/exec" "syscall" "time" "analysis/logo" "analysis/util" ) const ( opRemove = "remove" opAdd = "add" ) // Notice transit to slave type Notice struct { Op string `json:"Op"` Content []string `json:"Content"` } type transit struct { chNotify chan<- []byte cancel context.CancelFunc } // NamedProc 单个进程名字和服务通道 type NamedProc struct { // 进程名字 Name string // 进程通道 Channels []string // 进程runtime Env string // 进程config file path Config string // 进程param Param []string } // TypeProc 每个Type进程的参数 type TypeProc struct { // 进程类型FaceDetect/Yolo Typ string // 具名进程 SNameProc []NamedProc } // Worker 单个进程服务 type Worker struct { pid int cmd *exec.Cmd info *NamedProc trans *transit } // Daemon 监控的所有子进程 type Daemon struct { // 每个sdk类型启动的进程数量 workers map[string][]*Worker } // NewDaemon 监控 func NewDaemon() *Daemon { return &Daemon{ workers: make(map[string][]*Worker), } } //求交集 func intersect(slice1, slice2 []string) []string { m := make(map[string]int) nn := make([]string, 0) for _, v := range slice1 { m[v]++ } for _, v := range slice2 { times, _ := m[v] if times == 1 { nn = append(nn, v) } } return nn } //求差集 slice1-并集 func difference(slice1, slice2 []string) []string { m := make(map[string]int) nn := make([]string, 0) inter := intersect(slice1, slice2) for _, v := range inter { m[v]++ } for _, value := range slice1 { times, _ := m[value] if times == 0 { nn = append(nn, value) } } return nn } func removeWorker(w *Worker) { syscall.Kill(w.pid, syscall.SIGTERM) w.cmd.Wait() w.trans.cancel() } func (d *Daemon) rmWorkerWith(typ string) { if workers, ok := d.workers[typ]; ok { delete(d.workers, typ) for _, w := range workers { removeWorker(w) } } } func (d *Daemon) rmWorkerNoType(childs []TypeProc) { var newTypes []string for _, v := range childs { newTypes = append(newTypes, v.Typ) } var runTypes []string for k := range d.workers { runTypes = append(runTypes, k) } // 不存在于新信息中的type, remove rmTypes := difference(runTypes, newTypes) if len(rmTypes) > 0 { for _, v := range rmTypes { d.rmWorkerWith(v) } } } func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker { var newNames []string for _, v := range procs { newNames = append(newNames, v.Name) } var runNames []string for _, v := range workers { runNames = append(runNames, v.info.Name) } // 已经不需要存在进程,remove rmWorkers := difference(runNames, newNames) for _, v := range rmWorkers { for _, w := range workers { if v == w.info.Name { removeWorker(w) } } } // 保留已存在的进程 stillWorks := intersect(runNames, newNames) var ret []*Worker for _, v := range stillWorks { for _, w := range workers { if v == w.info.Name { ret = append(ret, w) } } } return ret } ////////////////////////////////////////////////////////// func getNamedProc(typ string, childs []TypeProc) []NamedProc { for _, v := range childs { if v.Typ == typ { return v.SNameProc } } return nil } func getNamedProcInfo(name string, procs []NamedProc) *NamedProc { for _, v := range procs { if name == v.Name { return &v } } return nil } func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) { for _, s := range typs { // 存在这个类型的进程 if workers, ok := d.workers[s]; ok { child := getNamedProc(s, childs) if child == nil { continue } var newNames []string for _, v := range child { newNames = append(newNames, v.Name) } var runNames []string for _, v := range workers { runNames = append(runNames, v.info.Name) } add := difference(newNames, runNames) for _, c := range child { for _, v := range add { if c.Name == v { d.startWorker(ctx, s, &c) } } } adjust := intersect(runNames, newNames) for _, v := range adjust { proc := getNamedProcInfo(v, child) if proc == nil { continue } for _, w := range workers { if v == w.info.Name { // 找到了对应名字的进程,首先求不需要再运行的通道 removes := difference(w.info.Channels, proc.Channels) if len(removes) > 0 { // 通知子进程关闭通道 n := Notice{ Op: opRemove, Content: removes, } if d, err := json.Marshal(n); err == nil { w.trans.chNotify <- d } } // 其次求出新增的通道 adds := difference(proc.Channels, w.info.Channels) if len(adds) > 0 { // 通知子进程打开通道 n := Notice{ Op: opAdd, Content: adds, } if d, err := json.Marshal(n); err == nil { w.trans.chNotify <- d } } } } } } } } func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) { for _, v := range child.SNameProc { d.startWorker(ctx, child.Typ, &v) } } func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) { var newTypes []string for _, v := range childs { newTypes = append(newTypes, v.Typ) } var runTypes []string for k := range d.workers { runTypes = append(runTypes, k) } // 新类型添加 addWorkers := difference(newTypes, runTypes) for _, a := range addWorkers { for _, v := range childs { if a == v.Typ { // start new type proc d.startNewWorker(ctx, v) } } } stillWorkers := intersect(newTypes, runTypes) // 调整已存在的进程的通道 d.channelChanged(ctx, stillWorkers, childs) } func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) { // 新的进程信息,首先删除掉不再运行的类型 d.rmWorkerNoType(childs) // 按名字删除特定类型中不再运行的进程 for _, v := range childs { if workers, ok := d.workers[v.Typ]; ok { nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc) d.workers[v.Typ] = nWorkers } } d.adjustWorker(ctx, childs) } // Watch watch func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) { chExit := make(chan ExitInfo, 32) go Reap(chExit) for { select { case <-ctx.Done(): return case i := <-chExit: d.reWorker(ctx, &i) case childs := <-ch: d.updateWorker(ctx, childs) default: time.Sleep(time.Second) } } } func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) { // 有退出的进程,查看是否在运行进程中,拉起 for i, workers := range d.workers { found := false for j, w := range workers { if w.pid == info.Pid { w = d.restartWorker(ctx, w) d.workers[i][j] = w found = true break } } if found { break } } } func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker { w.cmd.Wait() w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:]) w.pid = w.cmd.Process.Pid return w } func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) { ipcID := "analysis-" + typ + "-" + info.Name args := []string{ `-role=slave`, "-sdk=" + typ, "-id=" + ipcID, "-" + util.ConfigPath + "=" + info.Config, } args = append(args, info.Param...) cmd := runProc(ctx, "./analysis", info.Env, args) if cmd == nil { logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID) return } logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env) logo.Infoln(cmd.Args) ch := make(chan []byte, 3) cancel := fnNotify(ctx, ipcID, ch, logo.Infoln) w := &Worker{ pid: cmd.Process.Pid, cmd: cmd, info: info, trans: &transit{ chNotify: ch, cancel: cancel, }, } d.workers[typ] = append(d.workers[typ], w) } func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd { cmd := exec.CommandContext(ctxt, bin, args...) rEnv := "" if len(env) != 0 { runtime := "LD_LIBRARY_PATH" rEnv = runtime + "=" + env logo.Infoln("Env String: ", rEnv) // remove os environ ld old := os.Getenv(runtime) os.Unsetenv(runtime) cmd.Env = os.Environ() cmd.Env = append(cmd.Env, rEnv) os.Setenv(runtime, old) } //debug cmd.Stdout = os.Stdout cmd.Stderr = os.Stderr cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} if err := cmd.Start(); err == nil { return cmd } return nil }