zhangmeng
2020-01-21 e6a5be3714b70236d84f25be2ce858c3d7e379d8
add recv sdk proc info from dispath
2个文件已删除
3个文件已添加
5个文件已修改
1005 ■■■■ 已修改文件
app/master/daemon.go 413 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/dbfetcher.go 35 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/master.go 211 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/reaper.go 102 ●●●● 补丁 | 查看 | 原始文档 | blame | 历史
go.mod 3 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/db.go 15 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/fetcher.go 39 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/go.mod 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
libcomm/notify.go 183 ●●●●● 补丁 | 查看 | 原始文档 | blame | 历史
util/common.go 2 ●●● 补丁 | 查看 | 原始文档 | blame | 历史
app/master/daemon.go
New file
@@ -0,0 +1,413 @@
package master
import (
    "context"
    "encoding/json"
    "os"
    "os/exec"
    "syscall"
    "time"
    "analysis/logo"
    "analysis/util"
)
// NamedProc 单个进程名字和服务通道
type NamedProc struct {
    // 进程名字
    Name string
    // 进程通道
    Channels []string
    // 进程runtime
    Env string
    // 进程config file path
    Config string
    // 进程param
    Param []string
}
// TypeProc 每个Type进程的参数
type TypeProc struct {
    // 进程类型FaceDetect/Yolo
    Typ string
    // 具名进程
    SNameProc []NamedProc
}
// Notice transit to slave
type Notice struct {
    Op      string   `json:"Op"`
    Content []string `json:"Content"`
}
type transit struct {
    chNotify chan<- []byte
    cancel   context.CancelFunc
}
// Worker 单个进程服务
type Worker struct {
    pid   int
    cmd   *exec.Cmd
    info  *NamedProc
    trans *transit
}
// Daemon 监控的所有子进程
type Daemon struct {
    // 每个sdk类型启动的进程数量
    workers map[string][]*Worker
}
// NewDaemon 监控
func NewDaemon() *Daemon {
    return &Daemon{
        workers: make(map[string][]*Worker),
    }
}
//求交集
func intersect(slice1, slice2 []string) []string {
    m := make(map[string]int)
    nn := make([]string, 0)
    for _, v := range slice1 {
        m[v]++
    }
    for _, v := range slice2 {
        times, _ := m[v]
        if times == 1 {
            nn = append(nn, v)
        }
    }
    return nn
}
//求差集 slice1-并集
func difference(slice1, slice2 []string) []string {
    m := make(map[string]int)
    nn := make([]string, 0)
    inter := intersect(slice1, slice2)
    for _, v := range inter {
        m[v]++
    }
    for _, value := range slice1 {
        times, _ := m[value]
        if times == 0 {
            nn = append(nn, value)
        }
    }
    return nn
}
func removeWorker(w *Worker) {
    syscall.Kill(w.pid, syscall.SIGTERM)
    w.cmd.Wait()
    w.trans.cancel()
}
func (d *Daemon) rmWorkerWith(typ string) {
    if workers, ok := d.workers[typ]; ok {
        delete(d.workers, typ)
        for _, w := range workers {
            removeWorker(w)
        }
    }
}
func (d *Daemon) rmWorkerNoType(childs []TypeProc) {
    var newTypes []string
    for _, v := range childs {
        newTypes = append(newTypes, v.Typ)
    }
    var runTypes []string
    for k := range d.workers {
        runTypes = append(runTypes, k)
    }
    // 不存在于新信息中的type, remove
    rmTypes := difference(runTypes, newTypes)
    if len(rmTypes) > 0 {
        for _, v := range rmTypes {
            d.rmWorkerWith(v)
        }
    }
}
func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker {
    var newNames []string
    for _, v := range procs {
        newNames = append(newNames, v.Name)
    }
    var runNames []string
    for _, v := range workers {
        runNames = append(runNames, v.info.Name)
    }
    // 已经不需要存在进程,remove
    rmWorkers := difference(runNames, newNames)
    for _, v := range rmWorkers {
        for _, w := range workers {
            if v == w.info.Name {
                removeWorker(w)
            }
        }
    }
    // 保留已存在的进程
    stillWorks := intersect(runNames, newNames)
    var ret []*Worker
    for _, v := range stillWorks {
        for _, w := range workers {
            if v == w.info.Name {
                ret = append(ret, w)
            }
        }
    }
    return ret
}
//////////////////////////////////////////////////////////
func getNamedProc(typ string, childs []TypeProc) []NamedProc {
    for _, v := range childs {
        if v.Typ == typ {
            return v.SNameProc
        }
    }
    return nil
}
func getNamedProcInfo(name string, procs []NamedProc) *NamedProc {
    for _, v := range procs {
        if name == v.Name {
            return &v
        }
    }
    return nil
}
func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) {
    for _, s := range typs {
        // 存在这个类型的进程
        if workers, ok := d.workers[s]; ok {
            child := getNamedProc(s, childs)
            if child == nil {
                continue
            }
            var newNames []string
            for _, v := range child {
                newNames = append(newNames, v.Name)
            }
            var runNames []string
            for _, v := range workers {
                runNames = append(runNames, v.info.Name)
            }
            add := difference(newNames, runNames)
            for _, c := range child {
                for _, v := range add {
                    if c.Name == v {
                        d.startWorker(ctx, s, &c)
                    }
                }
            }
            adjust := intersect(runNames, newNames)
            for _, v := range adjust {
                proc := getNamedProcInfo(v, child)
                if proc == nil {
                    continue
                }
                for _, w := range workers {
                    if v == w.info.Name {
                        // 找到了对应名字的进程,首先求不需要再运行的通道
                        var notice *Notice
                        removes := difference(w.info.Channels, proc.Channels)
                        if len(removes) > 0 {
                            // 通知子进程关闭通道
                            notice = &Notice{
                                Op:      "remove",
                                Content: removes,
                            }
                        }
                        // 其次求出新增的通道
                        adds := difference(proc.Channels, w.info.Channels)
                        if len(adds) > 0 {
                            // 通知子进程打开通道
                            notice = &Notice{
                                Op:      "add",
                                Content: adds,
                            }
                        }
                        if notice != nil {
                            if d, err := json.Marshal(*notice); err == nil {
                                w.trans.chNotify <- d
                            }
                        }
                    }
                }
            }
        }
    }
}
func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) {
    for _, v := range child.SNameProc {
        d.startWorker(ctx, child.Typ, &v)
    }
}
func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) {
    var newTypes []string
    for _, v := range childs {
        newTypes = append(newTypes, v.Typ)
    }
    var runTypes []string
    for k := range d.workers {
        runTypes = append(runTypes, k)
    }
    // 新类型添加
    addWorkers := difference(newTypes, runTypes)
    for _, a := range addWorkers {
        for _, v := range childs {
            if a == v.Typ {
                // start new type proc
                d.startNewWorker(ctx, v)
            }
        }
    }
    stillWorkers := intersect(newTypes, runTypes)
    // 调整已存在的进程的通道
    d.channelChanged(ctx, stillWorkers, childs)
}
func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) {
    // 新的进程信息,首先删除掉不再运行的类型
    d.rmWorkerNoType(childs)
    // 按名字删除特定类型中不再运行的进程
    for _, v := range childs {
        if workers, ok := d.workers[v.Typ]; ok {
            nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc)
            d.workers[v.Typ] = nWorkers
        }
    }
    d.adjustWorker(ctx, childs)
}
// Watch watch
func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) {
    chExit := make(chan ExitInfo, 32)
    go Reap(chExit)
    for {
        select {
        case <-ctx.Done():
            return
        case i := <-chExit:
            d.reWorker(ctx, &i)
        case childs := <-ch:
            d.updateWorker(ctx, childs)
        default:
            time.Sleep(time.Second)
        }
    }
}
func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) {
    // 有退出的进程,查看是否在运行进程中,拉起
    for i, workers := range d.workers {
        found := false
        for j, w := range workers {
            if w.pid == info.Pid {
                w = d.restartWorker(ctx, w)
                d.workers[i][j] = w
                found = true
                break
            }
        }
        if found {
            break
        }
    }
}
func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker {
    w.cmd.Wait()
    w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:])
    w.pid = w.cmd.Process.Pid
    return w
}
func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) {
    ipcID := "analysis-" + typ + "-" + info.Name
    args := []string{
        `-role=slave`,
        "-sdk=" + typ,
        "-id=" + ipcID,
        "-" + util.ConfigPath + "=" + info.Config,
    }
    args = append(args, info.Param...)
    cmd := runProc(ctx, "./analysis", info.Env, args)
    if cmd == nil {
        logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID)
    }
    logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env)
    ch := make(chan []byte, 3)
    cancel := fnNotify(ctx, ipcID, ch, logo.Infoln)
    w := &Worker{
        pid:  cmd.Process.Pid,
        cmd:  cmd,
        info: info,
        trans: &transit{
            chNotify: ch,
            cancel:   cancel,
        },
    }
    d.workers[typ] = append(d.workers[typ], w)
}
func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd {
    cmd := exec.CommandContext(ctxt, bin, args...)
    rEnv := ""
    if len(env) != 0 {
        runtime := "LD_LIBRARY_PATH"
        rEnv = runtime + "=" + env
        logo.Infoln("Env String: ", rEnv)
        // remove os environ ld
        old := os.Getenv(runtime)
        os.Unsetenv(runtime)
        cmd.Env = os.Environ()
        cmd.Env = append(cmd.Env, rEnv)
        os.Setenv(runtime, old)
    }
    // //debug
    // cmd.Stdout = os.Stdout
    // cmd.Stderr = os.Stderr
    cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
    if err := cmd.Start(); err == nil {
        return cmd
    }
    return nil
}
app/master/dbfetcher.go
File was deleted
app/master/master.go
@@ -7,64 +7,134 @@
    "context"
    "encoding/json"
    "os"
    "plugin"
    "strconv"
    "strings"
    "time"
    "basic.com/libgowrapper/sdkstruct.git"
    "basic.com/valib/pubsub.git"
)
func reaper(ctxt context.Context) {
    pidChan := make(chan int, 1)
    Reap(pidChan)
    go waitForRestart(ctxt, pidChan)
var (
    soLoaded bool
    fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
    fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
)
func soLoad(soFile string) bool {
    if fnFetch != nil && fnNotify != nil {
        return true
    }
    plug, err := plugin.Open(soFile)
    if err != nil {
        logo.Errorln("Open: ", soFile, " error: ", err)
        return false
    }
    var fn plugin.Symbol
    fn, err = app.LoadFunc(plug, soFile, "Fetch")
    if err != nil {
        logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
        return false
    }
    fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
    fn, err = app.LoadFunc(plug, soFile, "Notify")
    if err != nil {
        logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
        return false
    }
    fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
    return true
}
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
    reaper(ctx)
    fetcher := NewFetcher(soFile)
    if fetcher == nil {
func initFetcher(ctx context.Context, soFile string) <-chan []byte {
    if !soLoad(soFile) {
        logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
        return false
        return nil
    }
    logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
    ip := "tcp://" + util.FSI.IP
    ip := "tcp://192.168.5.22"
    // ip := "tcp://" + util.FSI.IP
    url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
    hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
    chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
    for {
        if err == nil {
            break
        }
        logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
        time.Sleep(time.Second)
        chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
    ch := make(chan []byte, 3)
    fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
    logo.Infoln("~~~~~~Start Recv SDK Infos")
    return ch
}
// Run run
func Run(ctx context.Context, soFile, configPath string) bool {
    daemon := NewDaemon()
    chProc := make(chan []TypeProc, 32)
    go daemon.Watch(ctx, chProc)
    chMsg := initFetcher(ctx, soFile)
    if chMsg == nil {
        logo.Infoln("Master Run initFetcher Failed")
        return false
    }
    params := app.GetParams()
    for {
        select {
        case <-ctx.Done():
            return true
        case msg := <-chMsg:
            //              sdktype       process_name   topic        null
            //                yolo/face  yolo_0/yolo_1  channel
            //          sdktype       process_name   topic        null
            //            yolo/face  yolo_0/yolo_1  channel
            var sdk map[string](map[string](map[string]interface{}))
            if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
            if err := json.Unmarshal(msg, &sdk); err != nil {
                logo.Infoln("Fetcher SDK unmarshal err:", err)
                continue
            }
            logo.Infoln("~~~~~~Recv New SDKInfos")
            chCameras <- CameraInfo{
                Cameras: cameras,
            var typeProcs []TypeProc
            for sdkType, mapSdkProc := range sdk {
                config := findConfigFile(sdkType, configPath)
                if config == nil {
                    logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
                    continue
                }
                env := checkConfig(sdkType, *config)
                if env == nil {
                    continue
                }
                var channels []string
                var namedProcs []NamedProc
                for procName, mapProcChannels := range mapSdkProc {
                    for c := range mapProcChannels {
                        channels = append(channels, c)
                    }
                    p := NamedProc{
                        Name:     procName,
                        Channels: channels,
                        Env:      *env,
                        Config:   *config,
                        Param:    params,
                    }
                    namedProcs = append(namedProcs, p)
                }
                t := TypeProc{
                    Typ:       sdkType,
                    SNameProc: namedProcs,
                }
                typeProcs = append(typeProcs, t)
            }
            chProc <- typeProcs
            logo.Infoln("~~~~~~Recv New SDKInfos Over")
        default:
@@ -74,56 +144,43 @@
}
func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
func findConfigFile(typ, configPath string) *string {
    rPath := configPath
    params := app.GetParams()
    for _, v := range sdks {
        file := rPath + v.SdkType + ".json"
        if rPath[len(rPath)-1] != '/' {
            file = rPath + "/" + v.SdkType + ".json"
        }
        cfg, err := app.ReadConfig(file)
        if err != nil {
            logo.Errorln("Master Read: ", file, " Config Error: ", err)
            continue
        }
        if len(cfg.Env) > 0 {
            envs := strings.Split(cfg.Env, ":")
            normal := true
            for _, v := range envs {
                if !util.IsFileExist(v) {
                    normal = false
                    break
                }
            }
            if !normal {
                logo.Infoln("Can't Find Runtime Path, Skip It: ", file)
                continue
            }
        }
        logo.Infoln(file, " CONFIG: ", cfg)
        args := []string{
            `-role=slave`,
            "-sdk=" + v.SdkType,
            "-id=" + v.IpcID,
            "-" + util.ConfigPath + "=" + file,
        }
        args = append(args, params...)
        pid, err := runProc(ctx, "./analysis", args, cfg.Env)
        if err != nil {
            logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
        }
        logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
    // default config file
    file := rPath + typ + ".json"
    // if configPath not end with '/'
    if rPath[len(rPath)-1] != '/' {
        file = rPath + "/" + typ + ".json"
    }
    return true
    // whether file exist
    if util.IsFileExist(file) {
        return &file
    }
    return nil
}
func checkConfig(typ, file string) *string {
    cfg, err := app.ReadConfig(file)
    if err != nil {
        logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
        return nil
    }
    // check config runtime exist if config this item
    env := strings.TrimSpace(cfg.Env)
    if len(env) > 0 {
        envs := strings.Split(env, ":")
        pathExist := true
        for _, v := range envs {
            if !util.IsFileExist(v) {
                pathExist = false
                break
            }
        }
        if !pathExist {
            logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ)
            return nil
        }
    }
    return &env
}
app/master/reaper.go
@@ -2,81 +2,18 @@
import (
    "analysis/logo"
    "context"
    "os"
    "os/exec"
    "os/signal"
    "syscall"
    "time"
)
type procInfo struct {
    cmd *exec.Cmd
    env string
// ExitInfo info
type ExitInfo struct {
    Pid      int
    ExitCode int
}
var (
    procMap = make(map[int]*procInfo)
)
func restartProc(ctxt context.Context, pid int) {
    info, ok := procMap[pid]
    if ok {
        err := info.cmd.Wait()
        if err != nil {
            logo.Errorln("pid : [", pid, "] quit error: ", err)
        } else {
            logo.Infoln("pid : [", pid, "] quit")
        }
        delete(procMap, pid)
        runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], info.env)
    } else {
        logo.Errorln(pid, " doesn't exist")
    }
}
func quitProc(pid int) {
    info, ok := procMap[pid]
    if ok {
        delete(procMap, pid)
        syscall.Kill(pid, syscall.SIGINT)
        info.cmd.Wait()
    } else {
        logo.Errorln(pid, " doesn't exist")
    }
}
func runProc(ctxt context.Context, bin string, args []string, env string) (int, error) {
    cmd := exec.CommandContext(ctxt, bin, args...)
    rEnv := ""
    if len(env) != 0 {
        runtime := "LD_LIBRARY_PATH"
        rEnv = runtime + "=" + env
        logo.Infoln("Env String: ", rEnv)
        // remove os environ ld
        old := os.Getenv(runtime)
        os.Unsetenv(runtime)
        cmd.Env = os.Environ()
        cmd.Env = append(cmd.Env, rEnv)
        os.Setenv(runtime, old)
    }
    pid := -1
    cmd.Stdout = os.Stdout
    cmd.Stderr = os.Stderr
    cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
    err := cmd.Start()
    if err == nil {
        pid = cmd.Process.Pid
        procMap[pid] = &procInfo{cmd, env}
    }
    return pid, err
}
// Config conf
// Config config
type Config struct {
    Pid              int
    Options          int
@@ -88,6 +25,7 @@
func sigChildHandler(notifications chan os.Signal) {
    sigs := make(chan os.Signal, 3)
    signal.Notify(sigs, syscall.SIGCHLD)
    signal.Ignore(syscall.SIGPIPE)
    for {
        sig := <-sigs
@@ -106,7 +44,7 @@
} /*  End of function  sigChildHandler.  */
//  Be a good parent - clean up behind the children.
func reapChildren(config Config, pidChan chan<- int) {
func reapChildren(config Config, info chan<- ExitInfo) {
    notifications := make(chan os.Signal, 1)
    go sigChildHandler(notifications)
@@ -125,10 +63,10 @@
             *  Plants vs. Zombies!!
             */
            pid, err := syscall.Wait4(pid, &wstatus, opts, nil)
            pidChan <- pid
            info <- ExitInfo{pid, wstatus.ExitStatus()}
            for syscall.EINTR == err {
                pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
                pidChan <- pid
                info <- ExitInfo{pid, wstatus.ExitStatus()}
            }
            if syscall.ECHILD == err {
@@ -153,7 +91,7 @@
//  background inside a goroutine.
// Reap reap
func Reap(pidChan chan<- int) {
func Reap(info chan<- ExitInfo) {
    /*
     *  Only reap processes if we are taking over init's duties aka
     *  we are running as pid 1 inside a docker container. The default
@@ -163,7 +101,7 @@
        Pid:              -1,
        Options:          0,
        DisablePid1Check: true,
    }, pidChan)
    }, info)
} /*  End of [exported] function  Reap.  */
@@ -172,7 +110,7 @@
//  The child processes are reaped in the background inside a goroutine.
// Start start
func Start(config Config, pidChan chan<- int) {
func Start(config Config, info chan<- ExitInfo) {
    /*
     *  Start the Reaper with configuration options. This allows you to
     *  reap processes even if the current pid isn't running as pid 1.
@@ -194,20 +132,6 @@
     *  of 'em all, either way we get to play the grim reaper.
     *  You will be missed, Terry Pratchett!! RIP
     */
    go reapChildren(config, pidChan)
    go reapChildren(config, info)
} /*  End of [exported] function  Start.  */
func waitForRestart(ctxt context.Context, pidChan <-chan int) {
    for {
        select {
        case <-ctxt.Done():
            return
        case pid := <-pidChan:
            restartProc(ctxt, pid)
        default:
            time.Sleep(3 * time.Second)
        }
    }
}
go.mod
@@ -3,10 +3,7 @@
go 1.12
require (
    basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
    basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
    basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
    basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
    github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717
    github.com/natefinch/lumberjack v2.0.0+incompatible
    github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4
libcomm/db.go
File was deleted
libcomm/fetcher.go
New file
@@ -0,0 +1,39 @@
package main
import (
    "context"
    "time"
    "basic.com/valib/pubsub.git"
)
func wait(ctx context.Context, c chan pubsub.Message, out chan<- []byte) {
    for {
        select {
        case <-ctx.Done():
            return
        case msg := <-c:
            out <- msg.Msg
        default:
            time.Sleep(time.Second)
        }
    }
}
// Fetch Fetch from tcp://192.168.5.22:4005
func Fetch(ctx context.Context, url, heartBeatURL string, mode int, processID string, out chan<- []byte, fn func(...interface{})) {
    topics := []string{pubsub.Topic_Sdk}
    p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
    for {
        if err == nil {
            break
        }
        fn("libcomm.so, pubsub subscribe error: ", err)
        time.Sleep(time.Second)
        p, err = pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
    }
    c := p.Recv()
    go wait(ctx, c, out)
}
libcomm/go.mod
@@ -6,5 +6,5 @@
    basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
    basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
    golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
    nanomsg.org/go-mangos v1.4.0 // indirect
    nanomsg.org/go-mangos v1.4.0
)
libcomm/notify.go
New file
@@ -0,0 +1,183 @@
package main
import (
    "context"
    "os"
    "strings"
    "time"
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/rep"
    "nanomsg.org/go-mangos/protocol/req"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
func request(url string, timeout int, fn func(...interface{})) mangos.Socket {
    var sock mangos.Socket
    var err error
    for {
        if sock, err = req.NewSocket(); err != nil {
            fn("!!!!!!Notify can't get new request socket: ", err)
            time.Sleep(time.Second)
        } else {
            break
        }
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
    sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
    for {
        if err = sock.Dial(url); err != nil {
            fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url)
            time.Sleep(time.Second)
        } else {
            break
        }
    }
    return sock
}
func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            sock.Close()
            return
        case data := <-ch:
            var ret []byte
            var err error
            err = sock.Send(data)
            for {
                if err == nil {
                    break
                }
                fn("!!!!!!Notify Send To Slave ERROR: ", err)
                time.Sleep(500 * time.Millisecond)
                continue
            }
            ret, err = sock.Recv()
            for {
                if err == nil {
                    fn("~~~~~Notify Recv From Slave: ", string(ret))
                    break
                }
                fn("!!!!!!Notify Recv From Slave Error: ", err)
                time.Sleep(500 * time.Microsecond)
                continue
            }
        default:
            time.Sleep(time.Second)
        }
    }
}
// Notify master sync notify to slave
func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc {
    rctx, cancel := context.WithCancel(ctx)
    sock := request(url, 2, fn)
    go notify(rctx, sock, ch, fn)
    return cancel
}
//////////////////////////////////////////////////////////////////
func rmExistedIpcName(url string) {
    s := strings.Split(url, "://")
    if s[0] == "ipc" {
        if _, err := os.Stat(s[1]); err == nil {
            os.Remove(s[1])
        } else if !os.IsNotExist(err) {
            os.Remove(s[1])
        }
    }
}
func reply(url string, timeout int, fn func(...interface{})) mangos.Socket {
    rmExistedIpcName(url)
    var sock mangos.Socket
    var err error
    for {
        if sock, err = rep.NewSocket(); err != nil {
            rmExistedIpcName(url)
            fn("!!!!!!Notify can't get new reply socket: ", err)
            time.Sleep(time.Second)
        } else {
            break
        }
    }
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
    sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
    for {
        if err = sock.Listen(url); err != nil {
            rmExistedIpcName(url)
            fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url)
            time.Sleep(time.Second)
        } else {
            break
        }
    }
    return sock
}
func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) {
    for {
        select {
        case <-ctx.Done():
            sock.Close()
            return
        default:
            msg, err := sock.Recv()
            for {
                if err == nil {
                    fn("~~~~~Notifiee Recv From Master: ", string(msg))
                    break
                }
                fn("!!!!!!Notify Recv From Master Error: ", err)
                time.Sleep(500 * time.Microsecond)
                continue
            }
            err = sock.Send([]byte("ok"))
            for {
                if err == nil {
                    break
                }
                fn("!!!!!!Notify Send To Master ERROR: ", err)
                time.Sleep(500 * time.Millisecond)
                continue
            }
        }
    }
}
// Notifiee slave sync recv notice from master
func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc {
    rctx, cancel := context.WithCancel(ctx)
    sock := reply(url, 2, fn)
    go notifiee(rctx, sock, ch, fn)
    return cancel
}
util/common.go
@@ -65,7 +65,7 @@
        }
    }
    if file == nil {
        fmt.Println(`Read All Log Config Files Failed, Use Default, "./log/analysis-[type]"`)
        fmt.Println(`Read All Log Config Files Failed, If -logit Use Default, "./log/analysis-[type]"`)
        return
    }
    yamlString := string(file)