package master
|
|
import (
|
"context"
|
"encoding/json"
|
"os"
|
"os/exec"
|
"syscall"
|
"time"
|
|
"analysis/logo"
|
"analysis/util"
|
)
|
|
// Values of Notice.Op understood by the slave process.
const (
	opRemove = "remove" // stop serving the channels listed in Notice.Content
	opAdd    = "add"    // start serving the channels listed in Notice.Content
)
|
|
// Notice is a message transmitted from the master to a slave process asking
// it to change the set of channels it serves. It is JSON-encoded before being
// queued on the worker's notification channel.
type Notice struct {
	// Op is the operation to perform: opAdd or opRemove.
	Op string `json:"Op"`
	// Content lists the channel names the operation applies to.
	Content []string `json:"Content"`
}
|
|
// transit is the notification link from the master to one slave process.
type transit struct {
	// chNotify receives JSON-encoded Notice messages destined for the slave.
	chNotify chan<- []byte
	// cancel is the CancelFunc returned by fnNotify; removeWorker calls it
	// when the worker is stopped.
	cancel context.CancelFunc
}
|
|
// NamedProc describes a single named process and the service channels it
// should serve.
type NamedProc struct {
	// Name is the process name; it is part of the IPC id built in startWorker
	// ("analysis-<type>-<Name>") and is used to match running workers.
	Name string

	// Channels lists the channels the process serves; changes are pushed to a
	// running slave as add/remove Notices.
	Channels []string

	// Env is the process runtime library path; when non-empty, runProc sets it
	// as LD_LIBRARY_PATH for the child.
	Env string

	// Config is the process's config file path, passed on the command line as
	// -<util.ConfigPath>=<Config>.
	Config string

	// Param holds extra command-line arguments appended after the fixed ones.
	Param []string
}
|
|
// TypeProc holds the parameters for all processes of one type.
type TypeProc struct {
	// Typ is the process type, e.g. FaceDetect or Yolo; passed to the slave as
	// -sdk=<Typ> and used as the key in Daemon.workers.
	Typ string

	// SNameProc lists the named processes to run for this type.
	SNameProc []NamedProc
}
|
|
// Worker is a single supervised process serving one NamedProc.
type Worker struct {
	// pid of the running process; reWorker matches exit notifications on it.
	pid int

	// cmd is the command that started the process; restartWorker replaces it.
	cmd *exec.Cmd

	// info is the process description; its Name identifies the worker and its
	// Channels are diffed against new configuration in channelChanged.
	info *NamedProc

	// trans is the notification link to the slave process.
	trans *transit
}
|
|
// Daemon monitors all child processes.
type Daemon struct {
	// workers maps each process type (TypeProc.Typ) to the workers started for
	// that type. No locking is used here; presumably it is only accessed from
	// the Watch loop — verify before adding other callers.
	workers map[string][]*Worker
}
|
|
// NewDaemon 监控
|
func NewDaemon() *Daemon {
|
return &Daemon{
|
workers: make(map[string][]*Worker),
|
}
|
}
|
|
// intersect returns the values of slice2 that also occur in slice1, in
// slice2's order. Each value is reported at most once, and duplicates in
// either input do not affect membership. The result is never nil.
func intersect(slice1, slice2 []string) []string {
	inFirst := make(map[string]struct{}, len(slice1))
	for _, v := range slice1 {
		inFirst[v] = struct{}{}
	}

	nn := make([]string, 0)
	for _, v := range slice2 {
		if _, ok := inFirst[v]; ok {
			nn = append(nn, v)
			// Forget the value so a duplicate in slice2 is not reported twice.
			delete(inFirst, v)
		}
	}
	return nn
}
|
|
// difference returns the set difference slice1 − slice2: every value of
// slice1 that does not occur anywhere in slice2, in slice1's order
// (duplicates within slice1 are kept). The result is never nil.
func difference(slice1, slice2 []string) []string {
	exclude := make(map[string]struct{}, len(slice2))
	for _, v := range slice2 {
		exclude[v] = struct{}{}
	}

	nn := make([]string, 0, len(slice1))
	for _, v := range slice1 {
		if _, ok := exclude[v]; !ok {
			nn = append(nn, v)
		}
	}
	return nn
}
|
|
func removeWorker(w *Worker) {
|
syscall.Kill(w.pid, syscall.SIGTERM)
|
w.cmd.Wait()
|
w.trans.cancel()
|
}
|
|
func (d *Daemon) rmWorkerWith(typ string) {
|
if workers, ok := d.workers[typ]; ok {
|
delete(d.workers, typ)
|
|
for _, w := range workers {
|
removeWorker(w)
|
}
|
}
|
}
|
|
func (d *Daemon) rmWorkerNoType(childs []TypeProc) {
|
var newTypes []string
|
for _, v := range childs {
|
newTypes = append(newTypes, v.Typ)
|
}
|
|
var runTypes []string
|
for k := range d.workers {
|
runTypes = append(runTypes, k)
|
}
|
|
// 不存在于新信息中的type, remove
|
rmTypes := difference(runTypes, newTypes)
|
if len(rmTypes) > 0 {
|
for _, v := range rmTypes {
|
d.rmWorkerWith(v)
|
}
|
}
|
}
|
|
func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker {
|
var newNames []string
|
for _, v := range procs {
|
newNames = append(newNames, v.Name)
|
}
|
|
var runNames []string
|
for _, v := range workers {
|
runNames = append(runNames, v.info.Name)
|
}
|
|
// 已经不需要存在进程,remove
|
rmWorkers := difference(runNames, newNames)
|
for _, v := range rmWorkers {
|
for _, w := range workers {
|
if v == w.info.Name {
|
removeWorker(w)
|
}
|
}
|
}
|
|
// 保留已存在的进程
|
stillWorks := intersect(runNames, newNames)
|
var ret []*Worker
|
for _, v := range stillWorks {
|
for _, w := range workers {
|
if v == w.info.Name {
|
ret = append(ret, w)
|
}
|
}
|
}
|
|
return ret
|
}
|
|
//////////////////////////////////////////////////////////
|
|
func getNamedProc(typ string, childs []TypeProc) []NamedProc {
|
for _, v := range childs {
|
if v.Typ == typ {
|
return v.SNameProc
|
}
|
}
|
return nil
|
}
|
|
func getNamedProcInfo(name string, procs []NamedProc) *NamedProc {
|
for _, v := range procs {
|
if name == v.Name {
|
return &v
|
}
|
}
|
return nil
|
}
|
|
func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) {
|
for _, s := range typs {
|
// 存在这个类型的进程
|
if workers, ok := d.workers[s]; ok {
|
child := getNamedProc(s, childs)
|
if child == nil {
|
continue
|
}
|
var newNames []string
|
for _, v := range child {
|
newNames = append(newNames, v.Name)
|
}
|
|
var runNames []string
|
for _, v := range workers {
|
runNames = append(runNames, v.info.Name)
|
}
|
|
add := difference(newNames, runNames)
|
for _, c := range child {
|
for _, v := range add {
|
if c.Name == v {
|
d.startWorker(ctx, s, &c)
|
}
|
}
|
}
|
|
adjust := intersect(runNames, newNames)
|
for _, v := range adjust {
|
proc := getNamedProcInfo(v, child)
|
if proc == nil {
|
continue
|
}
|
|
for _, w := range workers {
|
if v == w.info.Name {
|
// 找到了对应名字的进程,首先求不需要再运行的通道
|
removes := difference(w.info.Channels, proc.Channels)
|
if len(removes) > 0 {
|
// 通知子进程关闭通道
|
n := Notice{
|
Op: opRemove,
|
Content: removes,
|
}
|
if d, err := json.Marshal(n); err == nil {
|
w.trans.chNotify <- d
|
}
|
}
|
|
// 其次求出新增的通道
|
adds := difference(proc.Channels, w.info.Channels)
|
if len(adds) > 0 {
|
// 通知子进程打开通道
|
n := Notice{
|
Op: opAdd,
|
Content: adds,
|
}
|
if d, err := json.Marshal(n); err == nil {
|
w.trans.chNotify <- d
|
}
|
}
|
|
}
|
}
|
}
|
}
|
}
|
}
|
|
func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) {
|
for _, v := range child.SNameProc {
|
d.startWorker(ctx, child.Typ, &v)
|
}
|
}
|
|
func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) {
|
var newTypes []string
|
for _, v := range childs {
|
newTypes = append(newTypes, v.Typ)
|
}
|
|
var runTypes []string
|
for k := range d.workers {
|
runTypes = append(runTypes, k)
|
}
|
|
// 新类型添加
|
addWorkers := difference(newTypes, runTypes)
|
for _, a := range addWorkers {
|
for _, v := range childs {
|
if a == v.Typ {
|
// start new type proc
|
d.startNewWorker(ctx, v)
|
}
|
}
|
}
|
|
stillWorkers := intersect(newTypes, runTypes)
|
// 调整已存在的进程的通道
|
d.channelChanged(ctx, stillWorkers, childs)
|
}
|
|
func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) {
|
// 新的进程信息,首先删除掉不再运行的类型
|
d.rmWorkerNoType(childs)
|
// 按名字删除特定类型中不再运行的进程
|
for _, v := range childs {
|
if workers, ok := d.workers[v.Typ]; ok {
|
nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc)
|
d.workers[v.Typ] = nWorkers
|
}
|
}
|
|
d.adjustWorker(ctx, childs)
|
}
|
|
// Watch watch
|
func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) {
|
chExit := make(chan ExitInfo, 32)
|
go Reap(chExit)
|
|
for {
|
select {
|
case <-ctx.Done():
|
return
|
case i := <-chExit:
|
d.reWorker(ctx, &i)
|
case childs := <-ch:
|
d.updateWorker(ctx, childs)
|
default:
|
time.Sleep(time.Second)
|
}
|
}
|
}
|
|
func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) {
|
// 有退出的进程,查看是否在运行进程中,拉起
|
for i, workers := range d.workers {
|
found := false
|
for j, w := range workers {
|
if w.pid == info.Pid {
|
w = d.restartWorker(ctx, w)
|
d.workers[i][j] = w
|
found = true
|
break
|
}
|
}
|
if found {
|
break
|
}
|
}
|
}
|
|
func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker {
|
w.cmd.Wait()
|
w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:])
|
w.pid = w.cmd.Process.Pid
|
return w
|
}
|
|
func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) {
|
|
ipcID := "analysis-" + typ + "-" + info.Name
|
|
args := []string{
|
`-role=slave`,
|
"-sdk=" + typ,
|
"-id=" + ipcID,
|
"-" + util.ConfigPath + "=" + info.Config,
|
}
|
|
args = append(args, info.Param...)
|
cmd := runProc(ctx, "./analysis", info.Env, args)
|
|
if cmd == nil {
|
logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID)
|
return
|
}
|
logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env)
|
logo.Infoln(cmd.Args)
|
|
ch := make(chan []byte, 3)
|
cancel := fnNotify(ctx, ipcID, ch, logo.Infoln)
|
|
w := &Worker{
|
pid: cmd.Process.Pid,
|
cmd: cmd,
|
info: info,
|
trans: &transit{
|
chNotify: ch,
|
cancel: cancel,
|
},
|
}
|
d.workers[typ] = append(d.workers[typ], w)
|
}
|
|
func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd {
|
cmd := exec.CommandContext(ctxt, bin, args...)
|
rEnv := ""
|
if len(env) != 0 {
|
runtime := "LD_LIBRARY_PATH"
|
rEnv = runtime + "=" + env
|
logo.Infoln("Env String: ", rEnv)
|
|
// remove os environ ld
|
old := os.Getenv(runtime)
|
os.Unsetenv(runtime)
|
cmd.Env = os.Environ()
|
cmd.Env = append(cmd.Env, rEnv)
|
os.Setenv(runtime, old)
|
}
|
|
//debug
|
cmd.Stdout = os.Stdout
|
cmd.Stderr = os.Stderr
|
cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
|
|
if err := cmd.Start(); err == nil {
|
return cmd
|
}
|
return nil
|
}
|