diff --git a/CHANGELOG.md b/CHANGELOG.md index e02f3b0ecca..2abfd8f564e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewExporter` from `exporters/otlp` now takes a `ProtocolDriver` as a parameter. (#1369) - Many OTLP Exporter options became gRPC ProtocolDriver options. (#1369) - Unify endpoint API that related to OTel exporter. (#1401) +- Optimize metric histogram aggregator to re-use its slice of buckets. (#1435) - Metric aggregator Count() and histogram Bucket.Counts are consistently `uint64`. (1430) - `SamplingResult` now passed a `Tracestate` from the parent `SpanContext` (#1432) - Moved gRPC driver for OTLP exporter to `exporters/otlp/otlpgrpc`. (#1420) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index c2a4c40032f..b83af270941 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -38,7 +38,7 @@ type ( lock sync.Mutex boundaries []float64 kind number.Kind - state state + state *state } // state represents the state of a histogram, consisting of @@ -78,8 +78,8 @@ func New(cnt int, desc *metric.Descriptor, boundaries []float64) []Aggregator { aggs[i] = Aggregator{ kind: desc.NumberKind(), boundaries: sortedBoundaries, - state: emptyState(sortedBoundaries), } + aggs[i].state = aggs[i].newState() } return aggs } @@ -123,20 +123,42 @@ func (c *Aggregator) SynchronizedMove(oa export.Aggregator, desc *metric.Descrip return aggregator.NewInconsistentAggregatorError(c, oa) } + if o != nil { + // Swap case: This is the ordinary case for a + // synchronous instrument, where the SDK allocates two + // Aggregators and lock contention is anticipated. + // Reset the target state before swapping it under the + // lock below. + o.clearState() + } + c.lock.Lock() if o != nil { - o.state = c.state + c.state, o.state = o.state, c.state + } else { + // No swap case: This is the ordinary case for an + // asynchronous instrument, where the SDK allocates a + // single Aggregator and there is no anticipated lock + // contention. + c.clearState() } - c.state = emptyState(c.boundaries) c.lock.Unlock() return nil } -func emptyState(boundaries []float64) state { - return state{ - bucketCounts: make([]uint64, len(boundaries)+1), +func (c *Aggregator) newState() *state { + return &state{ + bucketCounts: make([]uint64, len(c.boundaries)+1), + } +} + +func (c *Aggregator) clearState() { + for i := range c.state.bucketCounts { + c.state.bucketCounts[i] = 0 } + c.state.sum = 0 + c.state.count = 0 } // Update adds the recorded measurement to the current data set. diff --git a/sdk/metric/aggregator/histogram/histogram_test.go b/sdk/metric/aggregator/histogram/histogram_test.go index ed0bf3fdd5f..4e381659d44 100644 --- a/sdk/metric/aggregator/histogram/histogram_test.go +++ b/sdk/metric/aggregator/histogram/histogram_test.go @@ -115,42 +115,23 @@ func testHistogram(t *testing.T, profile aggregatortest.Profile, policy policy) agg, ckpt := new2(descriptor) - all := aggregatortest.NewNumbers(profile.NumberKind) - - for i := 0; i < count; i++ { - x := profile.Random(policy.sign()) - all.Append(x) - aggregatortest.CheckedUpdate(t, agg, x, descriptor) - } - - require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - - checkZero(t, agg, descriptor) - - all.Sort() - - asum, err := ckpt.Sum() - sum := all.Sum() - require.InEpsilon(t, - sum.CoerceToFloat64(profile.NumberKind), - asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - "+policy.name) - require.NoError(t, err) + // This needs to repeat at least 3 times to uncover a failure to reset + // for the overall sum and count fields, since the third time through + // is the first time a `histogram.state` object is reused. + for repeat := 0; repeat < 3; repeat++ { + all := aggregatortest.NewNumbers(profile.NumberKind) - count, err := ckpt.Count() - require.Equal(t, all.Count(), count, "Same count -"+policy.name) - require.NoError(t, err) + for i := 0; i < count; i++ { + x := profile.Random(policy.sign()) + all.Append(x) + aggregatortest.CheckedUpdate(t, agg, x, descriptor) + } - buckets, err := ckpt.Histogram() - require.NoError(t, err) + require.NoError(t, agg.SynchronizedMove(ckpt, descriptor)) - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") + checkZero(t, agg, descriptor) - counts := calcBuckets(all.Points(), profile) - for i, v := range counts { - bCount := uint64(buckets.Counts[i]) - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) + checkHistogram(t, all, profile, ckpt) } } @@ -191,31 +172,7 @@ func TestHistogramMerge(t *testing.T) { aggregatortest.CheckedMerge(t, ckpt1, ckpt2, descriptor) - all.Sort() - - asum, err := ckpt1.Sum() - sum := all.Sum() - require.InEpsilon(t, - sum.CoerceToFloat64(profile.NumberKind), - asum.CoerceToFloat64(profile.NumberKind), - 0.000000001, - "Same sum - absolute") - require.NoError(t, err) - - count, err := ckpt1.Count() - require.Equal(t, all.Count(), count, "Same count - absolute") - require.NoError(t, err) - - buckets, err := ckpt1.Histogram() - require.NoError(t, err) - - require.Equal(t, len(buckets.Counts), len(boundaries)+1, "There should be b + 1 counts, where b is the number of boundaries") - - counts := calcBuckets(all.Points(), profile) - for i, v := range counts { - bCount := uint64(buckets.Counts[i]) - require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) - } + checkHistogram(t, all, profile, ckpt1) }) } @@ -233,22 +190,49 @@ func TestHistogramNotSet(t *testing.T) { }) } -func calcBuckets(points []number.Number, profile aggregatortest.Profile) []uint64 { - sortedBoundaries := make([]float64, len(boundaries)) +// checkHistogram ensures the correct aggregated state between `all` +// (test aggregator) and `agg` (code under test). +func checkHistogram(t *testing.T, all aggregatortest.Numbers, profile aggregatortest.Profile, agg *histogram.Aggregator) { + + all.Sort() + + asum, err := agg.Sum() + require.NoError(t, err) + + sum := all.Sum() + require.InEpsilon(t, + sum.CoerceToFloat64(profile.NumberKind), + asum.CoerceToFloat64(profile.NumberKind), + 0.000000001) + count, err := agg.Count() + require.NoError(t, err) + require.Equal(t, all.Count(), count) + + buckets, err := agg.Histogram() + require.NoError(t, err) + + require.Equal(t, len(buckets.Counts), len(boundaries)+1, + "There should be b + 1 counts, where b is the number of boundaries") + + sortedBoundaries := make([]float64, len(boundaries)) copy(sortedBoundaries, boundaries) sort.Float64s(sortedBoundaries) + require.EqualValues(t, sortedBoundaries, buckets.Boundaries) + counts := make([]uint64, len(sortedBoundaries)+1) idx := 0 - for _, p := range points { + for _, p := range all.Points() { for idx < len(sortedBoundaries) && p.CoerceToFloat64(profile.NumberKind) >= sortedBoundaries[idx] { idx++ } counts[idx]++ } - - return counts + for i, v := range counts { + bCount := uint64(buckets.Counts[i]) + require.Equal(t, v, bCount, "Wrong bucket #%d count: %v != %v", i, counts, buckets.Counts) + } } func TestSynchronizedMoveReset(t *testing.T) {