package agent
|
|
import (
|
"fmt"
|
"io"
|
"log"
|
"net"
|
"time"
|
|
"github.com/hashicorp/mdns"
|
)
|
|
const (
|
mdnsPollInterval = 60 * time.Second
|
mdnsQuietInterval = 100 * time.Millisecond
|
)
|
|
// AgentMDNS is used to advertise ourself using mDNS and to
|
// attempt to join peers periodically using mDNS queries.
|
type AgentMDNS struct {
|
agent *Agent
|
discover string
|
logger *log.Logger
|
seen map[string]struct{}
|
server *mdns.Server
|
replay bool
|
iface *net.Interface
|
}
|
|
// NewAgentMDNS is used to create a new AgentMDNS
|
func NewAgentMDNS(agent *Agent, logOutput io.Writer, replay bool,
|
node, discover string, iface *net.Interface, bind net.IP, port int) (*AgentMDNS, error) {
|
// Create the service
|
service, err := mdns.NewMDNSService(
|
node,
|
mdnsName(discover),
|
"",
|
"",
|
port,
|
[]net.IP{bind},
|
[]string{fmt.Sprintf("Serf '%s' cluster", discover)})
|
if err != nil {
|
return nil, err
|
}
|
|
// Configure mdns server
|
conf := &mdns.Config{
|
Zone: service,
|
Iface: iface,
|
}
|
|
// Create the server
|
server, err := mdns.NewServer(conf)
|
if err != nil {
|
return nil, err
|
}
|
|
// Initialize the AgentMDNS
|
m := &AgentMDNS{
|
agent: agent,
|
discover: discover,
|
logger: log.New(logOutput, "", log.LstdFlags),
|
seen: make(map[string]struct{}),
|
server: server,
|
replay: replay,
|
iface: iface,
|
}
|
|
// Start the background workers
|
go m.run()
|
return m, nil
|
}
|
|
// run is a long running goroutine that scans for new hosts periodically
|
func (m *AgentMDNS) run() {
|
hosts := make(chan *mdns.ServiceEntry, 32)
|
poll := time.After(0)
|
var quiet <-chan time.Time
|
var join []string
|
|
for {
|
select {
|
case h := <-hosts:
|
// Format the host address
|
addr := net.TCPAddr{IP: h.Addr, Port: h.Port}
|
addrS := addr.String()
|
|
// Skip if we've handled this host already
|
if _, ok := m.seen[addrS]; ok {
|
continue
|
}
|
|
// Queue for handling
|
join = append(join, addrS)
|
quiet = time.After(mdnsQuietInterval)
|
|
case <-quiet:
|
// Attempt the join
|
n, err := m.agent.Join(join, m.replay)
|
if err != nil {
|
m.logger.Printf("[ERR] agent.mdns: Failed to join: %v", err)
|
}
|
if n > 0 {
|
m.logger.Printf("[INFO] agent.mdns: Joined %d hosts", n)
|
}
|
|
// Mark all as seen
|
for _, n := range join {
|
m.seen[n] = struct{}{}
|
}
|
join = nil
|
|
case <-poll:
|
poll = time.After(mdnsPollInterval)
|
go m.poll(hosts)
|
}
|
}
|
}
|
|
// poll is invoked periodically to check for new hosts
|
func (m *AgentMDNS) poll(hosts chan *mdns.ServiceEntry) {
|
params := mdns.QueryParam{
|
Service: mdnsName(m.discover),
|
Interface: m.iface,
|
Entries: hosts,
|
}
|
if err := mdns.Query(¶ms); err != nil {
|
m.logger.Printf("[ERR] agent.mdns: Failed to poll for new hosts: %v", err)
|
}
|
}
|
|
// mdnsName returns the service name to register and to lookup
|
func mdnsName(discover string) string {
|
return fmt.Sprintf("_serf_%s._tcp", discover)
|
}
|