qixiaoning
2025-07-08 84d2ef9760af0a4a4aa933937294400b3caa291d
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package service
 
import (
    "context"
    "fmt"
    "os"
    "strings"
    "time"
 
    "nanomsg.org/go-mangos"
    "nanomsg.org/go-mangos/protocol/pull"
    "nanomsg.org/go-mangos/transport/ipc"
    "nanomsg.org/go-mangos/transport/tcp"
)
 
// 山东断流监控
func rmExistedIpcName(url string) {
    s := strings.Split(url, "://")
 
    if s[0] == "ipc" {
        if _, err := os.Stat(s[1]); err == nil {
            os.Remove(s[1])
        } else if !os.IsNotExist(err) {
            os.Remove(s[1])
        }
    }
}
 
func newPull(ctx context.Context, url string, timeout int) mangos.Socket {
    rmExistedIpcName(url)
 
    sock, err := pull.NewSocket()
loop1:
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
            if err == nil {
                break loop1
            }
            time.Sleep(time.Second)
            rmExistedIpcName(url)
            sock, err = pull.NewSocket()
            fmt.Println("!!!!!!Pull can't get new socket:", err)
        }
    }
 
    sock.AddTransport(ipc.NewTransport())
    sock.AddTransport(tcp.NewTransport())
    sock.SetOption(mangos.OptionRecvDeadline, time.Duration(timeout)*time.Second)
    sock.SetOption(mangos.OptionSendDeadline, time.Duration(timeout)*time.Second)
 
    err = sock.Listen(url)
loop2:
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
            if err == nil {
                break loop2
            }
            time.Sleep(time.Second)
            rmExistedIpcName(url)
            err = sock.Listen(url)
 
            fmt.Println("!!!!!!Pull can't listen socket:", err, "URL:", url)
        }
    }
 
    return sock
}