package coordinate import ( "fmt" "math" "sort" "sync" "time" "github.com/armon/go-metrics" ) // Client manages the estimated network coordinate for a given node, and adjusts // it as the node observes round trip times and estimated coordinates from other // nodes. The core algorithm is based on Vivaldi, see the documentation for Config // for more details. type Client struct { // coord is the current estimate of the client's network coordinate. coord *Coordinate // origin is a coordinate sitting at the origin. origin *Coordinate // config contains the tuning parameters that govern the performance of // the algorithm. config *Config // adjustmentIndex is the current index into the adjustmentSamples slice. adjustmentIndex uint // adjustment is used to store samples for the adjustment calculation. adjustmentSamples []float64 // latencyFilterSamples is used to store the last several RTT samples, // keyed by node name. We will use the config's LatencyFilterSamples // value to determine how many samples we keep, per node. latencyFilterSamples map[string][]float64 // stats is used to record events that occur when updating coordinates. stats ClientStats // mutex enables safe concurrent access to the client. mutex sync.RWMutex } // ClientStats is used to record events that occur when updating coordinates. type ClientStats struct { // Resets is incremented any time we reset our local coordinate because // our calculations have resulted in an invalid state. Resets int } // NewClient creates a new Client and verifies the configuration is valid. func NewClient(config *Config) (*Client, error) { if !(config.Dimensionality > 0) { return nil, fmt.Errorf("dimensionality must be >0") } return &Client{ coord: NewCoordinate(config), origin: NewCoordinate(config), config: config, adjustmentIndex: 0, adjustmentSamples: make([]float64, config.AdjustmentWindowSize), latencyFilterSamples: make(map[string][]float64), }, nil } // GetCoordinate returns a copy of the coordinate for this client. func (c *Client) GetCoordinate() *Coordinate { c.mutex.RLock() defer c.mutex.RUnlock() return c.coord.Clone() } // SetCoordinate forces the client's coordinate to a known state. func (c *Client) SetCoordinate(coord *Coordinate) error { c.mutex.Lock() defer c.mutex.Unlock() if err := c.checkCoordinate(coord); err != nil { return err } c.coord = coord.Clone() return nil } // ForgetNode removes any client state for the given node. func (c *Client) ForgetNode(node string) { c.mutex.Lock() defer c.mutex.Unlock() delete(c.latencyFilterSamples, node) } // Stats returns a copy of stats for the client. func (c *Client) Stats() ClientStats { c.mutex.Lock() defer c.mutex.Unlock() return c.stats } // checkCoordinate returns an error if the coordinate isn't compatible with // this client, or if the coordinate itself isn't valid. This assumes the mutex // has been locked already. func (c *Client) checkCoordinate(coord *Coordinate) error { if !c.coord.IsCompatibleWith(coord) { return fmt.Errorf("dimensions aren't compatible") } if !coord.IsValid() { return fmt.Errorf("coordinate is invalid") } return nil } // latencyFilter applies a simple moving median filter with a new sample for // a node. This assumes that the mutex has been locked already. func (c *Client) latencyFilter(node string, rttSeconds float64) float64 { samples, ok := c.latencyFilterSamples[node] if !ok { samples = make([]float64, 0, c.config.LatencyFilterSize) } // Add the new sample and trim the list, if needed. samples = append(samples, rttSeconds) if len(samples) > int(c.config.LatencyFilterSize) { samples = samples[1:] } c.latencyFilterSamples[node] = samples // Sort a copy of the samples and return the median. sorted := make([]float64, len(samples)) copy(sorted, samples) sort.Float64s(sorted) return sorted[len(sorted)/2] } // updateVivialdi updates the Vivaldi portion of the client's coordinate. This // assumes that the mutex has been locked already. func (c *Client) updateVivaldi(other *Coordinate, rttSeconds float64) { const zeroThreshold = 1.0e-6 dist := c.coord.DistanceTo(other).Seconds() if rttSeconds < zeroThreshold { rttSeconds = zeroThreshold } wrongness := math.Abs(dist-rttSeconds) / rttSeconds totalError := c.coord.Error + other.Error if totalError < zeroThreshold { totalError = zeroThreshold } weight := c.coord.Error / totalError c.coord.Error = c.config.VivaldiCE*weight*wrongness + c.coord.Error*(1.0-c.config.VivaldiCE*weight) if c.coord.Error > c.config.VivaldiErrorMax { c.coord.Error = c.config.VivaldiErrorMax } delta := c.config.VivaldiCC * weight force := delta * (rttSeconds - dist) c.coord = c.coord.ApplyForce(c.config, force, other) } // updateAdjustment updates the adjustment portion of the client's coordinate, if // the feature is enabled. This assumes that the mutex has been locked already. func (c *Client) updateAdjustment(other *Coordinate, rttSeconds float64) { if c.config.AdjustmentWindowSize == 0 { return } // Note that the existing adjustment factors don't figure in to this // calculation so we use the raw distance here. dist := c.coord.rawDistanceTo(other) c.adjustmentSamples[c.adjustmentIndex] = rttSeconds - dist c.adjustmentIndex = (c.adjustmentIndex + 1) % c.config.AdjustmentWindowSize sum := 0.0 for _, sample := range c.adjustmentSamples { sum += sample } c.coord.Adjustment = sum / (2.0 * float64(c.config.AdjustmentWindowSize)) } // updateGravity applies a small amount of gravity to pull coordinates towards // the center of the coordinate system to combat drift. This assumes that the // mutex is locked already. func (c *Client) updateGravity() { dist := c.origin.DistanceTo(c.coord).Seconds() force := -1.0 * math.Pow(dist/c.config.GravityRho, 2.0) c.coord = c.coord.ApplyForce(c.config, force, c.origin) } // Update takes other, a coordinate for another node, and rtt, a round trip // time observation for a ping to that node, and updates the estimated position of // the client's coordinate. Returns the updated coordinate. func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coordinate, error) { c.mutex.Lock() defer c.mutex.Unlock() if err := c.checkCoordinate(other); err != nil { return nil, err } // The code down below can handle zero RTTs, which we have seen in // https://github.com/hashicorp/consul/issues/3789, presumably in // environments with coarse-grained monotonic clocks (we are still // trying to pin this down). In any event, this is ok from a code PoV // so we don't need to alert operators with spammy messages. We did // add a counter so this is still observable, though. const maxRTT = 10 * time.Second if rtt < 0 || rtt > maxRTT { return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT) } if rtt == 0 { metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1) } rttSeconds := c.latencyFilter(node, rtt.Seconds()) c.updateVivaldi(other, rttSeconds) c.updateAdjustment(other, rttSeconds) c.updateGravity() if !c.coord.IsValid() { c.stats.Resets++ c.coord = NewCoordinate(c.config) } return c.coord.Clone(), nil } // DistanceTo returns the estimated RTT from the client's coordinate to other, the // coordinate for another node. func (c *Client) DistanceTo(other *Coordinate) time.Duration { c.mutex.RLock() defer c.mutex.RUnlock() return c.coord.DistanceTo(other) }