From 8b6f0c6c4841d67e9d82562a6be0f3f3bb5c8844 Mon Sep 17 00:00:00 2001 From: Aaron Clawson <3766680+MadVikingGod@users.noreply.github.com> Date: Wed, 20 Apr 2022 14:04:14 -0500 Subject: [PATCH] Fixes for the otlp metrics exporter (#2835) --- Makefile | 2 +- exporters/otlp/otlpmetric/exporter.go | 33 +- exporters/otlp/otlpmetric/exporter_test.go | 611 ++++++++++-------- exporters/otlp/otlpmetric/go.mod | 19 +- exporters/otlp/otlpmetric/go.sum | 3 - .../internal/metrictransform/metric.go | 445 ++++--------- .../internal/metrictransform/metric_test.go | 427 ++++++------ .../internal/otlpmetrictest/data.go | 68 +- .../internal/otlpmetrictest/otlptest.go | 80 ++- exporters/otlp/otlpmetric/options.go | 8 +- .../otlpmetric/otlpmetricgrpc/client_test.go | 33 +- .../otlpmetric/otlpmetricgrpc/example_test.go | 96 +-- .../otlp/otlpmetric/otlpmetricgrpc/go.mod | 1 - .../otlp/otlpmetric/otlpmetricgrpc/go.sum | 2 - .../otlpmetric/otlpmetrichttp/client_test.go | 17 +- .../otlp/otlpmetric/otlpmetrichttp/go.mod | 1 - .../otlp/otlpmetric/otlpmetrichttp/go.sum | 2 - sdk/metric/aggregator/gauge/gauge.go | 7 + sdk/metric/aggregator/histogram/histogram.go | 32 + sdk/metric/aggregator/sum/sum.go | 14 + 20 files changed, 883 insertions(+), 1018 deletions(-) diff --git a/Makefile b/Makefile index b9c9c67a9b2..8d1a99486dd 100644 --- a/Makefile +++ b/Makefile @@ -14,7 +14,7 @@ TOOLS_MOD_DIR := ./internal/tools SKIP_MODS := ./bridge/opencensus ./bridge/opencensus/test ./example/opencensus -SKIP_MODS += ./exporters/otlp/otlpmetric ./exporters/otlp/otlpmetric/otlpmetricgrpc ./exporters/otlp/otlpmetric/otlpmetrichttp +# SKIP_MODS += ./exporters/otlp/otlpmetric ./exporters/otlp/otlpmetric/otlpmetricgrpc ./exporters/otlp/otlpmetric/otlpmetrichttp # SKIP_MODS += ./exporters/prometheus ./exporters/stdout/stdoutmetric ALL_DOCS := $(shell find . 
-name '*.md' -type f | sort) diff --git a/exporters/otlp/otlpmetric/exporter.go b/exporters/otlp/otlpmetric/exporter.go index caf21eaf2a3..b5b9667b6fc 100644 --- a/exporters/otlp/otlpmetric/exporter.go +++ b/exporters/otlp/otlpmetric/exporter.go @@ -20,10 +20,8 @@ import ( "sync" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/metrictransform" - "go.opentelemetry.io/otel/sdk/metric/export" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" - "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregation" + "go.opentelemetry.io/otel/sdk/metric/reader" ) var ( @@ -32,8 +30,7 @@ var ( // Exporter exports metrics data in the OTLP wire format. type Exporter struct { - client Client - temporalitySelector aggregation.TemporalitySelector + client Client mu sync.RWMutex started bool @@ -42,15 +39,11 @@ type Exporter struct { stopOnce sync.Once } +var _ reader.Exporter = (*Exporter)(nil) + // Export exports a batch of metrics. -func (e *Exporter) Export(ctx context.Context, res *resource.Resource, ilr export.InstrumentationLibraryReader) error { - rm, err := metrictransform.InstrumentationLibraryReader(ctx, e, res, ilr, 1) - if err != nil { - return err - } - if rm == nil { - return nil - } +func (e *Exporter) Export(ctx context.Context, metrics reader.Metrics) error { + rm := metrictransform.TransformMetrics(metrics) // TODO: There is never more than one resource emitted by this // call, as per the specification. 
We can change the @@ -95,12 +88,11 @@ func (e *Exporter) Shutdown(ctx context.Context) error { return err } -func (e *Exporter) TemporalityFor(descriptor *sdkapi.Descriptor, kind aggregation.Kind) aggregation.Temporality { - return e.temporalitySelector.TemporalityFor(descriptor, kind) +func (e *Exporter) Flush(ctx context.Context) error { + // TODO Implement + return nil } -var _ export.Exporter = (*Exporter)(nil) - // New constructs a new Exporter and starts it. func New(ctx context.Context, client Client, opts ...Option) (*Exporter, error) { exp := NewUnstarted(client, opts...) @@ -116,7 +108,7 @@ func NewUnstarted(client Client, opts ...Option) *Exporter { // Note: the default TemporalitySelector is specified // as Cumulative: // https://github.com/open-telemetry/opentelemetry-specification/issues/731 - temporalitySelector: aggregation.CumulativeTemporalitySelector(), + temporality: aggregation.CumulativeTemporality, } for _, opt := range opts { @@ -124,8 +116,7 @@ func NewUnstarted(client Client, opts ...Option) *Exporter { } e := &Exporter{ - client: client, - temporalitySelector: cfg.temporalitySelector, + client: client, } return e diff --git a/exporters/otlp/otlpmetric/exporter_test.go b/exporters/otlp/otlpmetric/exporter_test.go index 242079a20c7..e10feee9b1b 100644 --- a/exporters/otlp/otlpmetric/exporter_test.go +++ b/exporters/otlp/otlpmetric/exporter_test.go @@ -28,17 +28,14 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/metrictransform" - "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregator" + "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregation" "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/metric/export" - 
"go.opentelemetry.io/otel/sdk/metric/export/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metrictest" "go.opentelemetry.io/otel/sdk/metric/number" - "go.opentelemetry.io/otel/sdk/metric/processor/processortest" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" + + "go.opentelemetry.io/otel/sdk/metric/reader" + "go.opentelemetry.io/otel/sdk/metric/sdkinstrument" "go.opentelemetry.io/otel/sdk/resource" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" @@ -88,33 +85,6 @@ func pointTime() uint64 { return uint64(intervalEnd.UnixNano()) } -type testRecord struct { - name string - iKind sdkapi.InstrumentKind - nKind number.Kind - labels []attribute.KeyValue - - meterName string - meterOpts []metric.MeterOption -} - -func record( - name string, - iKind sdkapi.InstrumentKind, - nKind number.Kind, - labels []attribute.KeyValue, - meterName string, - meterOpts ...metric.MeterOption) testRecord { - return testRecord{ - name: name, - iKind: iKind, - nKind: nKind, - labels: labels, - meterName: meterName, - meterOpts: meterOpts, - } -} - var ( baseKeyValues = []attribute.KeyValue{attribute.String("host", "test.com")} cpuKey = attribute.Key("CPU") @@ -162,31 +132,37 @@ var ( testerAResourcePb = metrictransform.Resource(testerAResource) ) -const ( - // Most of this test uses an empty instrumentation library name. 
- testLibName = "" -) - func TestNoGroupingExport(t *testing.T) { runMetricExportTests( t, nil, - resource.Empty(), - []testRecord{ - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(2)), - testLibName, - ), + reader.Metrics{ + Resource: resource.Empty(), + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(2))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, }, []*metricpb.ResourceMetrics{ { @@ -226,13 +202,35 @@ func TestNoGroupingExport(t *testing.T) { } func TestHistogramInt64MetricGroupingExport(t *testing.T) { - r := record( - "int64-histogram", - sdkapi.HistogramInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ) + metrics := reader.Metrics{ + Resource: resource.Empty(), + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-histogram", sdkinstrument.HistogramKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: histogram.NewInt64(testHistogramBoundaries, int64(1), int64(10)), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + 
Aggregation: histogram.NewInt64(testHistogramBoundaries, int64(1), int64(10)), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + sum := 11.0 expected := []*metricpb.ResourceMetrics{ { @@ -273,17 +271,39 @@ func TestHistogramInt64MetricGroupingExport(t *testing.T) { }, }, } - runMetricExportTests(t, nil, resource.Empty(), []testRecord{r, r}, expected) + runMetricExportTests(t, nil, metrics, expected) } func TestHistogramFloat64MetricGroupingExport(t *testing.T) { - r := record( - "float64-histogram", - sdkapi.HistogramInstrumentKind, - number.Float64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ) + + metrics := reader.Metrics{ + Resource: resource.Empty(), + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("float64-histogram", sdkinstrument.HistogramKind, number.Float64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: histogram.NewFloat64(testHistogramBoundaries, 1, 10), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: histogram.NewFloat64(testHistogramBoundaries, 1, 10), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } sum := 11.0 expected := []*metricpb.ResourceMetrics{ { @@ -324,22 +344,44 @@ func TestHistogramFloat64MetricGroupingExport(t *testing.T) { }, }, } - runMetricExportTests(t, nil, resource.Empty(), []testRecord{r, r}, expected) + runMetricExportTests(t, nil, metrics, expected) } func TestCountInt64MetricGroupingExport(t *testing.T) { - r := record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ) + + metrics := reader.Metrics{ + Resource: resource.Empty(), + Scopes: []reader.Scope{ + { + Instruments: 
[]reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + runMetricExportTests( t, nil, - resource.Empty(), - []testRecord{r, r}, + metrics, []*metricpb.ResourceMetrics{ { Resource: nil, @@ -378,18 +420,40 @@ func TestCountInt64MetricGroupingExport(t *testing.T) { } func TestCountFloat64MetricGroupingExport(t *testing.T) { - r := record( - "float64-count", - sdkapi.CounterInstrumentKind, - number.Float64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ) + + metrics := reader.Metrics{ + Resource: resource.Empty(), + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("float64-count", sdkinstrument.CounterKind, number.Float64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewFloat64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewFloat64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + runMetricExportTests( t, nil, - resource.Empty(), - []testRecord{r, r}, + metrics, []*metricpb.ResourceMetrics{ { Resource: nil, @@ -428,40 +492,51 @@ func TestCountFloat64MetricGroupingExport(t *testing.T) { } func TestResourceMetricGroupingExport(t *testing.T) { + metrics := reader.Metrics{ + Resource: 
testerAResource, + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(2))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + runMetricExportTests( t, nil, - testerAResource, - []testRecord{ - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(2)), - testLibName, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - testLibName, - ), - }, + metrics, []*metricpb.ResourceMetrics{ { Resource: testerAResourcePb, @@ -512,57 +587,97 @@ func TestResourceMetricGroupingExport(t *testing.T) { } func TestResourceInstLibMetricGroupingExport(t *testing.T) { - version1 := metric.WithInstrumentationVersion("v1") - version2 := metric.WithInstrumentationVersion("v2") - specialSchema := metric.WithSchemaURL("schurl") - summingLib := 
"summing-lib" - countingLib := "counting-lib" + // version1 := metric.WithInstrumentationVersion("v1") + // version2 := metric.WithInstrumentationVersion("v2") + // specialSchema := metric.WithSchemaURL("schurl") + // summingLib := "summing-lib" + // countingLib := "counting-lib" + //testerAResource, + metrics := reader.Metrics{ + Resource: testerAResource, + Scopes: []reader.Scope{ + { + Library: instrumentation.Library{ + Name: "counting-lib", + Version: "v1", + }, + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(2))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + { + Library: instrumentation.Library{ + Name: "counting-lib", + Version: "v2", + }, + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + { + Library: instrumentation.Library{ + Name: "summing-lib", + SchemaURL: "schurl", + }, + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("int64-count", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Temporality: 
aggregation.CumulativeTemporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: sum.NewInt64Monotonic(11), + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + runMetricExportTests( t, nil, - testerAResource, - []testRecord{ - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - countingLib, - version1, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - countingLib, - version2, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - countingLib, - version1, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(2)), - countingLib, - version1, - ), - record( - "int64-count", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - summingLib, - specialSchema, - ), - }, + metrics, []*metricpb.ResourceMetrics{ { Resource: testerAResourcePb, @@ -663,35 +778,76 @@ func TestResourceInstLibMetricGroupingExport(t *testing.T) { func TestStatelessAggregationTemporality(t *testing.T) { type testcase struct { name string - instrumentKind sdkapi.InstrumentKind + kind sdkinstrument.Kind + temporality aggregation.Temporality + aggregation aggregation.Aggregation aggTemporality metricpb.AggregationTemporality monotonic bool } for _, k := range []testcase{ - {"counter", sdkapi.CounterInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, true}, - {"updowncounter", sdkapi.UpDownCounterInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, false}, - {"counterobserver", sdkapi.CounterObserverInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, true}, - {"updowncounterobserver", 
sdkapi.UpDownCounterObserverInstrumentKind, metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, false}, + { + name: "counter", + kind: sdkinstrument.CounterKind, + temporality: aggregation.DeltaTemporality, + aggregation: sum.NewInt64Monotonic(11), + aggTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + monotonic: true}, + { + name: "updowncounter", + kind: sdkinstrument.UpDownCounterKind, + temporality: aggregation.DeltaTemporality, + aggregation: sum.NewInt64NonMonotonic(11), + aggTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA, + monotonic: false}, + { + name: "counterobserver", + kind: sdkinstrument.CounterObserverKind, + temporality: aggregation.CumulativeTemporality, + aggregation: sum.NewInt64Monotonic(11), + aggTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + monotonic: true}, + { + name: "updowncounterobserver", + kind: sdkinstrument.UpDownCounterObserverKind, + temporality: aggregation.CumulativeTemporality, + aggregation: sum.NewInt64NonMonotonic(11), + aggTemporality: metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE, + monotonic: false}, } { t.Run(k.name, func(t *testing.T) { + + metrics := reader.Metrics{ + Resource: testerAResource, + Scopes: []reader.Scope{ + { + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("instrument", k.kind, number.Int64Kind, "", ""), + Temporality: k.temporality, + + Points: []reader.Point{ + { + Attributes: attribute.NewSet(append(baseKeyValues, cpuKey.Int(1))...), + Aggregation: k.aggregation, + Start: intervalStart, + End: intervalEnd, + }, + }, + }, + }, + }, + }, + } + runMetricExportTests( t, []otlpmetric.Option{ otlpmetric.WithMetricAggregationTemporalitySelector( - aggregation.StatelessTemporalitySelector(), - ), - }, - testerAResource, - []testRecord{ - record( - "instrument", - k.instrumentKind, - number.Int64Kind, - append(baseKeyValues, cpuKey.Int(1)), - 
testLibName, + aggregation.UndefinedTemporality, ), }, + metrics, []*metricpb.ResourceMetrics{ { Resource: testerAResourcePb, @@ -725,60 +881,12 @@ func TestStatelessAggregationTemporality(t *testing.T) { } } -func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource.Resource, records []testRecord, expected []*metricpb.ResourceMetrics) { +func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, metrics reader.Metrics, expected []*metricpb.ResourceMetrics) { + t.Helper() exp, driver := newExporter(t, opts...) - libraryRecs := map[instrumentation.Library][]export.Record{} - for _, r := range records { - lcopy := make([]attribute.KeyValue, len(r.labels)) - copy(lcopy, r.labels) - desc := metrictest.NewDescriptor(r.name, r.iKind, r.nKind) - labs := attribute.NewSet(lcopy...) - - var agg, ckpt aggregator.Aggregator - if r.iKind.Adding() { - sums := sum.New(2) - agg, ckpt = &sums[0], &sums[1] - } else { - histos := histogram.New(2, &desc, histogram.WithExplicitBoundaries(testHistogramBoundaries)) - agg, ckpt = &histos[0], &histos[1] - } - - ctx := context.Background() - if r.iKind.Synchronous() { - // For synchronous instruments, perform two updates: 1 and 10 - switch r.nKind { - case number.Int64Kind: - require.NoError(t, agg.Update(ctx, number.NewInt64Number(1), &desc)) - require.NoError(t, agg.Update(ctx, number.NewInt64Number(10), &desc)) - case number.Float64Kind: - require.NoError(t, agg.Update(ctx, number.NewFloat64Number(1), &desc)) - require.NoError(t, agg.Update(ctx, number.NewFloat64Number(10), &desc)) - default: - t.Fatalf("invalid number kind: %v", r.nKind) - } - } else { - // For asynchronous instruments, perform a single update: 11 - switch r.nKind { - case number.Int64Kind: - require.NoError(t, agg.Update(ctx, number.NewInt64Number(11), &desc)) - case number.Float64Kind: - require.NoError(t, agg.Update(ctx, number.NewFloat64Number(11), &desc)) - default: - t.Fatalf("invalid number kind: %v", r.nKind) - } - } - 
require.NoError(t, agg.SynchronizedMove(ckpt, &desc)) - - meterCfg := metric.NewMeterConfig(r.meterOpts...) - lib := instrumentation.Library{ - Name: r.meterName, - Version: meterCfg.InstrumentationVersion(), - SchemaURL: meterCfg.SchemaURL(), - } - libraryRecs[lib] = append(libraryRecs[lib], export.NewRecord(&desc, &labs, ckpt.Aggregation(), intervalStart, intervalEnd)) - } - assert.NoError(t, exp.Export(context.Background(), res, processortest.MultiInstrumentationLibraryReader(libraryRecs))) + err := exp.Export(context.Background(), metrics) + assert.NoError(t, err) // assert.ElementsMatch does not equate nested slices of different order, // therefore this requires the top level slice to be broken down. @@ -810,6 +918,8 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource. continue } for i, expected := range sm.GetMetrics() { + t.Log("Expected: ", expected) + t.Log("Actual: ", g[i]) assert.Equal(t, "", cmp.Diff(expected, g[i], protocmp.Transform())) } } @@ -824,25 +934,14 @@ func runMetricExportTests(t *testing.T, opts []otlpmetric.Option, res *resource. 
func TestEmptyMetricExport(t *testing.T) { exp, driver := newExporter(t) - for _, test := range []struct { - records []export.Record - want []*metricpb.ResourceMetrics - }{ - { - []export.Record(nil), - []*metricpb.ResourceMetrics(nil), - }, - { - []export.Record{}, - []*metricpb.ResourceMetrics(nil), - }, - } { - driver.Reset() - require.NoError(t, exp.Export(context.Background(), resource.Empty(), processortest.MultiInstrumentationLibraryReader(map[instrumentation.Library][]export.Record{ - { - Name: testLibName, - }: test.records, - }))) - assert.Equal(t, test.want, driver.rm) + // { + records := reader.Metrics{} + want := []*metricpb.ResourceMetrics{ + {}, } + + driver.Reset() + require.NoError(t, exp.Export(context.Background(), records)) + assert.Equal(t, want, driver.rm) + } diff --git a/exporters/otlp/otlpmetric/go.mod b/exporters/otlp/otlpmetric/go.mod index fdab05efd87..70445d5cd8c 100644 --- a/exporters/otlp/otlpmetric/go.mod +++ b/exporters/otlp/otlpmetric/go.mod @@ -1,6 +1,6 @@ module go.opentelemetry.io/otel/exporters/otlp/otlpmetric -go 1.16 +go 1.18 require ( github.com/google/go-cmp v0.5.7 @@ -15,6 +15,23 @@ require ( google.golang.org/protobuf v1.28.0 ) +require ( + github.com/cenkalti/backoff/v4 v4.1.2 // indirect + github.com/davecgh/go-spew v1.1.0 // indirect + github.com/go-logr/logr v1.2.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/protobuf v1.5.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + go.opentelemetry.io/otel/trace v1.6.1 // indirect + golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect + golang.org/x/sys v0.0.0-20210510120138-977fb7262007 // indirect + golang.org/x/text v0.3.5 // indirect + golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect + google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 // indirect + 
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect +) + replace go.opentelemetry.io/otel => ../../.. replace go.opentelemetry.io/otel/sdk => ../../../sdk diff --git a/exporters/otlp/otlpmetric/go.sum b/exporters/otlp/otlpmetric/go.sum index 98fd383612f..5cfe2ba0323 100644 --- a/exporters/otlp/otlpmetric/go.sum +++ b/exporters/otlp/otlpmetric/go.sum @@ -35,8 +35,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= @@ -127,7 +125,6 @@ github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm4 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg= github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk= -github.com/grpc-ecosystem/grpc-gateway v1.16.0 h1:gmcG1KaJ57LophUzW0Hy8NmPhnMZb4M0+kPpLofRdBo= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 
h1:BZHcxBETFHIdVyhyEfOvn/RdU/QGdLI4y34qQGjGWO0= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go index 03a3d250ab0..7396e61b822 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric.go @@ -17,18 +17,14 @@ package metrictransform // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/metrictransform" import ( - "context" "errors" "fmt" - "strings" - "sync" "time" - "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/export" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregation" "go.opentelemetry.io/otel/sdk/metric/number" - "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/metric/reader" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -55,12 +51,6 @@ var ( ErrTransforming = errors.New("transforming failed") ) -// result is the product of transforming Records into OTLP Metrics. -type result struct { - Metric *metricpb.Metric - Err error -} - // toNanos returns the number of nanoseconds since the UNIX epoch. func toNanos(t time.Time) uint64 { if t.IsZero() { @@ -69,256 +59,143 @@ func toNanos(t time.Time) uint64 { return uint64(t.UnixNano()) } -// InstrumentationLibraryReader transforms all records contained in a checkpoint into -// batched OTLP ResourceMetrics. 
-func InstrumentationLibraryReader(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, res *resource.Resource, ilmr export.InstrumentationLibraryReader, numWorkers uint) (*metricpb.ResourceMetrics, error) { +func TransformMetrics(metrics reader.Metrics) *metricpb.ResourceMetrics { var sms []*metricpb.ScopeMetrics - err := ilmr.ForEach(func(lib instrumentation.Library, mr export.Reader) error { - - records, errc := source(ctx, temporalitySelector, mr) - - // Start a fixed number of goroutines to transform records. - transformed := make(chan result) - var wg sync.WaitGroup - wg.Add(int(numWorkers)) - for i := uint(0); i < numWorkers; i++ { - go func() { - defer wg.Done() - transformer(ctx, temporalitySelector, records, transformed) - }() - } - go func() { - wg.Wait() - close(transformed) - }() - - // Synchronously collect the transformed records and transmit. - ms, err := sink(ctx, transformed) - if err != nil { - return nil - } + for _, scope := range metrics.Scopes { + metrics := []*metricpb.Metric{} - // source is complete, check for any errors. 
- if err := <-errc; err != nil { - return err - } - if len(ms) == 0 { - return nil + for _, inst := range scope.Instruments { + metric := transformInstrument(inst) + if metric != nil { + metrics = append(metrics, metric) + } } sms = append(sms, &metricpb.ScopeMetrics{ - Metrics: ms, - SchemaUrl: lib.SchemaURL, Scope: &commonpb.InstrumentationScope{ - Name: lib.Name, - Version: lib.Version, + Name: scope.Library.Name, + Version: scope.Library.Version, }, + Metrics: metrics, + SchemaUrl: scope.Library.SchemaURL, }) - return nil - }) - if len(sms) == 0 { - return nil, err } - rms := &metricpb.ResourceMetrics{ - Resource: Resource(res), - SchemaUrl: res.SchemaURL(), + return &metricpb.ResourceMetrics{ + Resource: Resource(metrics.Resource), + SchemaUrl: metrics.Resource.SchemaURL(), ScopeMetrics: sms, } - - return rms, err -} - -// source starts a goroutine that sends each one of the Records yielded by -// the Reader on the returned chan. Any error encountered will be sent -// on the returned error chan after seeding is complete. -func source(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, mr export.Reader) (<-chan export.Record, <-chan error) { - errc := make(chan error, 1) - out := make(chan export.Record) - // Seed records into process. - go func() { - defer close(out) - // No select is needed since errc is buffered. - errc <- mr.ForEach(temporalitySelector, func(r export.Record) error { - select { - case <-ctx.Done(): - return ErrContextCanceled - case out <- r: - } - return nil - }) - }() - return out, errc } -// transformer transforms records read from the passed in chan into -// OTLP Metrics which are sent on the out chan. -func transformer(ctx context.Context, temporalitySelector aggregation.TemporalitySelector, in <-chan export.Record, out chan<- result) { - for r := range in { - m, err := Record(temporalitySelector, r) - // Propagate errors, but do not send empty results. 
- if err == nil && m == nil { - continue - } - res := result{ - Metric: m, - Err: err, - } - select { - case <-ctx.Done(): - return - case out <- res: +func transformInstrument(inst reader.Instrument) *metricpb.Metric { + metric := &metricpb.Metric{ + Name: inst.Descriptor.Name, + Description: inst.Descriptor.Description, + Unit: string(inst.Descriptor.Unit), + } + if len(inst.Points) > 0 { + switch inst.Points[0].Aggregation.Category() { + case aggregation.GaugeCategory: + metric.Data = gaugePoints(inst.Points, inst.Descriptor.NumberKind) + case aggregation.MonotonicSumCategory, aggregation.NonMonotonicSumCategory: + metric.Data = sumPoints(inst.Points, inst.Descriptor.NumberKind, inst.Temporality) + case aggregation.HistogramCategory: + metric.Data = histogramPoints(inst.Points, inst.Descriptor.NumberKind, inst.Temporality) } + } + return metric } -// sink collects transformed Records and batches them. -// -// Any errors encountered transforming input will be reported with an -// ErrTransforming as well as the completed ResourceMetrics. It is up to the -// caller to handle any incorrect data in these ResourceMetric. -func sink(ctx context.Context, in <-chan result) ([]*metricpb.Metric, error) { - var errStrings []string - - // Group by the MetricDescriptor. 
- grouped := map[string]*metricpb.Metric{} - for res := range in { - if res.Err != nil { - errStrings = append(errStrings, res.Err.Error()) - continue - } +func gaugePoints(points []reader.Point, kind number.Kind) *metricpb.Metric_Gauge { + if len(points) == 0 { + return nil + } + dataPoints := make([]*metricpb.NumberDataPoint, len(points)) - mID := res.Metric.GetName() - m, ok := grouped[mID] + for i := range points { + dataPoint := &metricpb.NumberDataPoint{ + Attributes: Iterator(points[i].Attributes.Iter()), + StartTimeUnixNano: toNanos(points[i].Start), + TimeUnixNano: toNanos(points[i].End), + } + gauge, ok := points[i].Aggregation.(aggregation.Gauge) if !ok { - grouped[mID] = res.Metric - continue - + otel.Handle(ErrIncompatibleAgg) + return nil } - // Note: There is extra work happening in this code - // that can be improved when the work described in - // #2119 is completed. The SDK has a guarantee that - // no more than one point per period per label set is - // produced, so this fallthrough should never happen. - // The final step of #2119 is to remove all the - // grouping logic here. - switch res.Metric.Data.(type) { - case *metricpb.Metric_Gauge: - m.GetGauge().DataPoints = append(m.GetGauge().DataPoints, res.Metric.GetGauge().DataPoints...) - case *metricpb.Metric_Sum: - m.GetSum().DataPoints = append(m.GetSum().DataPoints, res.Metric.GetSum().DataPoints...) - case *metricpb.Metric_Histogram: - m.GetHistogram().DataPoints = append(m.GetHistogram().DataPoints, res.Metric.GetHistogram().DataPoints...) - case *metricpb.Metric_Summary: - m.GetSummary().DataPoints = append(m.GetSummary().DataPoints, res.Metric.GetSummary().DataPoints...) 
+ switch kind { + case number.Int64Kind: + dataPoint.Value = &metricpb.NumberDataPoint_AsInt{ + AsInt: int64(gauge.Gauge()), + } + case number.Float64Kind: + dataPoint.Value = &metricpb.NumberDataPoint_AsDouble{ + AsDouble: gauge.Gauge().CoerceToFloat64(kind), + } default: - err := fmt.Sprintf("unsupported metric type: %T", res.Metric.Data) - errStrings = append(errStrings, err) + otel.Handle(fmt.Errorf("%w: %v", ErrUnknownValueType, kind)) + return nil } - } - if len(grouped) == 0 { - return nil, nil - } + dataPoints[i] = dataPoint - ms := make([]*metricpb.Metric, 0, len(grouped)) - for _, m := range grouped { - ms = append(ms, m) } - // Report any transform errors. - if len(errStrings) > 0 { - return ms, fmt.Errorf("%w:\n -%s", ErrTransforming, strings.Join(errStrings, "\n -")) + return &metricpb.Metric_Gauge{ + Gauge: &metricpb.Gauge{ + DataPoints: dataPoints, + }, } - return ms, nil } -// Record transforms a Record into an OTLP Metric. An ErrIncompatibleAgg -// error is returned if the Record Aggregator is not supported. 
-func Record(temporalitySelector aggregation.TemporalitySelector, r export.Record) (*metricpb.Metric, error) { - agg := r.Aggregation() - switch agg.Kind() { - case aggregation.HistogramKind: - h, ok := agg.(aggregation.Histogram) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) - } - return histogramPoint(r, temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.HistogramKind), h) +func sumPoints(points []reader.Point, kind number.Kind, temporality aggregation.Temporality) *metricpb.Metric_Sum { + if len(points) == 0 { + return nil + } - case aggregation.SumKind: - s, ok := agg.(aggregation.Sum) - if !ok { - return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) - } - sum, err := s.Sum() - if err != nil { - return nil, err - } - return sumPoint(r, sum, r.StartTime(), r.EndTime(), temporalitySelector.TemporalityFor(r.Descriptor(), aggregation.SumKind), r.Descriptor().InstrumentKind().Monotonic()) + dataPoints := make([]*metricpb.NumberDataPoint, len(points)) - case aggregation.LastValueKind: - lv, ok := agg.(aggregation.LastValue) + for i := range points { + dataPoint := &metricpb.NumberDataPoint{ + Attributes: Iterator(points[i].Attributes.Iter()), + StartTimeUnixNano: toNanos(points[i].Start), + TimeUnixNano: toNanos(points[i].End), + } + sum, ok := points[i].Aggregation.(aggregation.Sum) if !ok { - return nil, fmt.Errorf("%w: %T", ErrIncompatibleAgg, agg) + otel.Handle(ErrIncompatibleAgg) + return nil } - value, tm, err := lv.LastValue() - if err != nil { - return nil, err + switch kind { + case number.Int64Kind: + dataPoint.Value = &metricpb.NumberDataPoint_AsInt{ + AsInt: int64(sum.Sum()), + } + case number.Float64Kind: + dataPoint.Value = &metricpb.NumberDataPoint_AsDouble{ + AsDouble: sum.Sum().CoerceToFloat64(kind), + } + default: + otel.Handle(fmt.Errorf("%w: %v", ErrUnknownValueType, kind)) + return nil } - return gaugePoint(r, value, time.Time{}, tm) + dataPoints[i] = dataPoint - default: - return nil, fmt.Errorf("%w: 
%T", ErrUnimplementedAgg, agg) } -} - -func gaugePoint(record export.Record, num number.Number, start, end time.Time) (*metricpb.Metric, error) { - desc := record.Descriptor() - labels := record.Labels() - - m := &metricpb.Metric{ - Name: desc.Name(), - Description: desc.Description(), - Unit: string(desc.Unit()), - } - - switch n := desc.NumberKind(); n { - case number.Int64Kind: - m.Data = &metricpb.Metric_Gauge{ - Gauge: &metricpb.Gauge{ - DataPoints: []*metricpb.NumberDataPoint{ - { - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: num.CoerceToInt64(n), - }, - Attributes: Iterator(labels.Iter()), - StartTimeUnixNano: toNanos(start), - TimeUnixNano: toNanos(end), - }, - }, - }, - } - case number.Float64Kind: - m.Data = &metricpb.Metric_Gauge{ - Gauge: &metricpb.Gauge{ - DataPoints: []*metricpb.NumberDataPoint{ - { - Value: &metricpb.NumberDataPoint_AsDouble{ - AsDouble: num.CoerceToFloat64(n), - }, - Attributes: Iterator(labels.Iter()), - StartTimeUnixNano: toNanos(start), - TimeUnixNano: toNanos(end), - }, - }, - }, - } - default: - return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n) + return &metricpb.Metric_Sum{ + Sum: &metricpb.Sum{ + DataPoints: dataPoints, + AggregationTemporality: sdkTemporalityToTemporality(temporality), + IsMonotonic: isMonotonic(points[0].Aggregation), + }, } +} - return m, nil +func isMonotonic(agg aggregation.Aggregation) bool { + return agg.Category() == aggregation.MonotonicSumCategory } func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.AggregationTemporality { @@ -331,111 +208,39 @@ func sdkTemporalityToTemporality(temporality aggregation.Temporality) metricpb.A return metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_UNSPECIFIED } -func sumPoint(record export.Record, num number.Number, start, end time.Time, temporality aggregation.Temporality, monotonic bool) (*metricpb.Metric, error) { - desc := record.Descriptor() - labels := record.Labels() - - m := &metricpb.Metric{ - Name: 
desc.Name(), - Description: desc.Description(), - Unit: string(desc.Unit()), - } - - switch n := desc.NumberKind(); n { - case number.Int64Kind: - m.Data = &metricpb.Metric_Sum{ - Sum: &metricpb.Sum{ - IsMonotonic: monotonic, - AggregationTemporality: sdkTemporalityToTemporality(temporality), - DataPoints: []*metricpb.NumberDataPoint{ - { - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: num.CoerceToInt64(n), - }, - Attributes: Iterator(labels.Iter()), - StartTimeUnixNano: toNanos(start), - TimeUnixNano: toNanos(end), - }, - }, - }, - } - case number.Float64Kind: - m.Data = &metricpb.Metric_Sum{ - Sum: &metricpb.Sum{ - IsMonotonic: monotonic, - AggregationTemporality: sdkTemporalityToTemporality(temporality), - DataPoints: []*metricpb.NumberDataPoint{ - { - Value: &metricpb.NumberDataPoint_AsDouble{ - AsDouble: num.CoerceToFloat64(n), - }, - Attributes: Iterator(labels.Iter()), - StartTimeUnixNano: toNanos(start), - TimeUnixNano: toNanos(end), - }, - }, - }, - } - default: - return nil, fmt.Errorf("%w: %v", ErrUnknownValueType, n) +func histogramPoints(points []reader.Point, kind number.Kind, temporality aggregation.Temporality) *metricpb.Metric_Histogram { + if len(points) == 0 { + return nil } + dataPoints := make([]*metricpb.HistogramDataPoint, len(points)) - return m, nil -} + for i := range points { -func histogramValues(a aggregation.Histogram) (boundaries []float64, counts []uint64, err error) { - var buckets aggregation.Buckets - if buckets, err = a.Histogram(); err != nil { - return - } - boundaries, counts = buckets.Boundaries, buckets.Counts - if len(counts) != len(boundaries)+1 { - err = ErrTransforming - return - } - return -} + histogram, ok := points[i].Aggregation.(aggregation.Histogram) + if !ok { + otel.Handle(ErrIncompatibleAgg) + return nil + } + sum := histogram.Sum().CoerceToFloat64(kind) -// histogram transforms a Histogram Aggregator into an OTLP Metric. 
-func histogramPoint(record export.Record, temporality aggregation.Temporality, a aggregation.Histogram) (*metricpb.Metric, error) { - desc := record.Descriptor() - labels := record.Labels() - boundaries, counts, err := histogramValues(a) - if err != nil { - return nil, err - } + dataPoint := &metricpb.HistogramDataPoint{ + Attributes: Iterator(points[i].Attributes.Iter()), + StartTimeUnixNano: toNanos(points[i].Start), + TimeUnixNano: toNanos(points[i].End), + BucketCounts: histogram.Histogram().Counts, + ExplicitBounds: histogram.Histogram().Boundaries, + Count: histogram.Count(), + Sum: &sum, + } - count, err := a.Count() - if err != nil { - return nil, err - } + dataPoints[i] = dataPoint - sum, err := a.Sum() - if err != nil { - return nil, err } + return &metricpb.Metric_Histogram{ + Histogram: &metricpb.Histogram{ - sumFloat64 := sum.CoerceToFloat64(desc.NumberKind()) - m := &metricpb.Metric{ - Name: desc.Name(), - Description: desc.Description(), - Unit: string(desc.Unit()), - Data: &metricpb.Metric_Histogram{ - Histogram: &metricpb.Histogram{ - AggregationTemporality: sdkTemporalityToTemporality(temporality), - DataPoints: []*metricpb.HistogramDataPoint{ - { - Sum: &sumFloat64, - Attributes: Iterator(labels.Iter()), - StartTimeUnixNano: toNanos(record.StartTime()), - TimeUnixNano: toNanos(record.EndTime()), - Count: uint64(count), - BucketCounts: counts, - ExplicitBounds: boundaries, - }, - }, - }, + DataPoints: dataPoints, + AggregationTemporality: sdkTemporalityToTemporality(temporality), }, } - return m, nil } diff --git a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go index d8f2a1c8b62..83e12913769 100644 --- a/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go +++ b/exporters/otlp/otlpmetric/internal/metrictransform/metric_test.go @@ -15,40 +15,25 @@ package metrictransform import ( - "context" - "errors" - "fmt" "testing" - "time" 
"github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/aggregator" - "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" + + "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregation" + "go.opentelemetry.io/otel/sdk/metric/aggregator/gauge" + "go.opentelemetry.io/otel/sdk/metric/aggregator/histogram" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/metric/export" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" - "go.opentelemetry.io/otel/sdk/metric/metrictest" + "go.opentelemetry.io/otel/sdk/metric/reader" + + // "go.opentelemetry.io/otel/sdk/metric/aggregator/lastvalue" + "go.opentelemetry.io/otel/sdk/metric/number" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" commonpb "go.opentelemetry.io/proto/otlp/common/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) -var ( - // Timestamps used in this test: - - intervalStart = time.Now() - intervalEnd = intervalStart.Add(time.Hour) -) - -const ( - otelCumulative = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE - otelDelta = metricpb.AggregationTemporality_AGGREGATION_TEMPORALITY_DELTA -) - func TestStringKeyValues(t *testing.T) { tests := []struct { kvs []attribute.KeyValue @@ -96,220 +81,232 @@ func TestStringKeyValues(t *testing.T) { } } -func TestSumIntDataPoints(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind) - labels := attribute.NewSet(attribute.String("one", "1")) - sums := sum.New(2) - s, ckpt := &sums[0], &sums[1] +func TestSumPoints(t *testing.T) { + // desc := sdkinstrument.NewDescriptor("", sdkinstrument.HistogramKind, number.Int64Kind) + // labels := attribute.NewSet(attribute.String("one", "1")) - assert.NoError(t, s.Update(context.Background(), number.Number(1), &desc)) - require.NoError(t, s.SynchronizedMove(ckpt, &desc)) - record := 
export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) - - value, err := ckpt.Sum() - require.NoError(t, err) - - if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true); assert.NoError(t, err) { - assert.Nil(t, m.GetGauge()) - assert.Equal(t, &metricpb.Sum{ - AggregationTemporality: otelCumulative, - IsMonotonic: true, - DataPoints: []*metricpb.NumberDataPoint{{ - StartTimeUnixNano: uint64(intervalStart.UnixNano()), - TimeUnixNano: uint64(intervalEnd.UnixNano()), - Attributes: []*commonpb.KeyValue{ - { - Key: "one", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}}, - }, + testcases := []struct { + name string + points []reader.Point + kind number.Kind + temporality aggregation.Temporality + want *metricpb.Metric_Sum + }{ + { + name: "no points", + points: []reader.Point{}, + want: nil, + }, + { + name: "incorrect aggregation", + points: []reader.Point{ + { + Aggregation: gauge.NewInt64(1), }, - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: 1, + }, + want: nil, // Error is in the error handler + }, + { + name: "int data", + points: []reader.Point{ + { + Aggregation: sum.NewInt64Monotonic(2), }, - }}, - }, m.GetSum()) - assert.Nil(t, m.GetHistogram()) - assert.Nil(t, m.GetSummary()) - } -} - -func TestSumFloatDataPoints(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Float64Kind) - labels := attribute.NewSet(attribute.String("one", "1")) - sums := sum.New(2) - s, ckpt := &sums[0], &sums[1] - - assert.NoError(t, s.Update(context.Background(), number.NewFloat64Number(1), &desc)) - require.NoError(t, s.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) - value, err := ckpt.Sum() - require.NoError(t, err) - - if m, err := sumPoint(record, value, record.StartTime(), record.EndTime(), aggregation.DeltaTemporality, false); 
assert.NoError(t, err) { - assert.Nil(t, m.GetGauge()) - assert.Equal(t, &metricpb.Sum{ - IsMonotonic: false, - AggregationTemporality: otelDelta, - DataPoints: []*metricpb.NumberDataPoint{{ - Value: &metricpb.NumberDataPoint_AsDouble{ - AsDouble: 1.0, + }, + kind: number.Int64Kind, + want: &metricpb.Metric_Sum{ + Sum: &metricpb.Sum{ + DataPoints: []*metricpb.NumberDataPoint{ + { + Value: &metricpb.NumberDataPoint_AsInt{ + AsInt: 2, + }, + }, + }, + IsMonotonic: true, + }, + }, + }, + { + name: "float data", + points: []reader.Point{ + { + Aggregation: sum.NewFloat64NonMonotonic(5), }, - StartTimeUnixNano: uint64(intervalStart.UnixNano()), - TimeUnixNano: uint64(intervalEnd.UnixNano()), - Attributes: []*commonpb.KeyValue{ - { - Key: "one", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}}, + }, + kind: number.Float64Kind, + want: &metricpb.Metric_Sum{ + Sum: &metricpb.Sum{ + DataPoints: []*metricpb.NumberDataPoint{ + { + Value: &metricpb.NumberDataPoint_AsDouble{ + AsDouble: 5, + }, + }, }, + IsMonotonic: false, }, - }}}, m.GetSum()) - assert.Nil(t, m.GetHistogram()) - assert.Nil(t, m.GetSummary()) + }, + }, + } + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + got := sumPoints(tt.points, tt.kind, tt.temporality) + assert.Equal(t, tt.want, got) + }) } -} -func TestLastValueIntDataPoints(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Int64Kind) - labels := attribute.NewSet(attribute.String("one", "1")) - lvs := lastvalue.New(2) - lv, ckpt := &lvs[0], &lvs[1] +} - assert.NoError(t, lv.Update(context.Background(), number.Number(100), &desc)) - require.NoError(t, lv.SynchronizedMove(ckpt, &desc)) - record := export.NewRecord(&desc, &labels, ckpt.Aggregation(), intervalStart, intervalEnd) - value, timestamp, err := ckpt.LastValue() - require.NoError(t, err) +func TestGaugePoints(t *testing.T) { + // desc := sdkinstrument.NewDescriptor("", sdkinstrument.HistogramKind, 
number.Int64Kind) + // labels := attribute.NewSet(attribute.String("one", "1")) - if m, err := gaugePoint(record, value, time.Time{}, timestamp); assert.NoError(t, err) { - assert.Equal(t, []*metricpb.NumberDataPoint{{ - StartTimeUnixNano: 0, - TimeUnixNano: uint64(timestamp.UnixNano()), - Attributes: []*commonpb.KeyValue{ + testcases := []struct { + name string + points []reader.Point + kind number.Kind + temporality aggregation.Temporality + want *metricpb.Metric_Gauge + }{ + { + name: "no points", + points: []reader.Point{}, + want: nil, + }, + { + name: "incorrect aggregation", + points: []reader.Point{ { - Key: "one", - Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "1"}}, + Aggregation: sum.NewInt64Monotonic(1), }, }, - Value: &metricpb.NumberDataPoint_AsInt{ - AsInt: 100, + want: nil, // Error is in the error handler + }, + { + name: "int data", + points: []reader.Point{ + { + Aggregation: gauge.NewInt64(2), + }, + }, + kind: number.Int64Kind, + want: &metricpb.Metric_Gauge{ + Gauge: &metricpb.Gauge{ + DataPoints: []*metricpb.NumberDataPoint{ + { + Value: &metricpb.NumberDataPoint_AsInt{ + AsInt: 2, + }, + }, + }, + }, + }, + }, + { + name: "float data", + points: []reader.Point{ + { + Aggregation: gauge.NewFloat64(5), + }, }, - }}, m.GetGauge().DataPoints) - assert.Nil(t, m.GetSum()) - assert.Nil(t, m.GetHistogram()) - assert.Nil(t, m.GetSummary()) + kind: number.Float64Kind, + want: &metricpb.Metric_Gauge{ + Gauge: &metricpb.Gauge{ + DataPoints: []*metricpb.NumberDataPoint{ + { + Value: &metricpb.NumberDataPoint_AsDouble{ + AsDouble: 5, + }, + }, + }, + }, + }, + }, } -} - -func TestSumErrUnknownValueType(t *testing.T) { - desc := metrictest.NewDescriptor("", sdkapi.HistogramInstrumentKind, number.Kind(-1)) - labels := attribute.NewSet() - s := &sum.New(1)[0] - record := export.NewRecord(&desc, &labels, s, intervalStart, intervalEnd) - value, err := s.Sum() - require.NoError(t, err) - _, err = sumPoint(record, value, 
record.StartTime(), record.EndTime(), aggregation.CumulativeTemporality, true) - assert.Error(t, err) - if !errors.Is(err, ErrUnknownValueType) { - t.Errorf("expected ErrUnknownValueType, got %v", err) + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + got := gaugePoints(tt.points, tt.kind) + assert.Equal(t, tt.want, got) + }) } -} - -type testAgg struct { - kind aggregation.Kind - agg aggregation.Aggregation -} - -func (t *testAgg) Kind() aggregation.Kind { - return t.kind -} -func (t *testAgg) Aggregation() aggregation.Aggregation { - return t.agg } - -// None of these three are used: - -func (t *testAgg) Update(ctx context.Context, number number.Number, descriptor *sdkapi.Descriptor) error { - return nil -} -func (t *testAgg) SynchronizedMove(destination aggregator.Aggregator, descriptor *sdkapi.Descriptor) error { - return nil -} -func (t *testAgg) Merge(aggregator aggregator.Aggregator, descriptor *sdkapi.Descriptor) error { - return nil -} - -type testErrSum struct { - err error -} - -type testErrLastValue struct { - err error -} - -func (te *testErrLastValue) LastValue() (number.Number, time.Time, error) { - return 0, time.Time{}, te.err -} -func (te *testErrLastValue) Kind() aggregation.Kind { - return aggregation.LastValueKind -} - -func (te *testErrSum) Sum() (number.Number, error) { - return 0, te.err -} -func (te *testErrSum) Kind() aggregation.Kind { - return aggregation.SumKind -} - -var _ aggregator.Aggregator = &testAgg{} -var _ aggregation.Aggregation = &testAgg{} -var _ aggregation.Sum = &testErrSum{} -var _ aggregation.LastValue = &testErrLastValue{} - -func TestRecordAggregatorIncompatibleErrors(t *testing.T) { - makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) { - desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind) - labels := attribute.NewSet() - test := &testAgg{ - kind: kind, - agg: agg, - } - return 
Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, test, intervalStart, intervalEnd)) +func TestHistogramPoints(t *testing.T) { + boundaries := []float64{2.0, 5.0, 8.0} + testSum := float64(11) + testcases := []struct { + name string + points []reader.Point + kind number.Kind + temporality aggregation.Temporality + want *metricpb.Metric_Histogram + }{ + { + name: "no points", + points: []reader.Point{}, + want: nil, + }, + { + name: "incorrect aggregation", + points: []reader.Point{ + { + Aggregation: sum.NewInt64Monotonic(1), + }, + }, + want: nil, // Error is in the error handler + }, + { + name: "int data", + points: []reader.Point{ + { + Aggregation: histogram.NewInt64(boundaries, 1, 10), + }, + }, + kind: number.Int64Kind, + want: &metricpb.Metric_Histogram{ + Histogram: &metricpb.Histogram{ + DataPoints: []*metricpb.HistogramDataPoint{ + { + Count: 2, + Sum: &testSum, + BucketCounts: []uint64{1, 0, 0, 1}, + ExplicitBounds: boundaries, + }, + }, + }, + }, + }, + { + name: "float data", + points: []reader.Point{ + { + Aggregation: histogram.NewFloat64(boundaries, 1, 10), + }, + }, + kind: number.Float64Kind, + want: &metricpb.Metric_Histogram{ + Histogram: &metricpb.Histogram{ + DataPoints: []*metricpb.HistogramDataPoint{ + { + Count: 2, + Sum: &testSum, + BucketCounts: []uint64{1, 0, 0, 1}, + ExplicitBounds: boundaries, + }, + }, + }, + }, + }, } - mpb, err := makeMpb(aggregation.SumKind, &lastvalue.New(1)[0]) - - require.Error(t, err) - require.Nil(t, mpb) - require.True(t, errors.Is(err, ErrIncompatibleAgg)) - - mpb, err = makeMpb(aggregation.LastValueKind, &sum.New(1)[0]) - - require.Error(t, err) - require.Nil(t, mpb) - require.True(t, errors.Is(err, ErrIncompatibleAgg)) -} - -func TestRecordAggregatorUnexpectedErrors(t *testing.T) { - makeMpb := func(kind aggregation.Kind, agg aggregation.Aggregation) (*metricpb.Metric, error) { - desc := metrictest.NewDescriptor("things", sdkapi.CounterInstrumentKind, number.Int64Kind) - 
labels := attribute.NewSet() - return Record(aggregation.CumulativeTemporalitySelector(), export.NewRecord(&desc, &labels, agg, intervalStart, intervalEnd)) + for _, tt := range testcases { + t.Run(tt.name, func(t *testing.T) { + got := histogramPoints(tt.points, tt.kind, tt.temporality) + assert.Equal(t, tt.want, got) + }) } - errEx := fmt.Errorf("timeout") - - mpb, err := makeMpb(aggregation.SumKind, &testErrSum{errEx}) - - require.Error(t, err) - require.Nil(t, mpb) - require.True(t, errors.Is(err, errEx)) - - mpb, err = makeMpb(aggregation.LastValueKind, &testErrLastValue{errEx}) - - require.Error(t, err) - require.Nil(t, mpb) - require.True(t, errors.Is(err, errEx)) } diff --git a/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go index 2da921f142c..0ca6c78bc79 100644 --- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go +++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/data.go @@ -15,57 +15,31 @@ package otlpmetrictest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest" import ( - "context" - "fmt" - "time" - - "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" "go.opentelemetry.io/otel/sdk/metric/aggregator/sum" - "go.opentelemetry.io/otel/sdk/metric/export" - "go.opentelemetry.io/otel/sdk/metric/metrictest" "go.opentelemetry.io/otel/sdk/metric/number" - "go.opentelemetry.io/otel/sdk/metric/processor/processortest" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" + "go.opentelemetry.io/otel/sdk/metric/reader" + "go.opentelemetry.io/otel/sdk/metric/sdkinstrument" ) -// OneRecordReader is a Reader that returns just one -// filled record. It may be useful for testing driver's metrics -// export. 
-func OneRecordReader() export.InstrumentationLibraryReader { - desc := metrictest.NewDescriptor( - "foo", - sdkapi.CounterInstrumentKind, - number.Int64Kind, - ) - agg := sum.New(1) - if err := agg[0].Update(context.Background(), number.NewInt64Number(42), &desc); err != nil { - panic(err) - } - start := time.Date(2020, time.December, 8, 19, 15, 0, 0, time.UTC) - end := time.Date(2020, time.December, 8, 19, 16, 0, 0, time.UTC) - labels := attribute.NewSet(attribute.String("abc", "def"), attribute.Int64("one", 1)) - rec := export.NewRecord(&desc, &labels, agg[0].Aggregation(), start, end) - - return processortest.MultiInstrumentationLibraryReader( - map[instrumentation.Library][]export.Record{ - { - Name: "onelib", - }: {rec}, - }) -} - -func EmptyReader() export.InstrumentationLibraryReader { - return processortest.MultiInstrumentationLibraryReader(nil) +var OneMetric = reader.Metrics{ + Scopes: []reader.Scope{ + { + Library: instrumentation.Library{ + Name: "OneMetricLibrary", + }, + Instruments: []reader.Instrument{ + { + Descriptor: sdkinstrument.NewDescriptor("oneMetricInstrument", sdkinstrument.CounterKind, number.Int64Kind, "", ""), + Points: []reader.Point{ + { + Aggregation: sum.NewInt64Monotonic(7), + }, + }, + }, + }, + }, + }, } -// FailReader is a checkpointer that returns an error during -// ForEach. -type FailReader struct{} - -var _ export.InstrumentationLibraryReader = FailReader{} - -// ForEach implements export.Reader. It always fails. 
-func (FailReader) ForEach(readerFunc func(instrumentation.Library, export.Reader) error) error { - return fmt.Errorf("fail") -} +var EmptyMetric = reader.Metrics{} diff --git a/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go b/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go index 524cb774588..86efe971523 100644 --- a/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go +++ b/exporters/otlp/otlpmetric/internal/otlpmetrictest/otlptest.go @@ -17,6 +17,8 @@ package otlpmetrictest // import "go.opentelemetry.io/otel/exporters/otlp/otlpme import ( "context" "fmt" + "sync" + "sync/atomic" "testing" "time" @@ -26,41 +28,67 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/metric/instrument" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - "go.opentelemetry.io/otel/sdk/metric/export/aggregation" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/number" - processor "go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/sdkapi" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "go.opentelemetry.io/otel/sdk/metric/reader" + "go.opentelemetry.io/otel/sdk/metric/sdkinstrument" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) +type client struct { + lock sync.Mutex + resourceMetrics *metricpb.ResourceMetrics + uploadCount uint64 + startCount uint64 + stopCount uint64 +} + +var _ otlpmetric.Client = &client{} + +func (c *client) Start(ctx context.Context) error { + atomic.AddUint64(&c.startCount, 1) + return nil +} +func (c *client) Stop(ctx context.Context) error { + atomic.AddUint64(&c.stopCount, 1) + return nil +} +func (c *client) UploadMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics) error { + atomic.AddUint64(&c.uploadCount, 1) + c.lock.Lock() + defer c.lock.Unlock() + + c.resourceMetrics = protoMetrics + + return nil +} + // 
RunEndToEndTest can be used by protocol driver tests to validate // themselves. func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter, mcMetrics Collector) { - selector := simple.NewWithInexpensiveDistribution() - proc := processor.NewFactory(selector, aggregation.StatelessTemporalitySelector()) - cont := controller.New(proc, controller.WithExporter(exp)) - require.NoError(t, cont.Start(ctx)) + rdr := reader.NewManualReader(exp) + mp := sdkmetric.New( + sdkmetric.WithReader(rdr), + ) - meter := cont.Meter("test-meter") + meter := mp.Meter("test-meter") labels := []attribute.KeyValue{attribute.Bool("test", true)} type data struct { - iKind sdkapi.InstrumentKind + iKind sdkinstrument.Kind nKind number.Kind val int64 } instruments := map[string]data{ - "test-int64-counter": {sdkapi.CounterInstrumentKind, number.Int64Kind, 1}, - "test-float64-counter": {sdkapi.CounterInstrumentKind, number.Float64Kind, 1}, - "test-int64-gaugeobserver": {sdkapi.GaugeObserverInstrumentKind, number.Int64Kind, 3}, - "test-float64-gaugeobserver": {sdkapi.GaugeObserverInstrumentKind, number.Float64Kind, 3}, + "test-int64-counter": {sdkinstrument.CounterKind, number.Int64Kind, 1}, + "test-float64-counter": {sdkinstrument.CounterKind, number.Float64Kind, 1}, + "test-int64-gaugeobserver": {sdkinstrument.GaugeObserverKind, number.Int64Kind, 3}, + "test-float64-gaugeobserver": {sdkinstrument.GaugeObserverKind, number.Float64Kind, 3}, } for name, data := range instruments { data := data switch data.iKind { - case sdkapi.CounterInstrumentKind: + case sdkinstrument.CounterKind: switch data.nKind { case number.Int64Kind: c, _ := meter.SyncInt64().Counter(name) @@ -71,7 +99,7 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter default: assert.Failf(t, "unsupported number testing kind", data.nKind.String()) } - case sdkapi.HistogramInstrumentKind: + case sdkinstrument.HistogramKind: switch data.nKind { case number.Int64Kind: c, _ := 
meter.SyncInt64().Histogram(name) @@ -82,7 +110,7 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter default: assert.Failf(t, "unsupported number testing kind", data.nKind.String()) } - case sdkapi.GaugeObserverInstrumentKind: + case sdkinstrument.GaugeObserverKind: switch data.nKind { case number.Int64Kind: g, _ := meter.AsyncInt64().Gauge(name) @@ -102,11 +130,9 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter } } - // Flush and close. - require.NoError(t, cont.Stop(ctx)) - - // Wait >2 cycles. - <-time.After(40 * time.Millisecond) + // Collect + err := rdr.Collect(ctx, nil) + assert.NoError(t, err) // Now shutdown the exporter ctx, cancel := context.WithTimeout(ctx, 10*time.Second) @@ -131,13 +157,13 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter seen[m.Name] = struct{}{} switch data.iKind { - case sdkapi.CounterInstrumentKind, sdkapi.GaugeObserverInstrumentKind: + case sdkinstrument.CounterKind, sdkinstrument.GaugeObserverKind: var dp []*metricpb.NumberDataPoint switch data.iKind { - case sdkapi.CounterInstrumentKind: + case sdkinstrument.CounterKind: require.NotNil(t, m.GetSum()) dp = m.GetSum().GetDataPoints() - case sdkapi.GaugeObserverInstrumentKind: + case sdkinstrument.GaugeObserverKind: require.NotNil(t, m.GetGauge()) dp = m.GetGauge().GetDataPoints() } @@ -151,7 +177,7 @@ func RunEndToEndTest(ctx context.Context, t *testing.T, exp *otlpmetric.Exporter assert.Equal(t, v, dp[0].Value, "invalid value for %q", m.Name) } } - case sdkapi.HistogramInstrumentKind: + case sdkinstrument.HistogramKind: require.NotNil(t, m.GetSummary()) if dp := m.GetSummary().DataPoints; assert.Len(t, dp, 1) { count := dp[0].Count diff --git a/exporters/otlp/otlpmetric/options.go b/exporters/otlp/otlpmetric/options.go index bd8706a74d3..8a46e3c9ecb 100644 --- a/exporters/otlp/otlpmetric/options.go +++ b/exporters/otlp/otlpmetric/options.go @@ -14,7 +14,7 @@ package otlpmetric 
// import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" -import "go.opentelemetry.io/otel/sdk/metric/export/aggregation" +import "go.opentelemetry.io/otel/sdk/metric/aggregator/aggregation" // Option are setting options passed to an Exporter on creation. type Option interface { @@ -28,16 +28,16 @@ func (fn exporterOptionFunc) apply(cfg config) config { } type config struct { - temporalitySelector aggregation.TemporalitySelector + temporality aggregation.Temporality } // WithMetricAggregationTemporalitySelector defines the aggregation.TemporalitySelector used // for selecting aggregation.Temporality (i.e., Cumulative vs. Delta // aggregation). If not specified otherwise, exporter will use a // cumulative temporality selector. -func WithMetricAggregationTemporalitySelector(selector aggregation.TemporalitySelector) Option { +func WithMetricAggregationTemporalitySelector(temp aggregation.Temporality) Option { return exporterOptionFunc(func(cfg config) config { - cfg.temporalitySelector = selector + cfg.temporality = temp return cfg }) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index dc7eeda51d9..6a4ddeee017 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -32,13 +32,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" - "go.opentelemetry.io/otel/sdk/resource" -) - -var ( - oneRecord = otlpmetrictest.OneRecordReader() - - testResource = resource.Empty() ) func TestNewExporter_endToEnd(t *testing.T) { @@ -205,7 +198,7 @@ func TestNewExporter_withHeaders(t *testing.T) { ctx := context.Background() exp := newGRPCExporter(t, ctx, mc.endpoint, otlpmetricgrpc.WithHeaders(map[string]string{"header1": "value1"})) - require.NoError(t, 
exp.Export(ctx, testResource, oneRecord)) + require.NoError(t, exp.Export(ctx, otlpmetrictest.OneMetric)) defer func() { _ = exp.Shutdown(ctx) @@ -229,7 +222,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { { name: "Timeout Metrics", fn: func(exp *otlpmetric.Exporter) error { - return exp.Export(context.Background(), testResource, oneRecord) + return exp.Export(context.Background(), otlpmetrictest.OneMetric) }, timeout: time.Millisecond * 100, code: codes.DeadlineExceeded, @@ -239,7 +232,7 @@ func TestNewExporter_WithTimeout(t *testing.T) { { name: "No Timeout Metrics", fn: func(exp *otlpmetric.Exporter) error { - return exp.Export(context.Background(), testResource, oneRecord) + return exp.Export(context.Background(), otlpmetrictest.OneMetric) }, timeout: time.Minute, metrics: 1, @@ -310,23 +303,5 @@ func TestEmptyData(t *testing.T) { assert.NoError(t, exp.Shutdown(ctx)) }() - assert.NoError(t, exp.Export(ctx, testResource, otlpmetrictest.EmptyReader())) -} - -func TestFailedMetricTransform(t *testing.T) { - mc := runMockCollector(t) - - defer func() { - _ = mc.stop() - }() - - <-time.After(5 * time.Millisecond) - - ctx := context.Background() - exp := newGRPCExporter(t, ctx, mc.endpoint) - defer func() { - assert.NoError(t, exp.Shutdown(ctx)) - }() - - assert.Error(t, exp.Export(ctx, testResource, otlpmetrictest.FailReader{})) + assert.NoError(t, exp.Export(ctx, otlpmetrictest.EmptyMetric)) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go index fe0866b7af5..21a826e185a 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/example_test.go @@ -25,9 +25,8 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/metric/instrument" - controller "go.opentelemetry.io/otel/sdk/metric/controller/basic" - processor 
"go.opentelemetry.io/otel/sdk/metric/processor/basic" - "go.opentelemetry.io/otel/sdk/metric/selector/simple" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/reader/periodic" ) func Example_insecure() { @@ -45,32 +44,14 @@ func Example_insecure() { } }() - pusher := controller.New( - processor.NewFactory( - simple.NewWithHistogramDistribution(), - exp, - ), - controller.WithExporter(exp), - controller.WithCollectPeriod(2*time.Second), + // This creates a reader that will collect from all instruments once a + // minute, and will timeout after two seconds. + rdr := periodic.New(time.Minute, exp, periodic.WithTimeout(2*time.Second)) + meterProvider := metric.New( + metric.WithReader(rdr), ) - // TODO Bring back Global package - // global.SetMeterProvider(pusher) - if err := pusher.Start(ctx); err != nil { - log.Fatalf("could not start metric controoler: %v", err) - } - defer func() { - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - // pushes any last exports to the receiver - if err := pusher.Stop(ctx); err != nil { - otel.Handle(err) - } - }() - - // TODO Bring Back Global package - // meter := global.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") - meter := pusher.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") + meter := meterProvider.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") // Recorder metric example @@ -107,33 +88,14 @@ func Example_withTLS() { } }() - pusher := controller.New( - processor.NewFactory( - simple.NewWithHistogramDistribution(), - exp, - ), - controller.WithExporter(exp), - controller.WithCollectPeriod(2*time.Second), + // This creates a reader that will collect from all instruments once a + // minute, and will timeout after two seconds. 
+ rdr := periodic.New(time.Minute, exp, periodic.WithTimeout(2*time.Second)) + meterProvider := metric.New( + metric.WithReader(rdr), ) - // TODO Bring back Global package - // global.SetMeterProvider(pusher) - if err := pusher.Start(ctx); err != nil { - log.Fatalf("could not start metric controoler: %v", err) - } - - defer func() { - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - // pushes any last exports to the receiver - if err := pusher.Stop(ctx); err != nil { - otel.Handle(err) - } - }() - - // TODO Bring back Global package - // meter := global.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") - meter := pusher.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") + meter := meterProvider.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") // Recorder metric example counter, err := meter.SyncFloat64().Counter("an_important_metric", instrument.WithDescription("Measures the cumulative epicness of the app")) @@ -166,32 +128,14 @@ func Example_withDifferentSignalCollectors() { } }() - pusher := controller.New( - processor.NewFactory( - simple.NewWithHistogramDistribution(), - exp, - ), - controller.WithExporter(exp), - controller.WithCollectPeriod(2*time.Second), + // This creates a reader that will collect from all instruments once a + // minute, and will time out after two seconds. 
+ rdr := periodic.New(time.Minute, exp, periodic.WithTimeout(2*time.Second)) + meterProvider := metric.New( + metric.WithReader(rdr), ) - // TODO Bring back Global package - // global.SetMeterProvider(pusher) - - if err := pusher.Start(ctx); err != nil { - log.Fatalf("could not start metric controoler: %v", err) - } - defer func() { - ctx, cancel := context.WithTimeout(ctx, time.Second) - defer cancel() - // pushes any last exports to the receiver - if err := pusher.Stop(ctx); err != nil { - otel.Handle(err) - } - }() - // TODO Bring back Global package - // meter := global.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") - meter := pusher.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") + meter := meterProvider.Meter("go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc_test") // Recorder metric example counter, err := meter.SyncFloat64().Counter("an_important_metric", instrument.WithDescription("Measures the cumulative epicness of the app")) diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod index d643ecf73b9..5f31e3a016f 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod @@ -8,7 +8,6 @@ require ( go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.6.1 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.28.0 go.opentelemetry.io/otel/metric v0.28.0 - go.opentelemetry.io/otel/sdk v1.6.1 go.opentelemetry.io/otel/sdk/metric v0.28.0 go.opentelemetry.io/proto/otlp v0.15.0 google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1 diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum index 98fd383612f..7f886d05d81 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum @@ -35,8 +35,6 @@ github.com/BurntSushi/toml v0.3.1/go.mod 
h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go index 5e614da2640..a338d7dbaac 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/client_test.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otlpmetrictest" "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp" - "go.opentelemetry.io/otel/sdk/resource" ) const ( @@ -35,12 +34,6 @@ const ( otherMetricsPath = "/post/metrics/here" ) -var ( - oneRecord = otlpmetrictest.OneRecordReader() - - testResource = resource.Empty() -) - var ( testHeaders = map[string]string{ "Otel-Go-Key-1": "somevalue", @@ -160,7 +153,7 @@ func TestTimeout(t *testing.T) { defer func() { assert.NoError(t, exporter.Shutdown(ctx)) }() - err = exporter.Export(ctx, testResource, oneRecord) + err = exporter.Export(ctx, otlpmetrictest.OneMetric) assert.Equalf(t, true, os.IsTimeout(err), "expected timeout error, got: %v", err) } @@ -179,7 +172,7 
@@ func TestEmptyData(t *testing.T) { assert.NoError(t, exporter.Shutdown(ctx)) }() assert.NoError(t, err) - err = exporter.Export(ctx, testResource, oneRecord) + err = exporter.Export(ctx, otlpmetrictest.OneMetric) assert.NoError(t, err) assert.NotEmpty(t, mc.GetMetrics()) } @@ -204,7 +197,7 @@ func TestCancelledContext(t *testing.T) { assert.NoError(t, exporter.Shutdown(context.Background())) }() cancel() - _ = exporter.Export(ctx, testResource, oneRecord) + _ = exporter.Export(ctx, otlpmetrictest.OneMetric) assert.Empty(t, mc.GetMetrics()) } @@ -231,7 +224,7 @@ func TestDeadlineContext(t *testing.T) { }() ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() - err = exporter.Export(ctx, testResource, oneRecord) + err = exporter.Export(ctx, otlpmetrictest.OneMetric) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) } @@ -259,7 +252,7 @@ func TestStopWhileExporting(t *testing.T) { }() doneCh := make(chan struct{}) go func() { - err := exporter.Export(ctx, testResource, oneRecord) + err := exporter.Export(ctx, otlpmetrictest.OneMetric) assert.Error(t, err) assert.Empty(t, mc.GetMetrics()) close(doneCh) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod index 4b3c46a3f37..3c3e2e9bfdb 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.mod @@ -6,7 +6,6 @@ require ( github.com/stretchr/testify v1.7.1 go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.6.1 go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.28.0 - go.opentelemetry.io/otel/sdk v1.6.1 go.opentelemetry.io/proto/otlp v0.15.0 google.golang.org/protobuf v1.28.0 ) diff --git a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum index 98fd383612f..7f886d05d81 100644 --- a/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetrichttp/go.sum @@ -35,8 +35,6 @@ 
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= -github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= -github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuDPIFo= github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= diff --git a/sdk/metric/aggregator/gauge/gauge.go b/sdk/metric/aggregator/gauge/gauge.go index f2256fcd3f0..0d1f1677e26 100644 --- a/sdk/metric/aggregator/gauge/gauge.go +++ b/sdk/metric/aggregator/gauge/gauge.go @@ -37,6 +37,13 @@ var ( _ aggregation.Gauge = &State[float64, traits.Float64]{} ) +func NewInt64(value int64) aggregation.Gauge { + return &State[int64, traits.Int64]{value: value} +} +func NewFloat64(value float64) aggregation.Gauge { + return &State[float64, traits.Float64]{value: value} +} + func (lv *State[N, Traits]) Gauge() number.Number { var traits Traits return traits.ToNumber(lv.value) diff --git a/sdk/metric/aggregator/histogram/histogram.go b/sdk/metric/aggregator/histogram/histogram.go index 16eadc4bf40..43a8fd1d96d 100644 --- a/sdk/metric/aggregator/histogram/histogram.go +++ b/sdk/metric/aggregator/histogram/histogram.go @@ -53,6 +53,38 @@ var ( _ aggregation.Histogram = &State[float64, traits.Float64]{} ) +func NewFloat64(boundries []float64, values ...float64) aggregation.Histogram { + if len(boundries) < 1 { + boundries = 
DefaultFloat64Boundaries + } + + hist := &State[float64, traits.Float64]{ + boundaries: boundries, + bucketCounts: make([]uint64, len(boundries)+1), + } + methods := Methods[float64, traits.Float64, State[float64, traits.Float64]]{} + for _, val := range values { + methods.Update(hist, val) + } + return hist +} + +func NewInt64(boundries []float64, values ...int64) aggregation.Histogram { + if len(boundries) < 1 { + boundries = DefaultFloat64Boundaries + } + + hist := &State[int64, traits.Int64]{ + boundaries: boundries, + bucketCounts: make([]uint64, len(boundries)+1), + } + methods := Methods[int64, traits.Int64, State[int64, traits.Int64]]{} + for _, val := range values { + methods.Update(hist, val) + } + return hist +} + // DefaultBoundaries have been copied from prometheus.DefBuckets. var DefaultFloat64Boundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10} diff --git a/sdk/metric/aggregator/sum/sum.go b/sdk/metric/aggregator/sum/sum.go index b2e2368c830..72cb06a833f 100644 --- a/sdk/metric/aggregator/sum/sum.go +++ b/sdk/metric/aggregator/sum/sum.go @@ -36,6 +36,20 @@ type ( } ) +func NewInt64Monotonic(value int64) aggregation.Sum { + return &State[int64, traits.Int64, Monotonic]{value: value} +} +func NewFloat64Monotonic(value float64) aggregation.Sum { + return &State[float64, traits.Float64, Monotonic]{value: value} +} + +func NewInt64NonMonotonic(value int64) aggregation.Sum { + return &State[int64, traits.Int64, NonMonotonic]{value: value} +} +func NewFloat64NonMonotonic(value float64) aggregation.Sum { + return &State[float64, traits.Float64, NonMonotonic]{value: value} +} + func (Monotonic) category() aggregation.Category { return aggregation.MonotonicSumCategory }