From e6a5be3714b70236d84f25be2ce858c3d7e379d8 Mon Sep 17 00:00:00 2001 From: zhangmeng <775834166@qq.com> Date: 星期二, 21 一月 2020 14:16:01 +0800 Subject: [PATCH] add recv sdk proc info from dispath --- libcomm/notify.go | 183 ++++++++++ /dev/null | 15 libcomm/fetcher.go | 39 ++ util/common.go | 2 app/master/master.go | 211 +++++++---- app/master/daemon.go | 413 ++++++++++++++++++++++++ go.mod | 3 libcomm/go.mod | 2 app/master/reaper.go | 102 ----- 9 files changed, 784 insertions(+), 186 deletions(-) diff --git a/app/master/daemon.go b/app/master/daemon.go new file mode 100644 index 0000000..84d626b --- /dev/null +++ b/app/master/daemon.go @@ -0,0 +1,413 @@ +package master + +import ( + "context" + "encoding/json" + "os" + "os/exec" + "syscall" + "time" + + "analysis/logo" + "analysis/util" +) + +// NamedProc 鍗曚釜杩涚▼鍚嶅瓧鍜屾湇鍔¢�氶亾 +type NamedProc struct { + // 杩涚▼鍚嶅瓧 + Name string + // 杩涚▼閫氶亾 + Channels []string + // 杩涚▼runtime + Env string + // 杩涚▼config file path + Config string + // 杩涚▼param + Param []string +} + +// TypeProc 姣忎釜Type杩涚▼鐨勫弬鏁� +type TypeProc struct { + // 杩涚▼绫诲瀷FaceDetect/Yolo + Typ string + // 鍏峰悕杩涚▼ + SNameProc []NamedProc +} + +// Notice transit to slave +type Notice struct { + Op string `json:"Op"` + Content []string `json:"Content"` +} +type transit struct { + chNotify chan<- []byte + cancel context.CancelFunc +} + +// Worker 鍗曚釜杩涚▼鏈嶅姟 +type Worker struct { + pid int + cmd *exec.Cmd + info *NamedProc + trans *transit +} + +// Daemon 鐩戞帶鐨勬墍鏈夊瓙杩涚▼ +type Daemon struct { + // 姣忎釜sdk绫诲瀷鍚姩鐨勮繘绋嬫暟閲� + workers map[string][]*Worker +} + +// NewDaemon 鐩戞帶 +func NewDaemon() *Daemon { + return &Daemon{ + workers: make(map[string][]*Worker), + } +} + +//姹備氦闆� +func intersect(slice1, slice2 []string) []string { + m := make(map[string]int) + nn := make([]string, 0) + for _, v := range slice1 { + m[v]++ + } + + for _, v := range slice2 { + times, _ := m[v] + if times == 1 { + nn = append(nn, v) + } + } + return nn +} + +//姹傚樊闆� slice1-骞堕泦 +func difference(slice1, slice2 []string) []string { + m := make(map[string]int) + nn := make([]string, 0) + inter := intersect(slice1, slice2) + for _, v := range inter { + m[v]++ + } + + for _, value := range slice1 { + times, _ := m[value] + if times == 0 { + nn = append(nn, value) + } + } + return nn +} + +func removeWorker(w *Worker) { + syscall.Kill(w.pid, syscall.SIGTERM) + w.cmd.Wait() + w.trans.cancel() +} + +func (d *Daemon) rmWorkerWith(typ string) { + if workers, ok := d.workers[typ]; ok { + delete(d.workers, typ) + + for _, w := range workers { + removeWorker(w) + } + } +} + +func (d *Daemon) rmWorkerNoType(childs []TypeProc) { + var newTypes []string + for _, v := range childs { + newTypes = append(newTypes, v.Typ) + } + + var runTypes []string + for k := range d.workers { + runTypes = append(runTypes, k) + } + + // 涓嶅瓨鍦ㄤ簬鏂颁俊鎭腑鐨則ype, remove + rmTypes := difference(runTypes, newTypes) + if len(rmTypes) > 0 { + for _, v := range rmTypes { + d.rmWorkerWith(v) + } + } +} + +func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker { + var newNames []string + for _, v := range procs { + newNames = append(newNames, v.Name) + } + + var runNames []string + for _, v := range workers { + runNames = append(runNames, v.info.Name) + } + + // 宸茬粡涓嶉渶瑕佸瓨鍦ㄨ繘绋�,remove + rmWorkers := difference(runNames, newNames) + for _, v := range rmWorkers { + for _, w := range workers { + if v == w.info.Name { + removeWorker(w) + } + } + } + + // 淇濈暀宸插瓨鍦ㄧ殑杩涚▼ + stillWorks := intersect(runNames, newNames) + var ret []*Worker + for _, v := range stillWorks { + for _, w := range workers { + if v == w.info.Name { + ret = append(ret, w) + } + } + } + + return ret +} + +////////////////////////////////////////////////////////// + +func getNamedProc(typ string, childs []TypeProc) []NamedProc { + for _, v := range childs { + if v.Typ == typ { + return v.SNameProc + } + } + return nil +} + +func getNamedProcInfo(name string, procs []NamedProc) *NamedProc { + for _, v := range procs { + if name == v.Name { + return &v + } + } + return nil +} + +func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) { + for _, s := range typs { + // 瀛樺湪杩欎釜绫诲瀷鐨勮繘绋� + if workers, ok := d.workers[s]; ok { + child := getNamedProc(s, childs) + if child == nil { + continue + } + var newNames []string + for _, v := range child { + newNames = append(newNames, v.Name) + } + + var runNames []string + for _, v := range workers { + runNames = append(runNames, v.info.Name) + } + + add := difference(newNames, runNames) + for _, c := range child { + for _, v := range add { + if c.Name == v { + d.startWorker(ctx, s, &c) + } + } + } + + adjust := intersect(runNames, newNames) + for _, v := range adjust { + proc := getNamedProcInfo(v, child) + if proc == nil { + continue + } + + for _, w := range workers { + if v == w.info.Name { + // 鎵惧埌浜嗗搴斿悕瀛楃殑杩涚▼,棣栧厛姹備笉闇�瑕佸啀杩愯鐨勯�氶亾 + var notice *Notice + removes := difference(w.info.Channels, proc.Channels) + if len(removes) > 0 { + // 閫氱煡瀛愯繘绋嬪叧闂�氶亾 + notice = &Notice{ + Op: "remove", + Content: removes, + } + + } + + // 鍏舵姹傚嚭鏂板鐨勯�氶亾 + adds := difference(proc.Channels, w.info.Channels) + if len(adds) > 0 { + // 閫氱煡瀛愯繘绋嬫墦寮�閫氶亾 + notice = &Notice{ + Op: "add", + Content: adds, + } + } + if notice != nil { + if d, err := json.Marshal(*notice); err == nil { + w.trans.chNotify <- d + } + } + } + } + } + } + } +} + +func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) { + for _, v := range child.SNameProc { + d.startWorker(ctx, child.Typ, &v) + } +} + +func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) { + var newTypes []string + for _, v := range childs { + newTypes = append(newTypes, v.Typ) + } + + var runTypes []string + for k := range d.workers { + runTypes = append(runTypes, k) + } + + // 鏂扮被鍨嬫坊鍔� + addWorkers := difference(newTypes, runTypes) + for _, a := range addWorkers { + for _, v := range childs { + if a == v.Typ { + // start new type proc + d.startNewWorker(ctx, v) + } + } + } + + stillWorkers := intersect(newTypes, runTypes) + // 璋冩暣宸插瓨鍦ㄧ殑杩涚▼鐨勯�氶亾 + d.channelChanged(ctx, stillWorkers, childs) +} + +func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) { + // 鏂扮殑杩涚▼淇℃伅,棣栧厛鍒犻櫎鎺変笉鍐嶈繍琛岀殑绫诲瀷 + d.rmWorkerNoType(childs) + // 鎸夊悕瀛楀垹闄ょ壒瀹氱被鍨嬩腑涓嶅啀杩愯鐨勮繘绋� + for _, v := range childs { + if workers, ok := d.workers[v.Typ]; ok { + nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc) + d.workers[v.Typ] = nWorkers + } + } + + d.adjustWorker(ctx, childs) +} + +// Watch watch +func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) { + chExit := make(chan ExitInfo, 32) + go Reap(chExit) + + for { + select { + case <-ctx.Done(): + return + case i := <-chExit: + d.reWorker(ctx, &i) + case childs := <-ch: + d.updateWorker(ctx, childs) + default: + time.Sleep(time.Second) + } + } +} + +func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) { + // 鏈夐��鍑虹殑杩涚▼,鏌ョ湅鏄惁鍦ㄨ繍琛岃繘绋嬩腑,鎷夎捣 + for i, workers := range d.workers { + found := false + for j, w := range workers { + if w.pid == info.Pid { + w = d.restartWorker(ctx, w) + d.workers[i][j] = w + found = true + break + } + } + if found { + break + } + } +} + +func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker { + w.cmd.Wait() + w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:]) + w.pid = w.cmd.Process.Pid + return w +} + +func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) { + + ipcID := "analysis-" + typ + "-" + info.Name + + args := []string{ + `-role=slave`, + "-sdk=" + typ, + "-id=" + ipcID, + "-" + util.ConfigPath + "=" + info.Config, + } + + args = append(args, info.Param...) + cmd := runProc(ctx, "./analysis", info.Env, args) + + if cmd == nil { + logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID) + } + logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env) + + ch := make(chan []byte, 3) + cancel := fnNotify(ctx, ipcID, ch, logo.Infoln) + + w := &Worker{ + pid: cmd.Process.Pid, + cmd: cmd, + info: info, + trans: &transit{ + chNotify: ch, + cancel: cancel, + }, + } + d.workers[typ] = append(d.workers[typ], w) +} + +func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd { + cmd := exec.CommandContext(ctxt, bin, args...) + rEnv := "" + if len(env) != 0 { + runtime := "LD_LIBRARY_PATH" + rEnv = runtime + "=" + env + logo.Infoln("Env String: ", rEnv) + + // remove os environ ld + old := os.Getenv(runtime) + os.Unsetenv(runtime) + cmd.Env = os.Environ() + cmd.Env = append(cmd.Env, rEnv) + os.Setenv(runtime, old) + } + + // //debug + // cmd.Stdout = os.Stdout + // cmd.Stderr = os.Stderr + cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} + + if err := cmd.Start(); err == nil { + return cmd + } + return nil +} diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go deleted file mode 100644 index 186fe9d..0000000 --- a/app/master/dbfetcher.go +++ /dev/null @@ -1,35 +0,0 @@ -package master - -import ( - "analysis/app" - "analysis/logo" - "plugin" - - "basic.com/valib/pubsub.git" -) - -// Fetcher db -type Fetcher struct { - fnInit func(string, string, int, []string, string) (chan pubsub.Message, error) -} - -// NewFetcher new -func NewFetcher(soFile string) *Fetcher { - - plug, err := plugin.Open(soFile) - if err != nil { - logo.Errorln("Open: ", soFile, " error: ", err) - return nil - } - - fn, err := app.LoadFunc(plug, soFile, "Init") - if err != nil { - logo.Infoln("Lookup Func Init From File: ", soFile, " Error") - return nil - } - fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error)) - - return &Fetcher{ - fnInit: fnInit, - } -} diff --git a/app/master/master.go b/app/master/master.go index 1199713..797d4d1 100644 --- a/app/master/master.go +++ b/app/master/master.go @@ -7,64 +7,134 @@ "context" "encoding/json" "os" + "plugin" "strconv" "strings" "time" - - "basic.com/libgowrapper/sdkstruct.git" - "basic.com/valib/pubsub.git" ) -func reaper(ctxt context.Context) { - pidChan := make(chan int, 1) - Reap(pidChan) - go waitForRestart(ctxt, pidChan) +var ( + soLoaded bool + fnFetch func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})) + fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc +) + +func soLoad(soFile string) bool { + if fnFetch != nil && fnNotify != nil { + return true + } + + plug, err := plugin.Open(soFile) + if err != nil { + logo.Errorln("Open: ", soFile, " error: ", err) + return false + } + + var fn plugin.Symbol + + fn, err = app.LoadFunc(plug, soFile, "Fetch") + if err != nil { + logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error") + return false + } + fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))) + + fn, err = app.LoadFunc(plug, soFile, "Notify") + if err != nil { + logo.Infoln("Lookup Func Notify From File: ", soFile, " Error") + return false + } + fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc) + return true } -// Run run -func Run(ctx context.Context, soFile, configPath string) bool { - reaper(ctx) - - fetcher := NewFetcher(soFile) - if fetcher == nil { +func initFetcher(ctx context.Context, soFile string) <-chan []byte { + if !soLoad(soFile) { logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile) - return false + return nil } logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB") - ip := "tcp://" + util.FSI.IP + ip := "tcp://192.168.5.22" + // ip := "tcp://" + util.FSI.IP url := ip + ":" + strconv.Itoa(util.FSI.DataPort) hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort) - chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) - for { - if err == nil { - break - } - logo.Infoln("Analysis Fetcher INIT Error! URL:", url) - time.Sleep(time.Second) - chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid())) + ch := make(chan []byte, 3) + + fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln) + + logo.Infoln("~~~~~~Start Recv SDK Infos") + return ch +} + +// Run run +func Run(ctx context.Context, soFile, configPath string) bool { + daemon := NewDaemon() + chProc := make(chan []TypeProc, 32) + go daemon.Watch(ctx, chProc) + + chMsg := initFetcher(ctx, soFile) + if chMsg == nil { + logo.Infoln("Master Run initFetcher Failed") + return false } + params := app.GetParams() for { select { case <-ctx.Done(): return true case msg := <-chMsg: - // sdktype process_name topic null - // yolo/face yolo_0/yolo_1 channel + // sdktype process_name topic null + // yolo/face yolo_0/yolo_1 channel var sdk map[string](map[string](map[string]interface{})) - if err := json.Unmarshal(msg.Msg, &sdk); err != nil { + if err := json.Unmarshal(msg, &sdk); err != nil { logo.Infoln("Fetcher SDK unmarshal err:", err) continue } logo.Infoln("~~~~~~Recv New SDKInfos") - chCameras <- CameraInfo{ - Cameras: cameras, + + var typeProcs []TypeProc + + for sdkType, mapSdkProc := range sdk { + config := findConfigFile(sdkType, configPath) + if config == nil { + logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It") + continue + } + env := checkConfig(sdkType, *config) + if env == nil { + continue + } + + var channels []string + var namedProcs []NamedProc + + for procName, mapProcChannels := range mapSdkProc { + for c := range mapProcChannels { + channels = append(channels, c) + } + p := NamedProc{ + Name: procName, + Channels: channels, + Env: *env, + Config: *config, + Param: params, + } + namedProcs = append(namedProcs, p) + } + t := TypeProc{ + Typ: sdkType, + SNameProc: namedProcs, + } + typeProcs = append(typeProcs, t) } + chProc <- typeProcs + logo.Infoln("~~~~~~Recv New SDKInfos Over") default: @@ -74,56 +144,43 @@ } -func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool { +func findConfigFile(typ, configPath string) *string { rPath := configPath - - params := app.GetParams() - - for _, v := range sdks { - - file := rPath + v.SdkType + ".json" - if rPath[len(rPath)-1] != '/' { - file = rPath + "/" + v.SdkType + ".json" - } - - cfg, err := app.ReadConfig(file) - if err != nil { - logo.Errorln("Master Read: ", file, " Config Error: ", err) - continue - } - - if len(cfg.Env) > 0 { - envs := strings.Split(cfg.Env, ":") - normal := true - for _, v := range envs { - if !util.IsFileExist(v) { - normal = false - break - } - } - if !normal { - logo.Infoln("Can't Find Runtime Path, Skip It: ", file) - continue - } - } - - logo.Infoln(file, " CONFIG: ", cfg) - - args := []string{ - `-role=slave`, - "-sdk=" + v.SdkType, - "-id=" + v.IpcID, - "-" + util.ConfigPath + "=" + file, - } - - args = append(args, params...) - pid, err := runProc(ctx, "./analysis", args, cfg.Env) - - if err != nil { - logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err) - } - logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env) + // default config file + file := rPath + typ + ".json" + // if configPath not end with '/' + if rPath[len(rPath)-1] != '/' { + file = rPath + "/" + typ + ".json" } - return true + // whether file exist + if util.IsFileExist(file) { + return &file + } + return nil +} +func checkConfig(typ, file string) *string { + cfg, err := app.ReadConfig(file) + if err != nil { + logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err) + return nil + } + + // check config runtime exist if config this item + env := strings.TrimSpace(cfg.Env) + if len(env) > 0 { + envs := strings.Split(env, ":") + pathExist := true + for _, v := range envs { + if !util.IsFileExist(v) { + pathExist = false + break + } + } + if !pathExist { + logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ) + return nil + } + } + return &env } diff --git a/app/master/reaper.go b/app/master/reaper.go index 1c48d59..01b4420 100644 --- a/app/master/reaper.go +++ b/app/master/reaper.go @@ -2,81 +2,18 @@ import ( "analysis/logo" - "context" "os" - "os/exec" "os/signal" "syscall" - "time" ) -type procInfo struct { - cmd *exec.Cmd - env string +// ExitInfo info +type ExitInfo struct { + Pid int + ExitCode int } -var ( - procMap = make(map[int]*procInfo) -) - -func restartProc(ctxt context.Context, pid int) { - info, ok := procMap[pid] - if ok { - err := info.cmd.Wait() - - if err != nil { - logo.Errorln("pid : [", pid, "] quit error: ", err) - } else { - logo.Infoln("pid : [", pid, "] quit") - } - delete(procMap, pid) - runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], info.env) - } else { - logo.Errorln(pid, " doesn't exist") - } -} - -func quitProc(pid int) { - info, ok := procMap[pid] - if ok { - delete(procMap, pid) - - syscall.Kill(pid, syscall.SIGINT) - info.cmd.Wait() - } else { - logo.Errorln(pid, " doesn't exist") - } -} -func runProc(ctxt context.Context, bin string, args []string, env string) (int, error) { - cmd := exec.CommandContext(ctxt, bin, args...) - rEnv := "" - if len(env) != 0 { - runtime := "LD_LIBRARY_PATH" - rEnv = runtime + "=" + env - logo.Infoln("Env String: ", rEnv) - - // remove os environ ld - old := os.Getenv(runtime) - os.Unsetenv(runtime) - cmd.Env = os.Environ() - cmd.Env = append(cmd.Env, rEnv) - os.Setenv(runtime, old) - } - - pid := -1 - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM} - - err := cmd.Start() - if err == nil { - pid = cmd.Process.Pid - procMap[pid] = &procInfo{cmd, env} - } - return pid, err -} - -// Config conf +// Config config type Config struct { Pid int Options int @@ -88,6 +25,7 @@ func sigChildHandler(notifications chan os.Signal) { sigs := make(chan os.Signal, 3) signal.Notify(sigs, syscall.SIGCHLD) + signal.Ignore(syscall.SIGPIPE) for { sig := <-sigs @@ -106,7 +44,7 @@ } /* End of function sigChildHandler. */ // Be a good parent - clean up behind the children. -func reapChildren(config Config, pidChan chan<- int) { +func reapChildren(config Config, info chan<- ExitInfo) { notifications := make(chan os.Signal, 1) go sigChildHandler(notifications) @@ -125,10 +63,10 @@ * Plants vs. Zombies!! */ pid, err := syscall.Wait4(pid, &wstatus, opts, nil) - pidChan <- pid + info <- ExitInfo{pid, wstatus.ExitStatus()} for syscall.EINTR == err { pid, err = syscall.Wait4(pid, &wstatus, opts, nil) - pidChan <- pid + info <- ExitInfo{pid, wstatus.ExitStatus()} } if syscall.ECHILD == err { @@ -153,7 +91,7 @@ // background inside a goroutine. // Reap reap -func Reap(pidChan chan<- int) { +func Reap(info chan<- ExitInfo) { /* * Only reap processes if we are taking over init's duties aka * we are running as pid 1 inside a docker container. The default @@ -163,7 +101,7 @@ Pid: -1, Options: 0, DisablePid1Check: true, - }, pidChan) + }, info) } /* End of [exported] function Reap. */ @@ -172,7 +110,7 @@ // The child processes are reaped in the background inside a goroutine. // Start start -func Start(config Config, pidChan chan<- int) { +func Start(config Config, info chan<- ExitInfo) { /* * Start the Reaper with configuration options. This allows you to * reap processes even if the current pid isn't running as pid 1. @@ -194,20 +132,6 @@ * of 'em all, either way we get to play the grim reaper. * You will be missed, Terry Pratchett!! RIP */ - go reapChildren(config, pidChan) + go reapChildren(config, info) } /* End of [exported] function Start. */ - -func waitForRestart(ctxt context.Context, pidChan <-chan int) { - - for { - select { - case <-ctxt.Done(): - return - case pid := <-pidChan: - restartProc(ctxt, pid) - default: - time.Sleep(3 * time.Second) - } - } -} diff --git a/go.mod b/go.mod index 7483532..104cc36 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,7 @@ go 1.12 require ( - basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865 - basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect - basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717 github.com/natefinch/lumberjack v2.0.0+incompatible github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4 diff --git a/libcomm/db.go b/libcomm/db.go deleted file mode 100644 index d9104e9..0000000 --- a/libcomm/db.go +++ /dev/null @@ -1,15 +0,0 @@ -package main - -import ( - "basic.com/valib/pubsub.git" -) - -// Init init tcp://192.168.5.22:4005 -func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) { - p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) - if err != nil { - return nil, err - } - c := p.Recv() - return c, err -} diff --git a/libcomm/fetcher.go b/libcomm/fetcher.go new file mode 100644 index 0000000..a0c0079 --- /dev/null +++ b/libcomm/fetcher.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "time" + + "basic.com/valib/pubsub.git" +) + +func wait(ctx context.Context, c chan pubsub.Message, out chan<- []byte) { + for { + select { + case <-ctx.Done(): + return + case msg := <-c: + out <- msg.Msg + default: + time.Sleep(time.Second) + } + } +} + +// Fetch Fetch from tcp://192.168.5.22:4005 +func Fetch(ctx context.Context, url, heartBeatURL string, mode int, processID string, out chan<- []byte, fn func(...interface{})) { + topics := []string{pubsub.Topic_Sdk} + p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) + for { + if err == nil { + break + } + fn("libcomm.so, pubsub subscribe error: ", err) + time.Sleep(time.Second) + p, err = pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID) + } + + c := p.Recv() + + go wait(ctx, c, out) +} diff --git a/libcomm/go.mod b/libcomm/go.mod index 324f971..0484bff 100644 --- a/libcomm/go.mod +++ b/libcomm/go.mod @@ -6,5 +6,5 @@ basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect - nanomsg.org/go-mangos v1.4.0 // indirect + nanomsg.org/go-mangos v1.4.0 ) diff --git a/libcomm/notify.go b/libcomm/notify.go new file mode 100644 index 0000000..d930c6a --- /dev/null +++ b/libcomm/notify.go @@ -0,0 +1,183 @@ +package main + +import ( + "context" + "os" + "strings" + "time" + + "nanomsg.org/go-mangos" + "nanomsg.org/go-mangos/protocol/rep" + "nanomsg.org/go-mangos/protocol/req" + "nanomsg.org/go-mangos/transport/ipc" + "nanomsg.org/go-mangos/transport/tcp" +) + +func request(url string, timeout int, fn func(...interface{})) mangos.Socket { + + var sock mangos.Socket + var err error + + for { + if sock, err = req.NewSocket(); err != nil { + fn("!!!!!!Notify can't get new request socket: ", err) + time.Sleep(time.Second) + } else { + break + } + } + + sock.AddTransport(ipc.NewTransport()) + sock.AddTransport(tcp.NewTransport()) + sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second) + sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second) + + for { + if err = sock.Dial(url); err != nil { + fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url) + time.Sleep(time.Second) + } else { + break + } + } + + return sock +} + +func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) { + for { + select { + case <-ctx.Done(): + sock.Close() + return + case data := <-ch: + var ret []byte + var err error + + err = sock.Send(data) + for { + if err == nil { + break + } + fn("!!!!!!Notify Send To Slave ERROR: ", err) + time.Sleep(500 * time.Millisecond) + continue + } + + ret, err = sock.Recv() + for { + if err == nil { + fn("~~~~~Notify Recv From Slave: ", string(ret)) + break + } + fn("!!!!!!Notify Recv From Slave Error: ", err) + time.Sleep(500 * time.Microsecond) + continue + } + + default: + time.Sleep(time.Second) + } + } +} + +// Notify master sync notify to slave +func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc { + rctx, cancel := context.WithCancel(ctx) + + sock := request(url, 2, fn) + + go notify(rctx, sock, ch, fn) + return cancel +} + +////////////////////////////////////////////////////////////////// + +func rmExistedIpcName(url string) { + s := strings.Split(url, "://") + + if s[0] == "ipc" { + if _, err := os.Stat(s[1]); err == nil { + os.Remove(s[1]) + } else if !os.IsNotExist(err) { + os.Remove(s[1]) + } + } +} + +func reply(url string, timeout int, fn func(...interface{})) mangos.Socket { + rmExistedIpcName(url) + + var sock mangos.Socket + var err error + + for { + if sock, err = rep.NewSocket(); err != nil { + rmExistedIpcName(url) + fn("!!!!!!Notify can't get new reply socket: ", err) + time.Sleep(time.Second) + } else { + break + } + } + + sock.AddTransport(ipc.NewTransport()) + sock.AddTransport(tcp.NewTransport()) + sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second) + sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second) + + for { + if err = sock.Listen(url); err != nil { + rmExistedIpcName(url) + + fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url) + time.Sleep(time.Second) + } else { + break + } + } + + return sock +} + +func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) { + for { + select { + case <-ctx.Done(): + sock.Close() + return + default: + + msg, err := sock.Recv() + for { + if err == nil { + fn("~~~~~Notifiee Recv From Master: ", string(msg)) + break + } + fn("!!!!!!Notify Recv From Master Error: ", err) + time.Sleep(500 * time.Microsecond) + continue + } + + err = sock.Send([]byte("ok")) + for { + if err == nil { + break + } + fn("!!!!!!Notify Send To Master ERROR: ", err) + time.Sleep(500 * time.Millisecond) + continue + } + } + } +} + +// Notifiee slave sync recv notice from master +func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc { + rctx, cancel := context.WithCancel(ctx) + + sock := reply(url, 2, fn) + + go notifiee(rctx, sock, ch, fn) + return cancel +} diff --git a/util/common.go b/util/common.go index a4fdf8a..93ada39 100644 --- a/util/common.go +++ b/util/common.go @@ -65,7 +65,7 @@ } } if file == nil { - fmt.Println(`Read All Log Config Files Failed, Use Default, "./log/analysis-[type]"`) + fmt.Println(`Read All Log Config Files Failed, If -logit Use Default, "./log/analysis-[type]"`) return } yamlString := string(file) -- Gitblit v1.8.0