Skip to content

Commit

Permalink
prometheus exporter: use explicit histograms (#777)
Browse files Browse the repository at this point in the history
The current prometheus exporter can't export histograms. This is because
the current converter passes the prometheus exporter _exponential
histograms_, which it doesn't know how to accept. This PR teaches the
converter to produce _explicit histograms_ instead, which the prometheus
exporter does know how to accept.

Testing:
* a simple integration test to makes sure the `/metrics` endpoint can
produce is exposing histogram points
* fairly involved unit tests of the converter from lightstep histogram
to OTel explicit histogram

R: @jmacd

---------

Co-authored-by: Isaac Zinda <isaac@lightstep.com>
Co-authored-by: Joshua MacDonald <josh.macdonald@gmail.com>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent 8c0a5bf commit ec020ef
Show file tree
Hide file tree
Showing 5 changed files with 341 additions and 15 deletions.
1 change: 1 addition & 0 deletions lightstep/sdk/metric/exporters/otlp/otelcol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func (c *client) ExportMetrics(ctx context.Context, data data.Metrics) error {
c.counter,
&c.ResourceMap,
c.exporter,
true, // use exponential histograms
)
}

Expand Down
11 changes: 10 additions & 1 deletion lightstep/sdk/metric/exporters/prom/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,16 @@ func (c *client) String() string {

// ExportMetrics implements PushExporter.
func (c *client) ExportMetrics(ctx context.Context, data data.Metrics) error {
return export.ExportMetrics(ctx, data, c.tracer, c.counter, &c.ResourceMap, c.exporter)
return export.ExportMetrics(
ctx,
data,
c.tracer,
c.counter,
&c.ResourceMap,
c.exporter,
// don't use exponential histograms, since the prometheus exporter doesn't support them
false,
)
}

// ShutdownMetrics implements PushExporter.
Expand Down
27 changes: 22 additions & 5 deletions lightstep/sdk/metric/exporters/prom/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/sdk/resource"
"io"
Expand Down Expand Up @@ -50,9 +49,9 @@ func TestExporterSuite(t *testing.T) {
suite.Run(t, new(clientTestSuite))
}

func (t *clientTestSuite) SetupTest() {
func (t *clientTestSuite) SetupSuite() {
ctx := context.Background()

exp, err := NewExporter(
ctx,
NewConfig(WithPort(promPort)),
Expand All @@ -65,11 +64,13 @@ func (t *clientTestSuite) SetupTest() {
resource.NewSchemaless(testResourceAttrs...),
),
)
}

otel.SetMeterProvider(t.sdk)
func (t *clientTestSuite) TearDownSuite() {
require.NoError(t.T(), t.sdk.Shutdown(context.Background()))
}

func (t *clientTestSuite) TestExporter() {
func (t *clientTestSuite) TestInt64Counter() {
ctx := context.Background()

meter := t.sdk.Meter("test-meter")
Expand All @@ -85,6 +86,22 @@ func (t *clientTestSuite) TestExporter() {
}, 15*time.Second, time.Second, "verify requests metric")
}

func (t *clientTestSuite) TestInt64Histogram() {
ctx := context.Background()

meter := t.sdk.Meter("test-meter")
counter, err := meter.Int64Histogram("request-size")
require.NoError(t.T(), err)

counter.Record(ctx, 0)

require.Eventuallyf(t.T(), func() bool {
lines := readMetricsEndpoint(t.T())

return slices.Contains(lines, `request_size_bucket{job="tester",property="value",service_name="tester",le="0"} 1`)
}, 15*time.Second, time.Second, "verify request-size metric")
}

func readMetricsEndpoint(t *testing.T) []string {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", promPort))
require.NoError(t, err)
Expand Down
108 changes: 99 additions & 9 deletions lightstep/sdk/metric/internal/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package export
import (
"context"
"errors"

"github.com/lightstep/go-expohisto/mapping/logarithm"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/internal"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/aggregation"
"github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/aggregator/gauge"
Expand Down Expand Up @@ -94,7 +96,86 @@ func copyGaugePoints(m pmetric.Metric, inM data.Instrument) {
}
}

func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
func copyExplicitHistogramPoints(m pmetric.Metric, inM data.Instrument) {
s := m.SetEmptyHistogram()
s.SetAggregationTemporality(toTemporality(inM.Points[0].Temporality))

for _, inP := range inM.Points {
dp := s.DataPoints().AppendEmpty()

dp.SetStartTimestamp(pcommon.NewTimestampFromTime(inP.Start))
dp.SetTimestamp(pcommon.NewTimestampFromTime(inP.End))
internal.CopyAttributes(dp.Attributes(), inP.Attributes)

switch t := inP.Aggregation.(type) {
case *histogram.Int64:
dp.SetSum(t.Sum().CoerceToFloat64(number.Int64Kind))
dp.SetCount(t.Count())

if t.Count() != 0 {
dp.SetMax(t.Max().CoerceToFloat64(number.Int64Kind))
dp.SetMin(t.Min().CoerceToFloat64(number.Int64Kind))
}

copyExplicitHistogramBuckets(t.Positive(), t.ZeroCount(), t.Scale(), dp)

case *histogram.Float64:
dp.SetSum(number.ToFloat64(t.Sum()))
dp.SetCount(t.Count())
if t.Count() != 0 {
dp.SetMax(number.ToFloat64(t.Max()))
dp.SetMin(number.ToFloat64(t.Min()))
}

copyExplicitHistogramBuckets(t.Positive(), t.ZeroCount(), t.Scale(), dp)
default:
panic("unhandled case")
}
}
}

// copyExplicitHistogramBuckets converts a Lightstep exponential histogram to an OTel histogram with
// explicitly defined buckets.
func copyExplicitHistogramBuckets(
sourcePositiveBuckets aggregation.Buckets,
sourceZeroCount uint64,
sourceScale int32,
dest pmetric.HistogramDataPoint,
) {
// add the zero bucket in: (-Inf, 0]
dest.ExplicitBounds().Append(0)
dest.BucketCounts().Append(sourceZeroCount)

if sourcePositiveBuckets.Len() > 0 {
positiveOffset := sourcePositiveBuckets.Offset()
positiveNumElements := int32(sourcePositiveBuckets.Len())
mapping, _ := logarithm.NewMapping(sourceScale)
leftBound, _ := mapping.LowerBoundary(positiveOffset)

for element := int32(0); element < positiveNumElements; element++ {
index := element + positiveOffset

rightBound, _ := mapping.LowerBoundary(index + 1)

// We have to add a bucket to get from zero to the start of the first user-defined bucket.
// This has no count.
if element == 0 {
dest.ExplicitBounds().Append(leftBound)
dest.BucketCounts().Append(0)
}

dest.ExplicitBounds().Append(rightBound)
dest.BucketCounts().Append(sourcePositiveBuckets.At(uint32(element)))

leftBound = rightBound
}
}

// There are no elements in the (..., +Inf] bucket.
dest.BucketCounts().Append(0)
}

func copyExponentialHistogramPoints(m pmetric.Metric, inM data.Instrument) {
s := m.SetEmptyExponentialHistogram()
s.SetAggregationTemporality(toTemporality(inM.Points[0].Temporality))

Expand All @@ -117,10 +198,10 @@ func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
dp.SetMin(t.Min().CoerceToFloat64(number.Int64Kind))
}
if t.Positive().Len() != 0 {
copyHistogramBuckets(dp.Positive(), t.Positive())
copyExponentialHistogramBuckets(dp.Positive(), t.Positive())
}
if t.Negative().Len() != 0 {
copyHistogramBuckets(dp.Negative(), t.Negative())
copyExponentialHistogramBuckets(dp.Negative(), t.Negative())
}
case *histogram.Float64:
dp.SetSum(number.ToFloat64(t.Sum()))
Expand All @@ -132,18 +213,18 @@ func copyHistogramPoints(m pmetric.Metric, inM data.Instrument) {
dp.SetMin(number.ToFloat64(t.Min()))
}
if t.Positive().Len() != 0 {
copyHistogramBuckets(dp.Positive(), t.Positive())
copyExponentialHistogramBuckets(dp.Positive(), t.Positive())
}
if t.Negative().Len() != 0 {
copyHistogramBuckets(dp.Negative(), t.Negative())
copyExponentialHistogramBuckets(dp.Negative(), t.Negative())
}
default:
panic("unhandled case")
}
}
}

func copyHistogramBuckets(dest pmetric.ExponentialHistogramDataPointBuckets, src aggregation.Buckets) {
func copyExponentialHistogramBuckets(dest pmetric.ExponentialHistogramDataPointBuckets, src aggregation.Buckets) {
if src.Len() == 0 {
return
}
Expand Down Expand Up @@ -187,7 +268,11 @@ func copyMMSCPoints(m pmetric.Metric, inM data.Instrument) {
}
}

func d2pd(resourceMap *internal.ResourceMap, in data.Metrics) pmetric.Metrics {
func d2pd(
resourceMap *internal.ResourceMap,
in data.Metrics,
useExponentialHistogram bool,
) pmetric.Metrics {
out := pmetric.NewMetrics()
rm := out.ResourceMetrics().AppendEmpty()

Expand Down Expand Up @@ -217,7 +302,11 @@ func d2pd(resourceMap *internal.ResourceMap, in data.Metrics) pmetric.Metrics {
case *gauge.Int64, *gauge.Float64:
copyGaugePoints(m, inM)
case *histogram.Int64, *histogram.Float64:
copyHistogramPoints(m, inM)
if useExponentialHistogram {
copyExponentialHistogramPoints(m, inM)
} else {
copyExplicitHistogramPoints(m, inM)
}
case *minmaxsumcount.Int64, *minmaxsumcount.Float64:
copyMMSCPoints(m, inM)
}
Expand All @@ -234,14 +323,15 @@ func ExportMetrics(
telemetryItemsCounter metricapi.Int64Counter,
resourceMap *internal.ResourceMap,
exporter exporter.Metrics,
useExponentialHistogram bool,
) error {
ctx, span := tracer.Start(
ctx,
"otelsdk_export_metrics",
)
defer span.End()

converted := d2pd(resourceMap, data)
converted := d2pd(resourceMap, data, useExponentialHistogram)
points := int64(converted.DataPointCount())

err := exporter.ConsumeMetrics(ctx, converted)
Expand Down
Loading

0 comments on commit ec020ef

Please sign in to comment.