zhangmeng
2020-01-21 9d9cd1d3b93613071d1dffc1c82c4515d2a65af6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
package master
 
import (
    "context"
    "encoding/json"
    "os"
    "os/exec"
    "syscall"
    "time"
 
    "analysis/logo"
    "analysis/util"
)
 
const (
    opRemove = "remove"
    opAdd    = "add"
)
 
// Notice transit to slave
type Notice struct {
    Op      string   `json:"Op"`
    Content []string `json:"Content"`
}
 
type transit struct {
    chNotify chan<- []byte
    cancel   context.CancelFunc
}
 
// NamedProc 单个进程名字和服务通道
type NamedProc struct {
    // 进程名字
    Name string
    // 进程通道
    Channels []string
    // 进程runtime
    Env string
    // 进程config file path
    Config string
    // 进程param
    Param []string
}
 
// TypeProc 每个Type进程的参数
type TypeProc struct {
    // 进程类型FaceDetect/Yolo
    Typ string
    // 具名进程
    SNameProc []NamedProc
}
 
// Worker 单个进程服务
type Worker struct {
    pid   int
    cmd   *exec.Cmd
    info  *NamedProc
    trans *transit
}
 
// Daemon 监控的所有子进程
type Daemon struct {
    // 每个sdk类型启动的进程数量
    workers map[string][]*Worker
}
 
// NewDaemon 监控
func NewDaemon() *Daemon {
    return &Daemon{
        workers: make(map[string][]*Worker),
    }
}
 
//求交集
func intersect(slice1, slice2 []string) []string {
    m := make(map[string]int)
    nn := make([]string, 0)
    for _, v := range slice1 {
        m[v]++
    }
 
    for _, v := range slice2 {
        times, _ := m[v]
        if times == 1 {
            nn = append(nn, v)
        }
    }
    return nn
}
 
//求差集 slice1-并集
func difference(slice1, slice2 []string) []string {
    m := make(map[string]int)
    nn := make([]string, 0)
    inter := intersect(slice1, slice2)
    for _, v := range inter {
        m[v]++
    }
 
    for _, value := range slice1 {
        times, _ := m[value]
        if times == 0 {
            nn = append(nn, value)
        }
    }
    return nn
}
 
func removeWorker(w *Worker) {
    syscall.Kill(w.pid, syscall.SIGTERM)
    w.cmd.Wait()
    w.trans.cancel()
}
 
func (d *Daemon) rmWorkerWith(typ string) {
    if workers, ok := d.workers[typ]; ok {
        delete(d.workers, typ)
 
        for _, w := range workers {
            removeWorker(w)
        }
    }
}
 
func (d *Daemon) rmWorkerNoType(childs []TypeProc) {
    var newTypes []string
    for _, v := range childs {
        newTypes = append(newTypes, v.Typ)
    }
 
    var runTypes []string
    for k := range d.workers {
        runTypes = append(runTypes, k)
    }
 
    // 不存在于新信息中的type, remove
    rmTypes := difference(runTypes, newTypes)
    if len(rmTypes) > 0 {
        for _, v := range rmTypes {
            d.rmWorkerWith(v)
        }
    }
}
 
func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker {
    var newNames []string
    for _, v := range procs {
        newNames = append(newNames, v.Name)
    }
 
    var runNames []string
    for _, v := range workers {
        runNames = append(runNames, v.info.Name)
    }
 
    // 已经不需要存在进程,remove
    rmWorkers := difference(runNames, newNames)
    for _, v := range rmWorkers {
        for _, w := range workers {
            if v == w.info.Name {
                removeWorker(w)
            }
        }
    }
 
    // 保留已存在的进程
    stillWorks := intersect(runNames, newNames)
    var ret []*Worker
    for _, v := range stillWorks {
        for _, w := range workers {
            if v == w.info.Name {
                ret = append(ret, w)
            }
        }
    }
 
    return ret
}
 
//////////////////////////////////////////////////////////
 
func getNamedProc(typ string, childs []TypeProc) []NamedProc {
    for _, v := range childs {
        if v.Typ == typ {
            return v.SNameProc
        }
    }
    return nil
}
 
func getNamedProcInfo(name string, procs []NamedProc) *NamedProc {
    for _, v := range procs {
        if name == v.Name {
            return &v
        }
    }
    return nil
}
 
func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) {
    for _, s := range typs {
        // 存在这个类型的进程
        if workers, ok := d.workers[s]; ok {
            child := getNamedProc(s, childs)
            if child == nil {
                continue
            }
            var newNames []string
            for _, v := range child {
                newNames = append(newNames, v.Name)
            }
 
            var runNames []string
            for _, v := range workers {
                runNames = append(runNames, v.info.Name)
            }
 
            add := difference(newNames, runNames)
            for _, c := range child {
                for _, v := range add {
                    if c.Name == v {
                        d.startWorker(ctx, s, &c)
                    }
                }
            }
 
            adjust := intersect(runNames, newNames)
            for _, v := range adjust {
                proc := getNamedProcInfo(v, child)
                if proc == nil {
                    continue
                }
 
                for _, w := range workers {
                    if v == w.info.Name {
                        // 找到了对应名字的进程,首先求不需要再运行的通道
                        removes := difference(w.info.Channels, proc.Channels)
                        if len(removes) > 0 {
                            // 通知子进程关闭通道
                            n := Notice{
                                Op:      opRemove,
                                Content: removes,
                            }
                            if d, err := json.Marshal(n); err == nil {
                                w.trans.chNotify <- d
                            }
                        }
 
                        // 其次求出新增的通道
                        adds := difference(proc.Channels, w.info.Channels)
                        if len(adds) > 0 {
                            // 通知子进程打开通道
                            n := Notice{
                                Op:      opAdd,
                                Content: adds,
                            }
                            if d, err := json.Marshal(n); err == nil {
                                w.trans.chNotify <- d
                            }
                        }
 
                    }
                }
            }
        }
    }
}
 
func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) {
    for _, v := range child.SNameProc {
        d.startWorker(ctx, child.Typ, &v)
    }
}
 
func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) {
    var newTypes []string
    for _, v := range childs {
        newTypes = append(newTypes, v.Typ)
    }
 
    var runTypes []string
    for k := range d.workers {
        runTypes = append(runTypes, k)
    }
 
    // 新类型添加
    addWorkers := difference(newTypes, runTypes)
    for _, a := range addWorkers {
        for _, v := range childs {
            if a == v.Typ {
                // start new type proc
                d.startNewWorker(ctx, v)
            }
        }
    }
 
    stillWorkers := intersect(newTypes, runTypes)
    // 调整已存在的进程的通道
    d.channelChanged(ctx, stillWorkers, childs)
}
 
func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) {
    // 新的进程信息,首先删除掉不再运行的类型
    d.rmWorkerNoType(childs)
    // 按名字删除特定类型中不再运行的进程
    for _, v := range childs {
        if workers, ok := d.workers[v.Typ]; ok {
            nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc)
            d.workers[v.Typ] = nWorkers
        }
    }
 
    d.adjustWorker(ctx, childs)
}
 
// Watch watch
func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) {
    chExit := make(chan ExitInfo, 32)
    go Reap(chExit)
 
    for {
        select {
        case <-ctx.Done():
            return
        case i := <-chExit:
            d.reWorker(ctx, &i)
        case childs := <-ch:
            d.updateWorker(ctx, childs)
        default:
            time.Sleep(time.Second)
        }
    }
}
 
func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) {
    // 有退出的进程,查看是否在运行进程中,拉起
    for i, workers := range d.workers {
        found := false
        for j, w := range workers {
            if w.pid == info.Pid {
                w = d.restartWorker(ctx, w)
                d.workers[i][j] = w
                found = true
                break
            }
        }
        if found {
            break
        }
    }
}
 
func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker {
    w.cmd.Wait()
    w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:])
    w.pid = w.cmd.Process.Pid
    return w
}
 
func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) {
 
    ipcID := "analysis-" + typ + "-" + info.Name
 
    args := []string{
        `-role=slave`,
        "-sdk=" + typ,
        "-id=" + ipcID,
        "-" + util.ConfigPath + "=" + info.Config,
    }
 
    args = append(args, info.Param...)
    cmd := runProc(ctx, "./analysis", info.Env, args)
 
    if cmd == nil {
        logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID)
        return
    }
    logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env)
    logo.Infoln(cmd.Args)
 
    ch := make(chan []byte, 3)
    cancel := fnNotify(ctx, ipcID, ch, logo.Infoln)
 
    w := &Worker{
        pid:  cmd.Process.Pid,
        cmd:  cmd,
        info: info,
        trans: &transit{
            chNotify: ch,
            cancel:   cancel,
        },
    }
    d.workers[typ] = append(d.workers[typ], w)
}
 
func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd {
    cmd := exec.CommandContext(ctxt, bin, args...)
    rEnv := ""
    if len(env) != 0 {
        runtime := "LD_LIBRARY_PATH"
        rEnv = runtime + "=" + env
        logo.Infoln("Env String: ", rEnv)
 
        // remove os environ ld
        old := os.Getenv(runtime)
        os.Unsetenv(runtime)
        cmd.Env = os.Environ()
        cmd.Env = append(cmd.Env, rEnv)
        os.Setenv(runtime, old)
    }
 
    //debug
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
 
    if err := cmd.Start(); err == nil {
        return cmd
    }
    return nil
}