From e6a5be3714b70236d84f25be2ce858c3d7e379d8 Mon Sep 17 00:00:00 2001
From: zhangmeng <775834166@qq.com>
Date: 星期二, 21 一月 2020 14:16:01 +0800
Subject: [PATCH] add recv sdk proc info from dispath

---
 libcomm/notify.go    |  183 ++++++++++
 /dev/null            |   15 
 libcomm/fetcher.go   |   39 ++
 util/common.go       |    2 
 app/master/master.go |  211 +++++++----
 app/master/daemon.go |  413 ++++++++++++++++++++++++
 go.mod               |    3 
 libcomm/go.mod       |    2 
 app/master/reaper.go |  102 -----
 9 files changed, 784 insertions(+), 186 deletions(-)

diff --git a/app/master/daemon.go b/app/master/daemon.go
new file mode 100644
index 0000000..84d626b
--- /dev/null
+++ b/app/master/daemon.go
@@ -0,0 +1,413 @@
+package master
+
+import (
+	"context"
+	"encoding/json"
+	"os"
+	"os/exec"
+	"syscall"
+	"time"
+
+	"analysis/logo"
+	"analysis/util"
+)
+
+// NamedProc 鍗曚釜杩涚▼鍚嶅瓧鍜屾湇鍔¢�氶亾
+type NamedProc struct {
+	// 杩涚▼鍚嶅瓧
+	Name string
+	// 杩涚▼閫氶亾
+	Channels []string
+	// 杩涚▼runtime
+	Env string
+	// 杩涚▼config file path
+	Config string
+	// 杩涚▼param
+	Param []string
+}
+
+// TypeProc 姣忎釜Type杩涚▼鐨勫弬鏁�
+type TypeProc struct {
+	// 杩涚▼绫诲瀷FaceDetect/Yolo
+	Typ string
+	// 鍏峰悕杩涚▼
+	SNameProc []NamedProc
+}
+
+// Notice transit to slave
+type Notice struct {
+	Op      string   `json:"Op"`
+	Content []string `json:"Content"`
+}
+type transit struct {
+	chNotify chan<- []byte
+	cancel   context.CancelFunc
+}
+
+// Worker 鍗曚釜杩涚▼鏈嶅姟
+type Worker struct {
+	pid   int
+	cmd   *exec.Cmd
+	info  *NamedProc
+	trans *transit
+}
+
+// Daemon 鐩戞帶鐨勬墍鏈夊瓙杩涚▼
+type Daemon struct {
+	// 姣忎釜sdk绫诲瀷鍚姩鐨勮繘绋嬫暟閲�
+	workers map[string][]*Worker
+}
+
+// NewDaemon 鐩戞帶
+func NewDaemon() *Daemon {
+	return &Daemon{
+		workers: make(map[string][]*Worker),
+	}
+}
+
+//姹備氦闆�
+func intersect(slice1, slice2 []string) []string {
+	m := make(map[string]int)
+	nn := make([]string, 0)
+	for _, v := range slice1 {
+		m[v]++
+	}
+
+	for _, v := range slice2 {
+		times, _ := m[v]
+		if times == 1 {
+			nn = append(nn, v)
+		}
+	}
+	return nn
+}
+
+//姹傚樊闆� slice1-骞堕泦
+func difference(slice1, slice2 []string) []string {
+	m := make(map[string]int)
+	nn := make([]string, 0)
+	inter := intersect(slice1, slice2)
+	for _, v := range inter {
+		m[v]++
+	}
+
+	for _, value := range slice1 {
+		times, _ := m[value]
+		if times == 0 {
+			nn = append(nn, value)
+		}
+	}
+	return nn
+}
+
+func removeWorker(w *Worker) {
+	syscall.Kill(w.pid, syscall.SIGTERM)
+	w.cmd.Wait()
+	w.trans.cancel()
+}
+
+func (d *Daemon) rmWorkerWith(typ string) {
+	if workers, ok := d.workers[typ]; ok {
+		delete(d.workers, typ)
+
+		for _, w := range workers {
+			removeWorker(w)
+		}
+	}
+}
+
+func (d *Daemon) rmWorkerNoType(childs []TypeProc) {
+	var newTypes []string
+	for _, v := range childs {
+		newTypes = append(newTypes, v.Typ)
+	}
+
+	var runTypes []string
+	for k := range d.workers {
+		runTypes = append(runTypes, k)
+	}
+
+	// 涓嶅瓨鍦ㄤ簬鏂颁俊鎭腑鐨則ype, remove
+	rmTypes := difference(runTypes, newTypes)
+	if len(rmTypes) > 0 {
+		for _, v := range rmTypes {
+			d.rmWorkerWith(v)
+		}
+	}
+}
+
+func (d *Daemon) rmWorkerNoNamed(workers []*Worker, procs []NamedProc) []*Worker {
+	var newNames []string
+	for _, v := range procs {
+		newNames = append(newNames, v.Name)
+	}
+
+	var runNames []string
+	for _, v := range workers {
+		runNames = append(runNames, v.info.Name)
+	}
+
+	// 宸茬粡涓嶉渶瑕佸瓨鍦ㄨ繘绋�,remove
+	rmWorkers := difference(runNames, newNames)
+	for _, v := range rmWorkers {
+		for _, w := range workers {
+			if v == w.info.Name {
+				removeWorker(w)
+			}
+		}
+	}
+
+	// 淇濈暀宸插瓨鍦ㄧ殑杩涚▼
+	stillWorks := intersect(runNames, newNames)
+	var ret []*Worker
+	for _, v := range stillWorks {
+		for _, w := range workers {
+			if v == w.info.Name {
+				ret = append(ret, w)
+			}
+		}
+	}
+
+	return ret
+}
+
+//////////////////////////////////////////////////////////
+
+func getNamedProc(typ string, childs []TypeProc) []NamedProc {
+	for _, v := range childs {
+		if v.Typ == typ {
+			return v.SNameProc
+		}
+	}
+	return nil
+}
+
+func getNamedProcInfo(name string, procs []NamedProc) *NamedProc {
+	for _, v := range procs {
+		if name == v.Name {
+			return &v
+		}
+	}
+	return nil
+}
+
+func (d *Daemon) channelChanged(ctx context.Context, typs []string, childs []TypeProc) {
+	for _, s := range typs {
+		// 瀛樺湪杩欎釜绫诲瀷鐨勮繘绋�
+		if workers, ok := d.workers[s]; ok {
+			child := getNamedProc(s, childs)
+			if child == nil {
+				continue
+			}
+			var newNames []string
+			for _, v := range child {
+				newNames = append(newNames, v.Name)
+			}
+
+			var runNames []string
+			for _, v := range workers {
+				runNames = append(runNames, v.info.Name)
+			}
+
+			add := difference(newNames, runNames)
+			for _, c := range child {
+				for _, v := range add {
+					if c.Name == v {
+						d.startWorker(ctx, s, &c)
+					}
+				}
+			}
+
+			adjust := intersect(runNames, newNames)
+			for _, v := range adjust {
+				proc := getNamedProcInfo(v, child)
+				if proc == nil {
+					continue
+				}
+
+				for _, w := range workers {
+					if v == w.info.Name {
+						// 鎵惧埌浜嗗搴斿悕瀛楃殑杩涚▼,棣栧厛姹備笉闇�瑕佸啀杩愯鐨勯�氶亾
+						var notice *Notice
+						removes := difference(w.info.Channels, proc.Channels)
+						if len(removes) > 0 {
+							// 閫氱煡瀛愯繘绋嬪叧闂�氶亾
+							notice = &Notice{
+								Op:      "remove",
+								Content: removes,
+							}
+
+						}
+
+						// 鍏舵姹傚嚭鏂板鐨勯�氶亾
+						adds := difference(proc.Channels, w.info.Channels)
+						if len(adds) > 0 {
+							// 閫氱煡瀛愯繘绋嬫墦寮�閫氶亾
+							notice = &Notice{
+								Op:      "add",
+								Content: adds,
+							}
+						}
+						if notice != nil {
+							if d, err := json.Marshal(*notice); err == nil {
+								w.trans.chNotify <- d
+							}
+						}
+					}
+				}
+			}
+		}
+	}
+}
+
+func (d *Daemon) startNewWorker(ctx context.Context, child TypeProc) {
+	for _, v := range child.SNameProc {
+		d.startWorker(ctx, child.Typ, &v)
+	}
+}
+
+func (d *Daemon) adjustWorker(ctx context.Context, childs []TypeProc) {
+	var newTypes []string
+	for _, v := range childs {
+		newTypes = append(newTypes, v.Typ)
+	}
+
+	var runTypes []string
+	for k := range d.workers {
+		runTypes = append(runTypes, k)
+	}
+
+	// 鏂扮被鍨嬫坊鍔�
+	addWorkers := difference(newTypes, runTypes)
+	for _, a := range addWorkers {
+		for _, v := range childs {
+			if a == v.Typ {
+				// start new type proc
+				d.startNewWorker(ctx, v)
+			}
+		}
+	}
+
+	stillWorkers := intersect(newTypes, runTypes)
+	// 璋冩暣宸插瓨鍦ㄧ殑杩涚▼鐨勯�氶亾
+	d.channelChanged(ctx, stillWorkers, childs)
+}
+
+func (d *Daemon) updateWorker(ctx context.Context, childs []TypeProc) {
+	// 鏂扮殑杩涚▼淇℃伅,棣栧厛鍒犻櫎鎺変笉鍐嶈繍琛岀殑绫诲瀷
+	d.rmWorkerNoType(childs)
+	// 鎸夊悕瀛楀垹闄ょ壒瀹氱被鍨嬩腑涓嶅啀杩愯鐨勮繘绋�
+	for _, v := range childs {
+		if workers, ok := d.workers[v.Typ]; ok {
+			nWorkers := d.rmWorkerNoNamed(workers, v.SNameProc)
+			d.workers[v.Typ] = nWorkers
+		}
+	}
+
+	d.adjustWorker(ctx, childs)
+}
+
+// Watch watch
+func (d *Daemon) Watch(ctx context.Context, ch <-chan []TypeProc) {
+	chExit := make(chan ExitInfo, 32)
+	go Reap(chExit)
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case i := <-chExit:
+			d.reWorker(ctx, &i)
+		case childs := <-ch:
+			d.updateWorker(ctx, childs)
+		default:
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+func (d *Daemon) reWorker(ctx context.Context, info *ExitInfo) {
+	// 鏈夐��鍑虹殑杩涚▼,鏌ョ湅鏄惁鍦ㄨ繍琛岃繘绋嬩腑,鎷夎捣
+	for i, workers := range d.workers {
+		found := false
+		for j, w := range workers {
+			if w.pid == info.Pid {
+				w = d.restartWorker(ctx, w)
+				d.workers[i][j] = w
+				found = true
+				break
+			}
+		}
+		if found {
+			break
+		}
+	}
+}
+
+func (d *Daemon) restartWorker(ctx context.Context, w *Worker) *Worker {
+	w.cmd.Wait()
+	w.cmd = runProc(ctx, w.cmd.Path, w.info.Env, w.cmd.Args[1:])
+	w.pid = w.cmd.Process.Pid
+	return w
+}
+
+func (d *Daemon) startWorker(ctx context.Context, typ string, info *NamedProc) {
+
+	ipcID := "analysis-" + typ + "-" + info.Name
+
+	args := []string{
+		`-role=slave`,
+		"-sdk=" + typ,
+		"-id=" + ipcID,
+		"-" + util.ConfigPath + "=" + info.Config,
+	}
+
+	args = append(args, info.Param...)
+	cmd := runProc(ctx, "./analysis", info.Env, args)
+
+	if cmd == nil {
+		logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s Failed\n", typ, ipcID)
+	}
+	logo.Infof("START SDK %s ID %s PID %d Env: %s\n", typ, ipcID, cmd.Process.Pid, info.Env)
+
+	ch := make(chan []byte, 3)
+	cancel := fnNotify(ctx, ipcID, ch, logo.Infoln)
+
+	w := &Worker{
+		pid:  cmd.Process.Pid,
+		cmd:  cmd,
+		info: info,
+		trans: &transit{
+			chNotify: ch,
+			cancel:   cancel,
+		},
+	}
+	d.workers[typ] = append(d.workers[typ], w)
+}
+
+func runProc(ctxt context.Context, bin, env string, args []string) *exec.Cmd {
+	cmd := exec.CommandContext(ctxt, bin, args...)
+	rEnv := ""
+	if len(env) != 0 {
+		runtime := "LD_LIBRARY_PATH"
+		rEnv = runtime + "=" + env
+		logo.Infoln("Env String: ", rEnv)
+
+		// remove os environ ld
+		old := os.Getenv(runtime)
+		os.Unsetenv(runtime)
+		cmd.Env = os.Environ()
+		cmd.Env = append(cmd.Env, rEnv)
+		os.Setenv(runtime, old)
+	}
+
+	// //debug
+	// cmd.Stdout = os.Stdout
+	// cmd.Stderr = os.Stderr
+	cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
+
+	if err := cmd.Start(); err == nil {
+		return cmd
+	}
+	return nil
+}
diff --git a/app/master/dbfetcher.go b/app/master/dbfetcher.go
deleted file mode 100644
index 186fe9d..0000000
--- a/app/master/dbfetcher.go
+++ /dev/null
@@ -1,35 +0,0 @@
-package master
-
-import (
-	"analysis/app"
-	"analysis/logo"
-	"plugin"
-
-	"basic.com/valib/pubsub.git"
-)
-
-// Fetcher db
-type Fetcher struct {
-	fnInit func(string, string, int, []string, string) (chan pubsub.Message, error)
-}
-
-// NewFetcher new
-func NewFetcher(soFile string) *Fetcher {
-
-	plug, err := plugin.Open(soFile)
-	if err != nil {
-		logo.Errorln("Open: ", soFile, " error: ", err)
-		return nil
-	}
-
-	fn, err := app.LoadFunc(plug, soFile, "Init")
-	if err != nil {
-		logo.Infoln("Lookup Func Init From File: ", soFile, " Error")
-		return nil
-	}
-	fnInit := fn.(func(string, string, int, []string, string) (chan pubsub.Message, error))
-
-	return &Fetcher{
-		fnInit: fnInit,
-	}
-}
diff --git a/app/master/master.go b/app/master/master.go
index 1199713..797d4d1 100644
--- a/app/master/master.go
+++ b/app/master/master.go
@@ -7,64 +7,134 @@
 	"context"
 	"encoding/json"
 	"os"
+	"plugin"
 	"strconv"
 	"strings"
 	"time"
-
-	"basic.com/libgowrapper/sdkstruct.git"
-	"basic.com/valib/pubsub.git"
 )
 
-func reaper(ctxt context.Context) {
-	pidChan := make(chan int, 1)
-	Reap(pidChan)
-	go waitForRestart(ctxt, pidChan)
+var (
+	soLoaded bool
+	fnFetch  func(context.Context, string, string, int, string, chan<- []byte, func(...interface{}))
+	fnNotify func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc
+)
+
+func soLoad(soFile string) bool {
+	if fnFetch != nil && fnNotify != nil {
+		return true
+	}
+
+	plug, err := plugin.Open(soFile)
+	if err != nil {
+		logo.Errorln("Open: ", soFile, " error: ", err)
+		return false
+	}
+
+	var fn plugin.Symbol
+
+	fn, err = app.LoadFunc(plug, soFile, "Fetch")
+	if err != nil {
+		logo.Infoln("Lookup Func Fetch From File: ", soFile, " Error")
+		return false
+	}
+	fnFetch = fn.(func(context.Context, string, string, int, string, chan<- []byte, func(...interface{})))
+
+	fn, err = app.LoadFunc(plug, soFile, "Notify")
+	if err != nil {
+		logo.Infoln("Lookup Func Notify From File: ", soFile, " Error")
+		return false
+	}
+	fnNotify = fn.(func(context.Context, string, <-chan []byte, func(...interface{})) context.CancelFunc)
+	return true
 }
 
-// Run run
-func Run(ctx context.Context, soFile, configPath string) bool {
-	reaper(ctx)
-
-	fetcher := NewFetcher(soFile)
-	if fetcher == nil {
+func initFetcher(ctx context.Context, soFile string) <-chan []byte {
+	if !soLoad(soFile) {
 		logo.Errorln("New Fetcher Load so File Funcs Error From File: ", soFile)
-		return false
+		return nil
 	}
 
 	logo.Infoln("~~~~~~Created Fetcher, Now Sync From DB")
 
-	ip := "tcp://" + util.FSI.IP
+	ip := "tcp://192.168.5.22"
+	// ip := "tcp://" + util.FSI.IP
 	url := ip + ":" + strconv.Itoa(util.FSI.DataPort)
 	hearturl := ip + ":" + strconv.Itoa(util.FSI.HBPort)
 
-	chMsg, err := fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
-	for {
-		if err == nil {
-			break
-		}
-		logo.Infoln("Analysis Fetcher INIT Error! URL:", url)
-		time.Sleep(time.Second)
-		chMsg, err = fetcher.fnInit(url, hearturl, 0, []string{pubsub.Topic_Sdk}, "analysis-master"+strconv.Itoa(os.Getpid()))
+	ch := make(chan []byte, 3)
+
+	fnFetch(ctx, url, hearturl, 0, "analysis-master"+strconv.Itoa(os.Getpid()), ch, logo.Infoln)
+
+	logo.Infoln("~~~~~~Start Recv SDK Infos")
+	return ch
+}
+
+// Run run
+func Run(ctx context.Context, soFile, configPath string) bool {
+	daemon := NewDaemon()
+	chProc := make(chan []TypeProc, 32)
+	go daemon.Watch(ctx, chProc)
+
+	chMsg := initFetcher(ctx, soFile)
+	if chMsg == nil {
+		logo.Infoln("Master Run initFetcher Failed")
+		return false
 	}
+	params := app.GetParams()
 
 	for {
 		select {
 		case <-ctx.Done():
 			return true
 		case msg := <-chMsg:
-			//      		sdktype	   process_name   topic		null
-			//				yolo/face  yolo_0/yolo_1  channel
+			//      	sdktype	   process_name   topic		null
+			//			yolo/face  yolo_0/yolo_1  channel
 			var sdk map[string](map[string](map[string]interface{}))
 
-			if err := json.Unmarshal(msg.Msg, &sdk); err != nil {
+			if err := json.Unmarshal(msg, &sdk); err != nil {
 				logo.Infoln("Fetcher SDK unmarshal err:", err)
 				continue
 			}
 
 			logo.Infoln("~~~~~~Recv New SDKInfos")
-			chCameras <- CameraInfo{
-				Cameras: cameras,
+
+			var typeProcs []TypeProc
+
+			for sdkType, mapSdkProc := range sdk {
+				config := findConfigFile(sdkType, configPath)
+				if config == nil {
+					logo.Infoln("!!!!!!There Is Not ", sdkType, " Config File In ", configPath, " Skip It")
+					continue
+				}
+				env := checkConfig(sdkType, *config)
+				if env == nil {
+					continue
+				}
+
+				var channels []string
+				var namedProcs []NamedProc
+
+				for procName, mapProcChannels := range mapSdkProc {
+					for c := range mapProcChannels {
+						channels = append(channels, c)
+					}
+					p := NamedProc{
+						Name:     procName,
+						Channels: channels,
+						Env:      *env,
+						Config:   *config,
+						Param:    params,
+					}
+					namedProcs = append(namedProcs, p)
+				}
+				t := TypeProc{
+					Typ:       sdkType,
+					SNameProc: namedProcs,
+				}
+				typeProcs = append(typeProcs, t)
 			}
+			chProc <- typeProcs
+
 			logo.Infoln("~~~~~~Recv New SDKInfos Over")
 
 		default:
@@ -74,56 +144,43 @@
 
 }
 
-func manualStart(ctx context.Context, sdks []sdkstruct.SDKInfo, configPath string) bool {
+func findConfigFile(typ, configPath string) *string {
 	rPath := configPath
-
-	params := app.GetParams()
-
-	for _, v := range sdks {
-
-		file := rPath + v.SdkType + ".json"
-		if rPath[len(rPath)-1] != '/' {
-			file = rPath + "/" + v.SdkType + ".json"
-		}
-
-		cfg, err := app.ReadConfig(file)
-		if err != nil {
-			logo.Errorln("Master Read: ", file, " Config Error: ", err)
-			continue
-		}
-
-		if len(cfg.Env) > 0 {
-			envs := strings.Split(cfg.Env, ":")
-			normal := true
-			for _, v := range envs {
-				if !util.IsFileExist(v) {
-					normal = false
-					break
-				}
-			}
-			if !normal {
-				logo.Infoln("Can't Find Runtime Path, Skip It: ", file)
-				continue
-			}
-		}
-
-		logo.Infoln(file, " CONFIG: ", cfg)
-
-		args := []string{
-			`-role=slave`,
-			"-sdk=" + v.SdkType,
-			"-id=" + v.IpcID,
-			"-" + util.ConfigPath + "=" + file,
-		}
-
-		args = append(args, params...)
-		pid, err := runProc(ctx, "./analysis", args, cfg.Env)
-
-		if err != nil {
-			logo.Errorf("ANALYSIS START SLAVE PROC %s IPC: %s error %+v\n", v.SdkType, v.IpcID, err)
-		}
-		logo.Infof("START SDK %s ID %s PID %d Env: %s\n", v.SdkType, v.IpcID, pid, cfg.Env)
+	// default config file
+	file := rPath + typ + ".json"
+	// if configPath not end with '/'
+	if rPath[len(rPath)-1] != '/' {
+		file = rPath + "/" + typ + ".json"
 	}
-	return true
+	// whether file exist
+	if util.IsFileExist(file) {
+		return &file
+	}
+	return nil
+}
 
+func checkConfig(typ, file string) *string {
+	cfg, err := app.ReadConfig(file)
+	if err != nil {
+		logo.Errorln("!!!!!!Master Read: ", file, " for ", typ, " Config Error: ", err)
+		return nil
+	}
+
+	// check config runtime exist if config this item
+	env := strings.TrimSpace(cfg.Env)
+	if len(env) > 0 {
+		envs := strings.Split(env, ":")
+		pathExist := true
+		for _, v := range envs {
+			if !util.IsFileExist(v) {
+				pathExist = false
+				break
+			}
+		}
+		if !pathExist {
+			logo.Infoln("Can't Find Runtime Path, Skip SDK: ", typ)
+			return nil
+		}
+	}
+	return &env
 }
diff --git a/app/master/reaper.go b/app/master/reaper.go
index 1c48d59..01b4420 100644
--- a/app/master/reaper.go
+++ b/app/master/reaper.go
@@ -2,81 +2,18 @@
 
 import (
 	"analysis/logo"
-	"context"
 	"os"
-	"os/exec"
 	"os/signal"
 	"syscall"
-	"time"
 )
 
-type procInfo struct {
-	cmd *exec.Cmd
-	env string
+// ExitInfo info
+type ExitInfo struct {
+	Pid      int
+	ExitCode int
 }
 
-var (
-	procMap = make(map[int]*procInfo)
-)
-
-func restartProc(ctxt context.Context, pid int) {
-	info, ok := procMap[pid]
-	if ok {
-		err := info.cmd.Wait()
-
-		if err != nil {
-			logo.Errorln("pid : [", pid, "] quit error: ", err)
-		} else {
-			logo.Infoln("pid : [", pid, "] quit")
-		}
-		delete(procMap, pid)
-		runProc(ctxt, info.cmd.Path, info.cmd.Args[1:], info.env)
-	} else {
-		logo.Errorln(pid, " doesn't exist")
-	}
-}
-
-func quitProc(pid int) {
-	info, ok := procMap[pid]
-	if ok {
-		delete(procMap, pid)
-
-		syscall.Kill(pid, syscall.SIGINT)
-		info.cmd.Wait()
-	} else {
-		logo.Errorln(pid, " doesn't exist")
-	}
-}
-func runProc(ctxt context.Context, bin string, args []string, env string) (int, error) {
-	cmd := exec.CommandContext(ctxt, bin, args...)
-	rEnv := ""
-	if len(env) != 0 {
-		runtime := "LD_LIBRARY_PATH"
-		rEnv = runtime + "=" + env
-		logo.Infoln("Env String: ", rEnv)
-
-		// remove os environ ld
-		old := os.Getenv(runtime)
-		os.Unsetenv(runtime)
-		cmd.Env = os.Environ()
-		cmd.Env = append(cmd.Env, rEnv)
-		os.Setenv(runtime, old)
-	}
-
-	pid := -1
-	cmd.Stdout = os.Stdout
-	cmd.Stderr = os.Stderr
-	cmd.SysProcAttr = &syscall.SysProcAttr{Pdeathsig: syscall.SIGTERM}
-
-	err := cmd.Start()
-	if err == nil {
-		pid = cmd.Process.Pid
-		procMap[pid] = &procInfo{cmd, env}
-	}
-	return pid, err
-}
-
-// Config conf
+// Config config
 type Config struct {
 	Pid              int
 	Options          int
@@ -88,6 +25,7 @@
 func sigChildHandler(notifications chan os.Signal) {
 	sigs := make(chan os.Signal, 3)
 	signal.Notify(sigs, syscall.SIGCHLD)
+	signal.Ignore(syscall.SIGPIPE)
 
 	for {
 		sig := <-sigs
@@ -106,7 +44,7 @@
 } /*  End of function  sigChildHandler.  */
 
 //  Be a good parent - clean up behind the children.
-func reapChildren(config Config, pidChan chan<- int) {
+func reapChildren(config Config, info chan<- ExitInfo) {
 	notifications := make(chan os.Signal, 1)
 
 	go sigChildHandler(notifications)
@@ -125,10 +63,10 @@
 			 *  Plants vs. Zombies!!
 			 */
 			pid, err := syscall.Wait4(pid, &wstatus, opts, nil)
-			pidChan <- pid
+			info <- ExitInfo{pid, wstatus.ExitStatus()}
 			for syscall.EINTR == err {
 				pid, err = syscall.Wait4(pid, &wstatus, opts, nil)
-				pidChan <- pid
+				info <- ExitInfo{pid, wstatus.ExitStatus()}
 			}
 
 			if syscall.ECHILD == err {
@@ -153,7 +91,7 @@
 //  background inside a goroutine.
 
 // Reap reap
-func Reap(pidChan chan<- int) {
+func Reap(info chan<- ExitInfo) {
 	/*
 	 *  Only reap processes if we are taking over init's duties aka
 	 *  we are running as pid 1 inside a docker container. The default
@@ -163,7 +101,7 @@
 		Pid:              -1,
 		Options:          0,
 		DisablePid1Check: true,
-	}, pidChan)
+	}, info)
 
 } /*  End of [exported] function  Reap.  */
 
@@ -172,7 +110,7 @@
 //  The child processes are reaped in the background inside a goroutine.
 
 // Start start
-func Start(config Config, pidChan chan<- int) {
+func Start(config Config, info chan<- ExitInfo) {
 	/*
 	 *  Start the Reaper with configuration options. This allows you to
 	 *  reap processes even if the current pid isn't running as pid 1.
@@ -194,20 +132,6 @@
 	 *  of 'em all, either way we get to play the grim reaper.
 	 *  You will be missed, Terry Pratchett!! RIP
 	 */
-	go reapChildren(config, pidChan)
+	go reapChildren(config, info)
 
 } /*  End of [exported] function  Start.  */
-
-func waitForRestart(ctxt context.Context, pidChan <-chan int) {
-
-	for {
-		select {
-		case <-ctxt.Done():
-			return
-		case pid := <-pidChan:
-			restartProc(ctxt, pid)
-		default:
-			time.Sleep(3 * time.Second)
-		}
-	}
-}
diff --git a/go.mod b/go.mod
index 7483532..104cc36 100644
--- a/go.mod
+++ b/go.mod
@@ -3,10 +3,7 @@
 go 1.12
 
 require (
-	basic.com/libgowrapper/sdkstruct.git v0.0.0-20191220011601-e0b3d1f0183c
 	basic.com/valib/gogpu.git v0.0.0-20190711044327-62043b070865
-	basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
-	basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
 	github.com/amoghe/distillog v0.0.0-20180726233512-ae382b35b717
 	github.com/natefinch/lumberjack v2.0.0+incompatible
 	github.com/olebedev/config v0.0.0-20190528211619-364964f3a8e4
diff --git a/libcomm/db.go b/libcomm/db.go
deleted file mode 100644
index d9104e9..0000000
--- a/libcomm/db.go
+++ /dev/null
@@ -1,15 +0,0 @@
-package main
-
-import (
-	"basic.com/valib/pubsub.git"
-)
-
-// Init init tcp://192.168.5.22:4005
-func Init(url, heartBeatURL string, mode int, topics []string, processID string) (chan pubsub.Message, error) {
-	p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
-	if err != nil {
-		return nil, err
-	}
-	c := p.Recv()
-	return c, err
-}
diff --git a/libcomm/fetcher.go b/libcomm/fetcher.go
new file mode 100644
index 0000000..a0c0079
--- /dev/null
+++ b/libcomm/fetcher.go
@@ -0,0 +1,39 @@
+package main
+
+import (
+	"context"
+	"time"
+
+	"basic.com/valib/pubsub.git"
+)
+
+func wait(ctx context.Context, c chan pubsub.Message, out chan<- []byte) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case msg := <-c:
+			out <- msg.Msg
+		default:
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+// Fetch Fetch from tcp://192.168.5.22:4005
+func Fetch(ctx context.Context, url, heartBeatURL string, mode int, processID string, out chan<- []byte, fn func(...interface{})) {
+	topics := []string{pubsub.Topic_Sdk}
+	p, err := pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
+	for {
+		if err == nil {
+			break
+		}
+		fn("libcomm.so, pubsub subscribe error: ", err)
+		time.Sleep(time.Second)
+		p, err = pubsub.NewSubscriber(url, heartBeatURL, mode, topics, processID)
+	}
+
+	c := p.Recv()
+
+	go wait(ctx, c, out)
+}
diff --git a/libcomm/go.mod b/libcomm/go.mod
index 324f971..0484bff 100644
--- a/libcomm/go.mod
+++ b/libcomm/go.mod
@@ -6,5 +6,5 @@
 	basic.com/valib/gopherdiscovery.git v0.0.0-20200113080951-9bccb7681924 // indirect
 	basic.com/valib/pubsub.git v0.0.0-20200116061307-c43a8e3e552e
 	golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect
-	nanomsg.org/go-mangos v1.4.0 // indirect
+	nanomsg.org/go-mangos v1.4.0
 )
diff --git a/libcomm/notify.go b/libcomm/notify.go
new file mode 100644
index 0000000..d930c6a
--- /dev/null
+++ b/libcomm/notify.go
@@ -0,0 +1,183 @@
+package main
+
+import (
+	"context"
+	"os"
+	"strings"
+	"time"
+
+	"nanomsg.org/go-mangos"
+	"nanomsg.org/go-mangos/protocol/rep"
+	"nanomsg.org/go-mangos/protocol/req"
+	"nanomsg.org/go-mangos/transport/ipc"
+	"nanomsg.org/go-mangos/transport/tcp"
+)
+
+func request(url string, timeout int, fn func(...interface{})) mangos.Socket {
+
+	var sock mangos.Socket
+	var err error
+
+	for {
+		if sock, err = req.NewSocket(); err != nil {
+			fn("!!!!!!Notify can't get new request socket: ", err)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	sock.AddTransport(ipc.NewTransport())
+	sock.AddTransport(tcp.NewTransport())
+	sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
+	sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
+
+	for {
+		if err = sock.Dial(url); err != nil {
+			fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	return sock
+}
+
+func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) {
+	for {
+		select {
+		case <-ctx.Done():
+			sock.Close()
+			return
+		case data := <-ch:
+			var ret []byte
+			var err error
+
+			err = sock.Send(data)
+			for {
+				if err == nil {
+					break
+				}
+				fn("!!!!!!Notify Send To Slave ERROR: ", err)
+				time.Sleep(500 * time.Millisecond)
+				continue
+			}
+
+			ret, err = sock.Recv()
+			for {
+				if err == nil {
+					fn("~~~~~Notify Recv From Slave: ", string(ret))
+					break
+				}
+				fn("!!!!!!Notify Recv From Slave Error: ", err)
+				time.Sleep(500 * time.Microsecond)
+				continue
+			}
+
+		default:
+			time.Sleep(time.Second)
+		}
+	}
+}
+
+// Notify master sync notify to slave
+func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc {
+	rctx, cancel := context.WithCancel(ctx)
+
+	sock := request(url, 2, fn)
+
+	go notify(rctx, sock, ch, fn)
+	return cancel
+}
+
+//////////////////////////////////////////////////////////////////
+
+func rmExistedIpcName(url string) {
+	s := strings.Split(url, "://")
+
+	if s[0] == "ipc" {
+		if _, err := os.Stat(s[1]); err == nil {
+			os.Remove(s[1])
+		} else if !os.IsNotExist(err) {
+			os.Remove(s[1])
+		}
+	}
+}
+
+func reply(url string, timeout int, fn func(...interface{})) mangos.Socket {
+	rmExistedIpcName(url)
+
+	var sock mangos.Socket
+	var err error
+
+	for {
+		if sock, err = rep.NewSocket(); err != nil {
+			rmExistedIpcName(url)
+			fn("!!!!!!Notify can't get new reply socket: ", err)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	sock.AddTransport(ipc.NewTransport())
+	sock.AddTransport(tcp.NewTransport())
+	sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
+	sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
+
+	for {
+		if err = sock.Listen(url); err != nil {
+			rmExistedIpcName(url)
+
+			fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url)
+			time.Sleep(time.Second)
+		} else {
+			break
+		}
+	}
+
+	return sock
+}
+
+func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) {
+	for {
+		select {
+		case <-ctx.Done():
+			sock.Close()
+			return
+		default:
+
+			msg, err := sock.Recv()
+			for {
+				if err == nil {
+					fn("~~~~~Notifiee Recv From Master: ", string(msg))
+					break
+				}
+				fn("!!!!!!Notify Recv From Master Error: ", err)
+				time.Sleep(500 * time.Microsecond)
+				continue
+			}
+
+			err = sock.Send([]byte("ok"))
+			for {
+				if err == nil {
+					break
+				}
+				fn("!!!!!!Notify Send To Master ERROR: ", err)
+				time.Sleep(500 * time.Millisecond)
+				continue
+			}
+		}
+	}
+}
+
+// Notifiee slave sync recv notice from master
+func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc {
+	rctx, cancel := context.WithCancel(ctx)
+
+	sock := reply(url, 2, fn)
+
+	go notifiee(rctx, sock, ch, fn)
+	return cancel
+}
diff --git a/util/common.go b/util/common.go
index a4fdf8a..93ada39 100644
--- a/util/common.go
+++ b/util/common.go
@@ -65,7 +65,7 @@
 		}
 	}
 	if file == nil {
-		fmt.Println(`Read All Log Config Files Failed, Use Default, "./log/analysis-[type]"`)
+		fmt.Println(`Read All Log Config Files Failed, If -logit Use Default, "./log/analysis-[type]"`)
 		return
 	}
 	yamlString := string(file)

--
Gitblit v1.8.0