package main

import (
	"context"
	"os"
	"strings"
	"time"

	"nanomsg.org/go-mangos"
	"nanomsg.org/go-mangos/protocol/rep"
	"nanomsg.org/go-mangos/protocol/req"
	"nanomsg.org/go-mangos/transport/ipc"
	"nanomsg.org/go-mangos/transport/tcp"
)

// retryDelay is the pause between attempts after a failed Send or Recv.
const retryDelay = 500 * time.Millisecond

// pause waits for d or until ctx is cancelled, whichever comes first.
// It reports true when the full delay elapsed and false when ctx was
// cancelled.
func pause(ctx context.Context, d time.Duration) bool {
	t := time.NewTimer(d)
	defer t.Stop()
	select {
	case <-ctx.Done():
		return false
	case <-t.C:
		return true
	}
}

// configureSocket registers the ipc and tcp transports on sock and applies
// timeout (in seconds) as both the receive and the send deadline. Option
// errors are reported through fn; the socket is still returned to the caller
// in that case.
func configureSocket(sock mangos.Socket, timeout int, fn func(...interface{})) {
	sock.AddTransport(ipc.NewTransport())
	sock.AddTransport(tcp.NewTransport())

	deadline := time.Duration(timeout) * time.Second
	if err := sock.SetOption(mangos.OptionRecvDeadline, deadline); err != nil {
		fn("!!!!!!Notify can't set recv deadline: ", err)
	}
	if err := sock.SetOption(mangos.OptionSendDeadline, deadline); err != nil {
		fn("!!!!!!Notify can't set send deadline: ", err)
	}
}

// sendWithRetry calls sock.Send(data) until it succeeds, reporting every
// failure through fn (prefixed with errMsg) and waiting retryDelay between
// attempts. It returns false if ctx is cancelled before a send succeeds.
func sendWithRetry(ctx context.Context, sock mangos.Socket, data []byte, fn func(...interface{}), errMsg string) bool {
	for {
		err := sock.Send(data)
		if err == nil {
			return true
		}
		fn(errMsg, err)
		if !pause(ctx, retryDelay) {
			return false
		}
	}
}

// recvWithRetry calls sock.Recv() until it succeeds, reporting every failure
// through fn (prefixed with errMsg) and waiting retryDelay between attempts.
// The second result is false if ctx is cancelled before a message arrives.
func recvWithRetry(ctx context.Context, sock mangos.Socket, fn func(...interface{}), errMsg string) ([]byte, bool) {
	for {
		msg, err := sock.Recv()
		if err == nil {
			return msg, true
		}
		fn(errMsg, err)
		if !pause(ctx, retryDelay) {
			return nil, false
		}
	}
}

// request creates a REQ socket, configures it with the ipc and tcp transports
// and a send/receive deadline of timeout seconds, and dials url.
//
// Socket creation and Dial are each retried once per second until they
// succeed, so this call does not return until both have succeeded. Every
// failure is reported through fn.
func request(url string, timeout int, fn func(...interface{})) mangos.Socket {
	var sock mangos.Socket
	var err error
	for {
		if sock, err = req.NewSocket(); err == nil {
			break
		}
		fn("!!!!!!Notify can't get new request socket: ", err)
		time.Sleep(time.Second)
	}

	configureSocket(sock, timeout, fn)

	for {
		if err = sock.Dial(url); err == nil {
			break
		}
		fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url)
		time.Sleep(time.Second)
	}
	return sock
}

// notify is the master-side loop: every payload read from ch is sent on sock
// and the reply is awaited before the next payload is taken.
//
// Failed sends and receives are retried (with retryDelay between attempts)
// until they succeed or ctx is cancelled. The loop ends, and sock is closed,
// when ctx is cancelled or ch is closed.
func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) {
	defer sock.Close()
	for {
		// Block until there is work or we are told to stop; no polling.
		select {
		case <-ctx.Done():
			return
		case data, open := <-ch:
			if !open {
				// A closed channel would otherwise yield nil payloads forever.
				return
			}
			if !sendWithRetry(ctx, sock, data, fn, "!!!!!!Notify Send To Slave ERROR: ") {
				return
			}
			ret, ok := recvWithRetry(ctx, sock, fn, "!!!!!!Notify Recv From Slave Error: ")
			if !ok {
				return
			}
			fn("~~~~~Notify Recv From Slave: ", string(ret))
		}
	}
}

// getIPCURL builds the ipc:// URL of the socket file /tmp/<id>.ipc.
func getIPCURL(id string) string {
	return `ipc:///tmp/` + id + `.ipc`
}

// Notify starts the master side of the notification channel.
//
// It connects a REQ socket to the ipc endpoint derived from url (see
// getIPCURL) and starts a goroutine that forwards every payload received on
// ch to the slave, waiting for the slave's reply after each one. Progress and
// errors are reported through fn. The call blocks until the socket has been
// created and dialed.
//
// The returned CancelFunc stops the goroutine and closes the socket; the
// goroutine also stops when ctx is cancelled or ch is closed.
func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc {
	rctx, cancel := context.WithCancel(ctx)
	ipcURL := getIPCURL(url)
	sock := request(ipcURL, 2, fn)
	go notify(rctx, sock, ch, fn)
	return cancel
}

//////////////////////////////////////////////////////////////////

// rmExistedIpcName removes the socket file named by an ipc:// URL so that a
// later Listen on the same URL does not collide with a stale file. URLs with
// any other scheme, or with an empty path, are ignored.
func rmExistedIpcName(url string) {
	const scheme = "ipc://"
	if !strings.HasPrefix(url, scheme) {
		return
	}
	path := url[len(scheme):]
	if path == "" {
		return
	}
	// Best effort: a missing file is the desired end state, and any other
	// failure will surface as a Listen error in the caller.
	_ = os.Remove(path)
}

// reply creates a REP socket, configures it with the ipc and tcp transports
// and a send/receive deadline of timeout seconds, and listens on url.
//
// Socket creation and Listen are each retried once per second until they
// succeed, so this call does not return until both have succeeded. A stale
// ipc socket file is removed before every Listen attempt. Every failure is
// reported through fn.
func reply(url string, timeout int, fn func(...interface{})) mangos.Socket {
	var sock mangos.Socket
	var err error
	for {
		if sock, err = rep.NewSocket(); err == nil {
			break
		}
		fn("!!!!!!Notify can't get new reply socket: ", err)
		time.Sleep(time.Second)
	}

	configureSocket(sock, timeout, fn)

	for {
		rmExistedIpcName(url)
		if err = sock.Listen(url); err == nil {
			break
		}
		fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url)
		time.Sleep(time.Second)
	}
	return sock
}

// notifiee is the slave-side loop: it receives a message from the master on
// sock, acknowledges it with "ok", and then delivers the message on ch (when
// ch is non-nil).
//
// A receive that merely hits the socket's receive deadline is treated as an
// idle poll and is not reported; any other receive error is reported through
// fn and retried after retryDelay. The loop ends, and sock is closed, when
// ctx is cancelled.
func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) {
	defer sock.Close()
	for ctx.Err() == nil {
		msg, err := sock.Recv()
		if err != nil {
			if err == mangos.ErrRecvTimeout {
				// Nothing arrived within the deadline; re-check ctx and poll again.
				continue
			}
			fn("!!!!!!Notify Recv From Master Error: ", err)
			if !pause(ctx, retryDelay) {
				return
			}
			continue
		}
		fn("~~~~~Notifiee Recv From Master: ", string(msg))

		// Acknowledge first so the master is not held up by a slow consumer.
		if !sendWithRetry(ctx, sock, []byte("ok"), fn, "!!!!!!Notify Send To Master ERROR: ") {
			return
		}

		if ch != nil {
			select {
			case ch <- msg:
			case <-ctx.Done():
				return
			}
		}
	}
}

// Notifiee starts the slave side of the notification channel.
//
// It listens with a REP socket on the ipc endpoint derived from url (see
// getIPCURL) and starts a goroutine that acknowledges every message from the
// master with "ok" and delivers it on ch. Progress and errors are reported
// through fn. The call blocks until the socket has been created and is
// listening.
//
// The returned CancelFunc stops the goroutine and closes the socket; the
// goroutine also stops when ctx is cancelled.
func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc {
	rctx, cancel := context.WithCancel(ctx)
	ipcURL := getIPCURL(url)
	sock := reply(ipcURL, 2, fn)
	go notifiee(rctx, sock, ch, fn)
	return cancel
}