Skip to content

Commit

Permalink
Make some utils public to ease transition (#1730)
Browse files Browse the repository at this point in the history
* Make some utils public to ease transition

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Add tests

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Make fields public too

Signed-off-by: Yuri Shkuro <ys@uber.com>

* make fmt

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
yurishkuro committed Aug 14, 2019
1 parent 2724a58 commit 75b670f
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 43 deletions.
20 changes: 11 additions & 9 deletions plugin/sampling/strategystore/adaptive/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,26 @@

package adaptive

// samplingCacheEntry keeps track of the probability and whether a service-operation is observed
// SamplingCacheEntry keeps track of the probability and whether a service-operation is observed
// using adaptive sampling.
type samplingCacheEntry struct {
probability float64
usingAdaptive bool
type SamplingCacheEntry struct {
Probability float64
UsingAdaptive bool
}

// nested map: service -> operation -> cache entry.
type samplingCache map[string]map[string]*samplingCacheEntry
// SamplingCache is a nested map: service -> operation -> cache entry.
type SamplingCache map[string]map[string]*SamplingCacheEntry

func (s samplingCache) Set(service, operation string, entry *samplingCacheEntry) {
// Set adds a new entry for given service/operation.
func (s SamplingCache) Set(service, operation string, entry *SamplingCacheEntry) {
if _, ok := s[service]; !ok {
s[service] = make(map[string]*samplingCacheEntry)
s[service] = make(map[string]*SamplingCacheEntry)
}
s[service][operation] = entry
}

func (s samplingCache) Get(service, operation string) *samplingCacheEntry {
// Get retrieves the entry for given service/operation. Returns nil if not found.
func (s SamplingCache) Get(service, operation string) *SamplingCacheEntry {
v, ok := s[service]
if !ok {
return nil
Expand Down
4 changes: 2 additions & 2 deletions plugin/sampling/strategystore/adaptive/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (

func TestSamplingCache(t *testing.T) {
var (
c = samplingCache{}
c = SamplingCache{}
service = "svc"
operation = "op"
)
c.Set(service, operation, &samplingCacheEntry{})
c.Set(service, operation, &SamplingCacheEntry{})
assert.NotNil(t, c.Get(service, operation))
assert.Nil(t, c.Get("blah", "blah"))
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@ import (
"strconv"
)

func truncateFloat(v float64) string {
// TruncateFloat truncates float to 6 decimal positions and converts to string.
func TruncateFloat(v float64) string {
return strconv.FormatFloat(v, 'f', 6, 64)
}

func floatEquals(a, b float64) bool {
// FloatEquals compares two floats with 10 decimal positions precision.
func FloatEquals(a, b float64) bool {
return math.Abs(a-b) < 1e-10
}
52 changes: 52 additions & 0 deletions plugin/sampling/strategystore/adaptive/floatutils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adaptive

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestTruncateFloat(t *testing.T) {
tests := []struct {
prob float64
expected string
}{
{prob: 1, expected: "1.000000"},
{prob: 0.00001, expected: "0.000010"},
{prob: 0.00230234, expected: "0.002302"},
{prob: 0.1040445000, expected: "0.104044"},
{prob: 0.10404450002098709, expected: "0.104045"},
}
for _, test := range tests {
assert.Equal(t, test.expected, TruncateFloat(test.prob))
}
}

func TestFloatEquals(t *testing.T) {
tests := []struct {
f1 float64
f2 float64
equal bool
}{
{f1: 0.123456789123, f2: 0.123456789123, equal: true},
{f1: 0.123456789123, f2: 0.123456789111, equal: true},
{f1: 0.123456780000, f2: 0.123456781111, equal: false},
}
for _, test := range tests {
assert.Equal(t, test.equal, FloatEquals(test.f1, test.f2))
}
}
26 changes: 13 additions & 13 deletions plugin/sampling/strategystore/adaptive/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type processor struct {
// TODO change this to work with protobuf model instead, to support gRPC endpoint.
strategyResponses map[string]*sampling.SamplingStrategyResponse

weightVectorCache *weightVectorCache
weightVectorCache *WeightVectorCache

probabilityCalculator calculationstrategy.ProbabilityCalculator

Expand All @@ -107,7 +107,7 @@ type processor struct {
// cache.
followerRefreshInterval time.Duration

serviceCache []samplingCache
serviceCache []SamplingCache

shutdown chan struct{}

Expand Down Expand Up @@ -141,10 +141,10 @@ func NewProcessor(
logger: logger,
electionParticipant: electionParticipant,
// TODO make weightsCache and probabilityCalculator configurable
weightVectorCache: newWeightVectorCache(),
weightVectorCache: NewWeightVectorCache(),
probabilityCalculator: calculationstrategy.NewPercentageIncreaseCappedCalculator(1.0),
followerRefreshInterval: defaultFollowerProbabilityInterval,
serviceCache: []samplingCache{},
serviceCache: []SamplingCache{},
operationsCalculatedGauge: metricsFactory.Gauge(metrics.Options{Name: "operations_calculated"}),
calculateProbabilitiesLatency: metricsFactory.Timer(metrics.TimerOptions{Name: "calculate_probabilities"}),
}, nil
Expand Down Expand Up @@ -366,7 +366,7 @@ func (p *processor) calculateWeightedQPS(allQPS []float64) float64 {
if len(allQPS) == 0 {
return 0
}
weights := p.weightVectorCache.getWeights(len(allQPS))
weights := p.weightVectorCache.GetWeights(len(allQPS))
var qps float64
for i := 0; i < len(allQPS); i++ {
qps += allQPS[i] * weights[i]
Expand All @@ -375,7 +375,7 @@ func (p *processor) calculateWeightedQPS(allQPS []float64) float64 {
}

func (p *processor) prependServiceCache() {
p.serviceCache = append([]samplingCache{make(samplingCache)}, p.serviceCache...)
p.serviceCache = append([]SamplingCache{make(SamplingCache)}, p.serviceCache...)
if len(p.serviceCache) > serviceCacheSize {
p.serviceCache = p.serviceCache[0:serviceCacheSize]
}
Expand Down Expand Up @@ -418,9 +418,9 @@ func (p *processor) calculateProbability(service, operation string, qps float64)
p.RUnlock()

usingAdaptiveSampling := p.isUsingAdaptiveSampling(oldProbability, service, operation, latestThroughput)
p.serviceCache[0].Set(service, operation, &samplingCacheEntry{
probability: oldProbability,
usingAdaptive: usingAdaptiveSampling,
p.serviceCache[0].Set(service, operation, &SamplingCacheEntry{
Probability: oldProbability,
UsingAdaptive: usingAdaptiveSampling,
})

// Short circuit if the qps is close enough to targetQPS or if the service doesn't appear to be using
Expand All @@ -429,7 +429,7 @@ func (p *processor) calculateProbability(service, operation string, qps float64)
return oldProbability
}
var newProbability float64
if floatEquals(qps, 0) {
if FloatEquals(qps, 0) {
// Edge case; we double the sampling probability if the QPS is 0 so that we force the service
// to at least sample one span probabilistically.
newProbability = oldProbability * 2.0
Expand Down Expand Up @@ -458,14 +458,14 @@ func (p *processor) isUsingAdaptiveSampling(
operation string,
throughput serviceOperationThroughput,
) bool {
if floatEquals(probability, p.InitialSamplingProbability) {
if FloatEquals(probability, p.InitialSamplingProbability) {
// If the service is seen for the first time, assume it's using adaptive sampling (ie prob == initialProb).
// Even if this isn't the case, the next time around this loop, the newly calculated probability will not equal
// the initialProb so the logic will fall through.
return true
}
if opThroughput, ok := throughput.get(service, operation); ok {
f := truncateFloat(probability)
f := TruncateFloat(probability)
_, ok := opThroughput.Probabilities[f]
return ok
}
Expand All @@ -474,7 +474,7 @@ func (p *processor) isUsingAdaptiveSampling(
// before.
if len(p.serviceCache) > 1 {
if e := p.serviceCache[1].Get(service, operation); e != nil {
return e.usingAdaptive && !floatEquals(e.probability, p.InitialSamplingProbability)
return e.UsingAdaptive && !FloatEquals(e.Probability, p.InitialSamplingProbability)
}
}
return false
Expand Down
10 changes: 5 additions & 5 deletions plugin/sampling/strategystore/adaptive/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func TestGenerateOperationQPS_UseMostRecentBucketOnly(t *testing.T) {
}

func TestCalculateWeightedQPS(t *testing.T) {
p := processor{weightVectorCache: newWeightVectorCache()}
p := processor{weightVectorCache: NewWeightVectorCache()}
assert.InDelta(t, 0.86735, p.calculateWeightedQPS([]float64{0.8, 1.2, 1.0}), 0.001)
assert.InDelta(t, 0.95197, p.calculateWeightedQPS([]float64{1.0, 1.0, 0.0, 0.0}), 0.001)
assert.Equal(t, 0.0, p.calculateWeightedQPS([]float64{}))
Expand Down Expand Up @@ -260,7 +260,7 @@ func TestCalculateProbability(t *testing.T) {
probabilities: probabilities,
probabilityCalculator: testCalculator,
throughputs: throughputs,
serviceCache: []samplingCache{{"svcA": {}, "svcB": {}}},
serviceCache: []SamplingCache{{"svcA": {}, "svcB": {}}},
}
tests := []struct {
service string
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestCalculateProbabilitiesAndQPS(t *testing.T) {
BucketsForCalculation: 10,
},
throughputs: testThroughputBuckets, probabilities: prevProbabilities, qps: qps,
weightVectorCache: newWeightVectorCache(), probabilityCalculator: testCalculator,
weightVectorCache: NewWeightVectorCache(), probabilityCalculator: testCalculator,
operationsCalculatedGauge: mets.Gauge(metrics.Options{Name: "test"}),
}
probabilities, qps := p.calculateProbabilitiesAndQPS()
Expand Down Expand Up @@ -630,9 +630,9 @@ func TestCalculateProbabilitiesAndQPSMultiple(t *testing.T) {
AggregationBuckets: 10,
},
throughputs: buckets, probabilities: make(model.ServiceOperationProbabilities),
qps: make(model.ServiceOperationQPS), weightVectorCache: newWeightVectorCache(),
qps: make(model.ServiceOperationQPS), weightVectorCache: NewWeightVectorCache(),
probabilityCalculator: calculationstrategy.NewPercentageIncreaseCappedCalculator(1.0),
serviceCache: []samplingCache{},
serviceCache: []SamplingCache{},
operationsCalculatedGauge: metrics.NullFactory.Gauge(metrics.Options{}),
}

Expand Down
16 changes: 8 additions & 8 deletions plugin/sampling/strategystore/adaptive/weightvectorcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,24 +19,24 @@ import (
"sync"
)

// weightVectorCache stores normalizing weights of different lengths. The head of the weights slice
// contains the largest weight.
type weightVectorCache struct {
// WeightVectorCache stores normalizing weight vectors of different lengths.
// The head of each weight vector contains the largest weight.
type WeightVectorCache struct {
sync.Mutex

cache map[int][]float64
}

// newweightVectorCache returns a new weights vector cache.
func newWeightVectorCache() *weightVectorCache {
// NewWeightVectorCache returns a new weights vector cache.
func NewWeightVectorCache() *WeightVectorCache {
// TODO allow users to plugin different weighting algorithms
return &weightVectorCache{
return &WeightVectorCache{
cache: make(map[int][]float64),
}
}

// getWeights returns weights for the specified length { w(i) = i ^ 4, i=1..L }, normalized.
func (c *weightVectorCache) getWeights(length int) []float64 {
// GetWeights returns weights for the specified length { w(i) = i ^ 4, i=1..L }, normalized.
func (c *WeightVectorCache) GetWeights(length int) []float64 {
c.Lock()
defer c.Unlock()
if weights, ok := c.cache[length]; ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import (
)

func TestGetWeights(t *testing.T) {
c := newWeightVectorCache()
c := NewWeightVectorCache()

weights := c.getWeights(1)
weights := c.GetWeights(1)
assert.Len(t, weights, 1)

weights = c.getWeights(3)
weights = c.GetWeights(3)
assert.Len(t, weights, 3)
assert.InDelta(t, 0.8265306122448979, weights[0], 0.001)

weights = c.getWeights(5)
weights = c.GetWeights(5)
assert.Len(t, weights, 5)
assert.InDelta(t, 0.6384, weights[0], 0.001)
assert.InDelta(t, 0.0010, weights[4], 0.001)
Expand Down

0 comments on commit 75b670f

Please sign in to comment.