longganhua
2019-07-18 2fcec5d0debb4819c651e8f3b1287f18de9efee9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package agent
 
import (
    "fmt"
    "io"
    "log"
    "net"
    "time"
 
    "github.com/hashicorp/mdns"
)
 
const (
    mdnsPollInterval  = 60 * time.Second
    mdnsQuietInterval = 100 * time.Millisecond
)
 
// AgentMDNS is used to advertise ourself using mDNS and to
// attempt to join peers periodically using mDNS queries.
type AgentMDNS struct {
    agent    *Agent
    discover string
    logger   *log.Logger
    seen     map[string]struct{}
    server   *mdns.Server
    replay   bool
    iface    *net.Interface
}
 
// NewAgentMDNS is used to create a new AgentMDNS
func NewAgentMDNS(agent *Agent, logOutput io.Writer, replay bool,
    node, discover string, iface *net.Interface, bind net.IP, port int) (*AgentMDNS, error) {
    // Create the service
    service, err := mdns.NewMDNSService(
        node,
        mdnsName(discover),
        "",
        "",
        port,
        []net.IP{bind},
        []string{fmt.Sprintf("Serf '%s' cluster", discover)})
    if err != nil {
        return nil, err
    }
 
    // Configure mdns server
    conf := &mdns.Config{
        Zone:  service,
        Iface: iface,
    }
 
    // Create the server
    server, err := mdns.NewServer(conf)
    if err != nil {
        return nil, err
    }
 
    // Initialize the AgentMDNS
    m := &AgentMDNS{
        agent:    agent,
        discover: discover,
        logger:   log.New(logOutput, "", log.LstdFlags),
        seen:     make(map[string]struct{}),
        server:   server,
        replay:   replay,
        iface:    iface,
    }
 
    // Start the background workers
    go m.run()
    return m, nil
}
 
// run is a long running goroutine that scans for new hosts periodically
func (m *AgentMDNS) run() {
    hosts := make(chan *mdns.ServiceEntry, 32)
    poll := time.After(0)
    var quiet <-chan time.Time
    var join []string
 
    for {
        select {
        case h := <-hosts:
            // Format the host address
            addr := net.TCPAddr{IP: h.Addr, Port: h.Port}
            addrS := addr.String()
 
            // Skip if we've handled this host already
            if _, ok := m.seen[addrS]; ok {
                continue
            }
 
            // Queue for handling
            join = append(join, addrS)
            quiet = time.After(mdnsQuietInterval)
 
        case <-quiet:
            // Attempt the join
            n, err := m.agent.Join(join, m.replay)
            if err != nil {
                m.logger.Printf("[ERR] agent.mdns: Failed to join: %v", err)
            }
            if n > 0 {
                m.logger.Printf("[INFO] agent.mdns: Joined %d hosts", n)
            }
 
            // Mark all as seen
            for _, n := range join {
                m.seen[n] = struct{}{}
            }
            join = nil
 
        case <-poll:
            poll = time.After(mdnsPollInterval)
            go m.poll(hosts)
        }
    }
}
 
// poll is invoked periodically to check for new hosts
func (m *AgentMDNS) poll(hosts chan *mdns.ServiceEntry) {
    params := mdns.QueryParam{
        Service:   mdnsName(m.discover),
        Interface: m.iface,
        Entries:   hosts,
    }
    if err := mdns.Query(&params); err != nil {
        m.logger.Printf("[ERR] agent.mdns: Failed to poll for new hosts: %v", err)
    }
}
 
// mdnsName returns the service name to register and to lookup
func mdnsName(discover string) string {
    return fmt.Sprintf("_serf_%s._tcp", discover)
}