package main
|
|
import (
|
"context"
|
"os"
|
"strings"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/rep"
|
"nanomsg.org/go-mangos/protocol/req"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
func request(url string, timeout int, fn func(...interface{})) mangos.Socket {
|
|
var sock mangos.Socket
|
var err error
|
|
for {
|
if sock, err = req.NewSocket(); err != nil {
|
fn("!!!!!!Notify can't get new request socket: ", err)
|
time.Sleep(time.Second)
|
} else {
|
break
|
}
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
|
sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
|
|
for {
|
if err = sock.Dial(url); err != nil {
|
fn("!!!!!!Notify can't dial request socket: ", err, "URL:", url)
|
time.Sleep(time.Second)
|
} else {
|
break
|
}
|
}
|
|
return sock
|
}
|
|
func notify(ctx context.Context, sock mangos.Socket, ch <-chan []byte, fn func(...interface{})) {
|
for {
|
select {
|
case <-ctx.Done():
|
sock.Close()
|
return
|
case data := <-ch:
|
var ret []byte
|
var err error
|
|
err = sock.Send(data)
|
for {
|
if err == nil {
|
break
|
}
|
fn("!!!!!!Notify Send To Slave ERROR: ", err)
|
time.Sleep(500 * time.Millisecond)
|
continue
|
}
|
|
ret, err = sock.Recv()
|
for {
|
if err == nil {
|
fn("~~~~~Notify Recv From Slave: ", string(ret))
|
break
|
}
|
fn("!!!!!!Notify Recv From Slave Error: ", err)
|
time.Sleep(500 * time.Microsecond)
|
continue
|
}
|
|
default:
|
time.Sleep(time.Second)
|
}
|
}
|
}
|
|
// Notify master sync notify to slave
|
func Notify(ctx context.Context, url string, ch <-chan []byte, fn func(...interface{})) context.CancelFunc {
|
rctx, cancel := context.WithCancel(ctx)
|
|
sock := request(url, 2, fn)
|
|
go notify(rctx, sock, ch, fn)
|
return cancel
|
}
|
|
//////////////////////////////////////////////////////////////////
|
|
// rmExistedIpcName removes the filesystem entry addressed by an "ipc://" URL.
//
// URLs with any other scheme, with no "://" separator, or with an empty path
// are ignored. Removal is best effort: the function has no way to report a
// failure, and a missing file simply makes os.Remove fail harmlessly.
func rmExistedIpcName(url string) {
	// Limit the split to two parts so a "://" occurring inside the path is
	// kept, and check the length so a URL without a separator cannot cause
	// an out-of-range index.
	parts := strings.SplitN(url, "://", 2)
	if len(parts) != 2 || parts[0] != "ipc" || parts[1] == "" {
		return
	}

	// Error deliberately ignored (best-effort cleanup).
	_ = os.Remove(parts[1])
}
|
|
func reply(url string, timeout int, fn func(...interface{})) mangos.Socket {
|
rmExistedIpcName(url)
|
|
var sock mangos.Socket
|
var err error
|
|
for {
|
if sock, err = rep.NewSocket(); err != nil {
|
rmExistedIpcName(url)
|
fn("!!!!!!Notify can't get new reply socket: ", err)
|
time.Sleep(time.Second)
|
} else {
|
break
|
}
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
|
sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
|
|
for {
|
if err = sock.Listen(url); err != nil {
|
rmExistedIpcName(url)
|
|
fn("!!!!!!Notify can't listen reply socket: ", err, "URL:", url)
|
time.Sleep(time.Second)
|
} else {
|
break
|
}
|
}
|
|
return sock
|
}
|
|
func notifiee(ctx context.Context, sock mangos.Socket, ch chan<- []byte, fn func(...interface{})) {
|
for {
|
select {
|
case <-ctx.Done():
|
sock.Close()
|
return
|
default:
|
|
msg, err := sock.Recv()
|
for {
|
if err == nil {
|
fn("~~~~~Notifiee Recv From Master: ", string(msg))
|
break
|
}
|
fn("!!!!!!Notify Recv From Master Error: ", err)
|
time.Sleep(500 * time.Microsecond)
|
continue
|
}
|
|
err = sock.Send([]byte("ok"))
|
for {
|
if err == nil {
|
break
|
}
|
fn("!!!!!!Notify Send To Master ERROR: ", err)
|
time.Sleep(500 * time.Millisecond)
|
continue
|
}
|
}
|
}
|
}
|
|
// Notifiee slave sync recv notice from master
|
func Notifiee(ctx context.Context, url string, ch chan<- []byte, fn func(...interface{})) context.CancelFunc {
|
rctx, cancel := context.WithCancel(ctx)
|
|
sock := reply(url, 2, fn)
|
|
go notifiee(rctx, sock, ch, fn)
|
return cancel
|
}
|