package service import ( "context" "fmt" "os" "strings" "time" "nanomsg.org/go-mangos" "nanomsg.org/go-mangos/protocol/pull" "nanomsg.org/go-mangos/transport/ipc" "nanomsg.org/go-mangos/transport/tcp" ) // 山东断流监控 func rmExistedIpcName(url string) { s := strings.Split(url, "://") if s[0] == "ipc" { if _, err := os.Stat(s[1]); err == nil { os.Remove(s[1]) } else if !os.IsNotExist(err) { os.Remove(s[1]) } } } func newPull(ctx context.Context, url string, timeout int) mangos.Socket { rmExistedIpcName(url) sock, err := pull.NewSocket() loop1: for { select { case <-ctx.Done(): return nil default: if err == nil { break loop1 } time.Sleep(time.Second) rmExistedIpcName(url) sock, err = pull.NewSocket() fmt.Println("!!!!!!Pull can't get new socket:", err) } } sock.AddTransport(ipc.NewTransport()) sock.AddTransport(tcp.NewTransport()) sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second) sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second) err = sock.Listen(url) loop2: for { select { case <-ctx.Done(): return nil default: if err == nil { break loop2 } time.Sleep(time.Second) rmExistedIpcName(url) err = sock.Listen(url) fmt.Println("!!!!!!Pull can't listen socket:", err, "URL:", url) } } return sock }