package serf import ( "bytes" "time" "github.com/armon/go-metrics" "github.com/hashicorp/go-msgpack/codec" "github.com/hashicorp/memberlist" "basic.com/valib/serf.git/coordinate" ) // pingDelegate is notified when memberlist successfully completes a direct ping // of a peer node. We use this to update our estimated network coordinate, as // well as cache the coordinate of the peer. type pingDelegate struct { serf *Serf } const ( // PingVersion is an internal version for the ping message, above the normal // versioning we get from the protocol version. This enables small updates // to the ping message without a full protocol bump. PingVersion = 1 ) // AckPayload is called to produce a payload to send back in response to a ping // request. func (p *pingDelegate) AckPayload() []byte { var buf bytes.Buffer // The first byte is the version number, forming a simple header. version := []byte{PingVersion} buf.Write(version) // The rest of the message is the serialized coordinate. enc := codec.NewEncoder(&buf, &codec.MsgpackHandle{}) if err := enc.Encode(p.serf.coordClient.GetCoordinate()); err != nil { p.serf.logger.Printf("[ERR] serf: Failed to encode coordinate: %v\n", err) } return buf.Bytes() } // NotifyPingComplete is called when this node successfully completes a direct ping // of a peer node. func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Duration, payload []byte) { if payload == nil || len(payload) == 0 { return } // Verify ping version in the header. version := payload[0] if version != PingVersion { p.serf.logger.Printf("[ERR] serf: Unsupported ping version: %v", version) return } // Process the remainder of the message as a coordinate. r := bytes.NewReader(payload[1:]) dec := codec.NewDecoder(r, &codec.MsgpackHandle{}) var coord coordinate.Coordinate if err := dec.Decode(&coord); err != nil { p.serf.logger.Printf("[ERR] serf: Failed to decode coordinate from ping: %v", err) return } // Apply the update. before := p.serf.coordClient.GetCoordinate() after, err := p.serf.coordClient.Update(other.Name, &coord, rtt) if err != nil { metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1) p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n", other.Name, err) return } // Publish some metrics to give us an idea of how much we are // adjusting each time we update. d := float32(before.DistanceTo(after).Seconds() * 1.0e3) metrics.AddSample([]string{"serf", "coordinate", "adjustment-ms"}, d) // Cache the coordinate for the other node, and add our own // to the cache as well since it just got updated. This lets // users call GetCachedCoordinate with our node name, which is // more friendly. p.serf.coordCacheLock.Lock() p.serf.coordCache[other.Name] = &coord p.serf.coordCache[p.serf.config.NodeName] = p.serf.coordClient.GetCoordinate() p.serf.coordCacheLock.Unlock() }