package service
|
|
import (
|
"context"
|
"fmt"
|
"os"
|
"strings"
|
"time"
|
|
"nanomsg.org/go-mangos"
|
"nanomsg.org/go-mangos/protocol/pull"
|
"nanomsg.org/go-mangos/transport/ipc"
|
"nanomsg.org/go-mangos/transport/tcp"
|
)
|
|
// 山东断流监控
|
// rmExistedIpcName removes the filesystem entry named by an "ipc://" URL.
// newPull calls it before creating/listening on a socket, presumably so that a
// leftover socket file from an earlier run does not block the new listener.
//
// URLs that do not start with "ipc://", and "ipc://" URLs with an empty path,
// are ignored. Malformed input such as "ipc" (no "://" separator) is also
// ignored instead of causing an index-out-of-range panic.
func rmExistedIpcName(url string) {
	const ipcScheme = "ipc://"

	if !strings.HasPrefix(url, ipcScheme) {
		return
	}

	// Take everything after the scheme as the path, so a path that itself
	// contains "://" is not truncated.
	path := url[len(ipcScheme):]
	if path == "" {
		return
	}

	// Best effort: a missing file is the normal case, and any other failure
	// will surface as an error from the subsequent Listen call.
	_ = os.Remove(path)
}
|
|
func newPull(ctx context.Context, url string, timeout int) mangos.Socket {
|
rmExistedIpcName(url)
|
|
sock, err := pull.NewSocket()
|
loop1:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
if err == nil {
|
break loop1
|
}
|
time.Sleep(time.Second)
|
rmExistedIpcName(url)
|
sock, err = pull.NewSocket()
|
fmt.Println("!!!!!!Pull can't get new socket:", err)
|
}
|
}
|
|
sock.AddTransport(ipc.NewTransport())
|
sock.AddTransport(tcp.NewTransport())
|
sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
|
sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
|
|
err = sock.Listen(url)
|
loop2:
|
for {
|
select {
|
case <-ctx.Done():
|
return nil
|
default:
|
if err == nil {
|
break loop2
|
}
|
time.Sleep(time.Second)
|
rmExistedIpcName(url)
|
err = sock.Listen(url)
|
|
fmt.Println("!!!!!!Pull can't listen socket:", err, "URL:", url)
|
}
|
}
|
|
return sock
|
}
|