From d6beb6ac9b62c9b2e8837743ab017ebcb74a76fb Mon Sep 17 00:00:00 2001 From: Joshua MacDonald Date: Wed, 15 Feb 2023 14:33:20 -0800 Subject: [PATCH] Concurrent performance improvements; add performance settings field; support `IgnoreCollisions` (#384) --- CHANGELOG.md | 8 + VERSION | 2 +- go.mod | 6 +- launcher/version.go | 2 +- lightstep/sdk/metric/README.md | 12 ++ lightstep/sdk/metric/benchmark_test.go | 44 ++++++ lightstep/sdk/metric/config.go | 12 ++ lightstep/sdk/metric/example/go.mod | 2 +- .../sdk/metric/internal/asyncstate/async.go | 8 +- .../metric/internal/asyncstate/async_test.go | 8 +- .../sdk/metric/internal/syncstate/sync.go | 144 +++++++++++------- .../metric/internal/syncstate/sync_test.go | 112 +++++++++++++- lightstep/sdk/metric/meter.go | 3 +- .../sdk/metric/sdkinstrument/performance.go | 24 +++ pipelines/go.mod | 4 +- 15 files changed, 320 insertions(+), 71 deletions(-) create mode 100644 lightstep/sdk/metric/sdkinstrument/performance.go diff --git a/CHANGELOG.md b/CHANGELOG.md index ab6b0a97..4d3af541 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,14 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## Unreleased +## [1.13.1](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.1) - 2022-02-15) + +- Adds performance improvements for concurrent use of synchronous + instruments. [#384](https://github.com/lightstep/otel-launcher-go/pull/384) +- Adds `WithPerformance()` and `IgnoreCollisions` setting which offers + around 10% faster operations in exchange for safety and correctness. This + setting is off by default. [#384](https://github.com/lightstep/otel-launcher-go/pull/384) + ## [1.13.0](https://github.com/lightstep/otel-launcher-go/releases/tag/v1.13.0) - 2022-02-15) - Updates OTel-Go version dependencies to `trace@v1.12.0`, `metrics@v0.35.0`, diff --git a/VERSION b/VERSION index feaae22b..b50dd27d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -1.13.0 +1.13.1 diff --git a/go.mod b/go.mod index 1012615a..ad0a7e4d 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,8 @@ module github.com/lightstep/otel-launcher-go go 1.18 require ( - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.0 - github.com/lightstep/otel-launcher-go/pipelines v1.13.0 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.1 + github.com/lightstep/otel-launcher-go/pipelines v1.13.1 github.com/sethvargo/go-envconfig v0.8.3 github.com/stretchr/testify v1.8.1 go.opentelemetry.io/otel v1.12.0 @@ -23,7 +23,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/lightstep/go-expohisto v1.0.0 // indirect - github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.0 // indirect + github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.1 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect diff --git a/launcher/version.go b/launcher/version.go index db2fc120..339118cf 100644 --- a/launcher/version.go +++ b/launcher/version.go @@ -14,4 +14,4 @@ package launcher -const version = "1.13.0" +const version = "1.13.1" diff --git a/lightstep/sdk/metric/README.md b/lightstep/sdk/metric/README.md index d8f8979d..21eed7f3 100644 --- a/lightstep/sdk/metric/README.md +++ b/lightstep/sdk/metric/README.md @@ -122,3 +122,15 @@ For example, to configure a synchronous Gauge: ) ``` + +### Performance settings + +The `WithPerformance()` option supports control over performance +settings. + +#### IgnoreCollisions + +With `IgnoreCollisions` set to true, the SDK will ignore fingerprint +collisions and bypass a safety mechanism that ensures correctness in +spite of fingerprint collisions in the fast path for synchronous +instruments. diff --git a/lightstep/sdk/metric/benchmark_test.go b/lightstep/sdk/metric/benchmark_test.go index 665fdd2f..a0661dab 100644 --- a/lightstep/sdk/metric/benchmark_test.go +++ b/lightstep/sdk/metric/benchmark_test.go @@ -19,10 +19,15 @@ import ( "testing" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/data" + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" "go.opentelemetry.io/otel/attribute" ) +var unsafePerf = WithPerformance(sdkinstrument.Performance{ + IgnoreCollisions: true, +}) + // Tested prior to 0.11.0 release // goos: darwin // goarch: arm64 @@ -64,6 +69,19 @@ func BenchmarkCounterAddOneAttr(b *testing.B) { } } +func BenchmarkCounterAddOneAttrUnsafe(b *testing.B) { + ctx := context.Background() + rdr := NewManualReader("bench") + provider := NewMeterProvider(WithReader(rdr), unsafePerf) + b.ReportAllocs() + + cntr, _ := provider.Meter("test").Int64Counter("hello") + + for i := 0; i < b.N; i++ { + cntr.Add(ctx, 1, attribute.String("K", "V")) + } +} + func BenchmarkCounterAddOneInvalidAttr(b *testing.B) { ctx := context.Background() rdr := NewManualReader("bench") @@ -90,6 +108,19 @@ func BenchmarkCounterAddManyAttrs(b *testing.B) { } } +func BenchmarkCounterAddManyAttrsUnsafe(b *testing.B) { + ctx := context.Background() + rdr := NewManualReader("bench") + provider := NewMeterProvider(WithReader(rdr), unsafePerf) + b.ReportAllocs() + + cntr, _ := provider.Meter("test").Int64Counter("hello") + + for i := 0; i < b.N; i++ { + cntr.Add(ctx, 1, attribute.Int("K", i)) + } +} + func BenchmarkCounterAddManyInvalidAttrs(b *testing.B) { ctx := context.Background() rdr := NewManualReader("bench") @@ -103,6 +134,19 @@ func BenchmarkCounterAddManyInvalidAttrs(b *testing.B) { } } +func BenchmarkCounterAddManyInvalidAttrsUnsafe(b *testing.B) { + ctx := context.Background() + rdr := NewManualReader("bench") + provider := NewMeterProvider(WithReader(rdr), unsafePerf) + b.ReportAllocs() + + cntr, _ := provider.Meter("test").Int64Counter("hello") + + for i := 0; i < b.N; i++ { + cntr.Add(ctx, 1, attribute.Int("", i), attribute.Int("K", i)) + } +} + func BenchmarkCounterAddManyFilteredAttrs(b *testing.B) { ctx := context.Background() rdr := NewManualReader("bench") diff --git a/lightstep/sdk/metric/config.go b/lightstep/sdk/metric/config.go index 6c3366bf..34eaa327 100644 --- a/lightstep/sdk/metric/config.go +++ b/lightstep/sdk/metric/config.go @@ -15,6 +15,7 @@ package metric // import "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric" import ( + "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/sdkinstrument" "github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/view" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/sdk/resource" @@ -32,6 +33,9 @@ type config struct { // views is a slice of *Views instances corresponding with readers. // the i'th views applies to the i'th reader. views []*view.Views + + // performance settings + performance sdkinstrument.Performance } // Option applies a configuration option value to a MeterProvider. @@ -68,3 +72,11 @@ func WithReader(r Reader, opts ...view.Option) Option { return cfg }) } + +// WithPerformance supports modifying performance settings. +func WithPerformance(perf sdkinstrument.Performance) Option { + return optionFunction(func(cfg config) config { + cfg.performance = perf + return cfg + }) +} diff --git a/lightstep/sdk/metric/example/go.mod b/lightstep/sdk/metric/example/go.mod index 47cddc4d..b1447f7c 100644 --- a/lightstep/sdk/metric/example/go.mod +++ b/lightstep/sdk/metric/example/go.mod @@ -3,7 +3,7 @@ module github.com/lightstep/otel-launcher-go/lightstep/sdk/metric/example go 1.18 require ( - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.0 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.1 github.com/lightstep/otel-launcher-go/pipelines v1.8.0 go.opentelemetry.io/proto/otlp v0.19.0 ) diff --git a/lightstep/sdk/metric/internal/asyncstate/async.go b/lightstep/sdk/metric/internal/asyncstate/async.go index 1ec32a71..498422e4 100644 --- a/lightstep/sdk/metric/internal/asyncstate/async.go +++ b/lightstep/sdk/metric/internal/asyncstate/async.go @@ -79,12 +79,16 @@ func NewState(pipe int) *State { } } -// New returns a new Instrument; this compiles individual +// New returns a new Observer; this compiles individual // instruments for each reader. -func New(desc sdkinstrument.Descriptor, opaque interface{}, compiled pipeline.Register[viewstate.Instrument]) *Observer { +func New(desc sdkinstrument.Descriptor, _ sdkinstrument.Performance, opaque interface{}, compiled pipeline.Register[viewstate.Instrument]) *Observer { // Note: we return a non-nil instrument even when all readers // disabled the instrument. This ensures that certain error // checks still work (wrong meter, wrong callback, etc). + // + // Note: performance settings are not used because async + // instruments do not use fingerprinting so IgnoreCollisions is + // meaningless. return &Observer{ opaque: opaque, descriptor: desc, diff --git a/lightstep/sdk/metric/internal/asyncstate/async_test.go b/lightstep/sdk/metric/internal/asyncstate/async_test.go index b333a117..630cdfc1 100644 --- a/lightstep/sdk/metric/internal/asyncstate/async_test.go +++ b/lightstep/sdk/metric/internal/asyncstate/async_test.go @@ -53,6 +53,10 @@ var ( Last: middleTime, Now: endTime, } + + ignorePerf = sdkinstrument.Performance{ + IgnoreCollisions: false, + } ) type testSDK struct { @@ -106,12 +110,12 @@ type floatObserver struct { func testIntObserver(tsdk *testSDK, name string, ik sdkinstrument.Kind) intObserver { desc := test.Descriptor(name, ik, number.Int64Kind) - return intObserver{Observer: New(desc, tsdk, tsdk.compile(desc))} + return intObserver{Observer: New(desc, ignorePerf, tsdk, tsdk.compile(desc))} } func testFloatObserver(tsdk *testSDK, name string, ik sdkinstrument.Kind) floatObserver { desc := test.Descriptor(name, ik, number.Float64Kind) - return floatObserver{Observer: New(desc, tsdk, tsdk.compile(desc))} + return floatObserver{Observer: New(desc, ignorePerf, tsdk, tsdk.compile(desc))} } func nopCB(context.Context, metric.Observer) error { diff --git a/lightstep/sdk/metric/internal/syncstate/sync.go b/lightstep/sdk/metric/internal/syncstate/sync.go index b4ff4908..dcc12aca 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync.go +++ b/lightstep/sdk/metric/internal/syncstate/sync.go @@ -30,12 +30,6 @@ import ( "go.opentelemetry.io/otel/metric/instrument" ) -var sortableAttributesPool = sync.Pool{ - New: func() any { - return new(attribute.Sortable) - }, -} - // Instrument maintains a mapping from attribute.Set to an internal // record type for a single API-level instrument. This type is // organized so that a single attribute.Set lookup is performed @@ -50,6 +44,9 @@ type Observer struct { // instrument, unmodified by views. descriptor sdkinstrument.Descriptor + // performance settings for the instrument. + performance sdkinstrument.Performance + // compiled will be a single compiled instrument or a // multi-instrument in case of multiple view behaviors // and/or readers; these distinctions do not matter @@ -63,11 +60,11 @@ type Observer struct { current map[uint64]*record } -// New builds a new synchronous instrument given the +// New builds a new synchronous instrument *Observer given the // per-pipeline instrument-views compiled. Note that the unused -// second parameter is an opaque value used in the asyncstate package, +// third parameter is an opaque value used in the asyncstate package, // passed here to make these two packages generalize. -func New(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeline.Register[viewstate.Instrument]) *Observer { +func New(desc sdkinstrument.Descriptor, performance sdkinstrument.Performance, _ interface{}, compiled pipeline.Register[viewstate.Instrument]) *Observer { var nonnil []viewstate.Instrument for _, comp := range compiled { if comp != nil { @@ -79,8 +76,9 @@ func New(desc sdkinstrument.Descriptor, _ interface{}, compiled pipeline.Registe return nil } return &Observer{ - descriptor: desc, - current: map[uint64]*record{}, + descriptor: desc, + current: map[uint64]*record{}, + performance: performance, // Note that viewstate.Combine is used to eliminate // the per-pipeline distinction that is useful in the @@ -183,20 +181,43 @@ type record struct { // supports checking for no updates during a round. collectedCount int64 - // accumulator can be a multi-accumulator if there - // are multiple behaviors or multiple readers, but - // these distinctions are not relevant for synchronous - // instruments. - accumulator viewstate.Accumulator - - // attributeSet is ordered and deduplicated - attributeSet attribute.Set - - // attributeList is in user-specified order, may contain duplicates. - attributeList []attribute.KeyValue + // inst allows referring to performance settings. + inst *Observer // next is protected by the instrument's RWLock. + // + // this field is unused when Performance.IgnoreCollisions is true. next *record + + // once governs access to `attrsUnsafe` and + // `accumulatorsUnsafe`. The caller that created the `record` + // must call once.Do(initialize) on its own code path, although + // another goroutine might actually perform the + // initialization. This is arranged with the use of + // readAccumulator() and readAttributes(). + once sync.Once + + // accumulatorUnsafe can be a multi-accumulator if there + // are multiple behaviors or multiple readers, but + // these distinctions are not relevant for synchronous + // instruments. + // + // Note: use record.readAccumulator() to access this value, + // to ensure that once.Do(initialize) is called. + accumulatorUnsafe viewstate.Accumulator + + // attrsUnsafe is set in acquireUninitialized by the caller that + // creates the provisional new record, after not finding it in + // acquireRead. + // + // These attributes are in user-specified order and may contain + // duplicates. When the record is initialized, this field is + // set to a copy of the attribute set that first observed the + // fingerprint. + // + // When IgnoreCollisions is true, this field is used as a temporary + // in building a new attribute set, then set to nil. + attrsUnsafe attribute.Sortable } // conditionalSnapshotAndProcess checks whether the accumulator has been @@ -214,13 +235,47 @@ func (rec *record) conditionalSnapshotAndProcess(release bool) bool { } } - rec.accumulator.SnapshotAndProcess(release) + rec.readAccumulator().SnapshotAndProcess(release) // Updates happened in this interval, collect and continue. atomic.StoreInt64(&rec.collectedCount, mods) return true } +// readAttributes gets a copy of the attributes matching a fingerprint after +// once.Do(initialize). +func (rec *record) readAttributes() []attribute.KeyValue { + rec.once.Do(rec.initialize) + return []attribute.KeyValue(rec.attrsUnsafe) +} + +// readAttributes gets the accumulator for this record after once.Do(initialize). +func (rec *record) readAccumulator() viewstate.Accumulator { + rec.once.Do(rec.initialize) + return rec.accumulatorUnsafe +} + +// initialize ensures that accumulatorUnsafe and attrsUnsafe are correctly initialized. +// +// readAttributes() and readAccumulator() call this inside a sync.Once.Do(). The +// behavior of this method depends on IgnoreCollisions, as documented in the +// corresponding "unsafe" fields. +func (rec *record) initialize() { + + var aset attribute.Set + + if rec.inst.performance.IgnoreCollisions { + aset = attribute.NewSetWithSortable(rec.attrsUnsafe, &rec.attrsUnsafe) + } else { + acpy := make(attribute.Sortable, len(rec.attrsUnsafe)) + copy(acpy, rec.attrsUnsafe) + aset = attribute.NewSetWithSortable(acpy, &rec.attrsUnsafe) + rec.attrsUnsafe = acpy + } + + rec.accumulatorUnsafe = rec.inst.compiled.NewAccumulator(aset) +} + func (inst *Observer) ObserveInt64(ctx context.Context, num int64, attrs ...attribute.KeyValue) { Observe[int64, number.Int64Traits](ctx, inst, num, attrs...) } @@ -242,10 +297,10 @@ func Observe[N number.Any, Traits number.Traits[N]](_ context.Context, inst *Obs return } - rec := acquireRecord[N](inst, attrs) + rec := acquireUninitialized[N](inst, attrs) defer rec.refMapped.unref() - rec.accumulator.(viewstate.Updater[N]).Update(num) + rec.readAccumulator().(viewstate.Updater[N]).Update(num) // Record was modified. atomic.AddInt64(&rec.updateCount, 1) @@ -352,10 +407,11 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record rec := inst.current[fp] - // Note: we could (optionally) allow collisions and not scan this list. - // The copied `attributeList` can be avoided in this case, as well. - for rec != nil && !attributesEqual(attrs, rec.attributeList) { - rec = rec.next + // Potentially test for hash collisions. + if !inst.performance.IgnoreCollisions { + for rec != nil && !attributesEqual(attrs, rec.readAttributes()) { + rec = rec.next + } } // Existing record case. @@ -368,9 +424,10 @@ func acquireRead(inst *Observer, fp uint64, attrs []attribute.KeyValue) *record return nil } -// acquireRecord gets or creates a `*record` corresponding to `attrs`, -// the input attributes. -func acquireRecord[N number.Any](inst *Observer, attrs []attribute.KeyValue) *record { +// acquireUninitialized gets or creates a `*record` corresponding to +// `attrs`, the input attributes. The returned record is mapped but +// possibly not initialized. +func acquireUninitialized[N number.Any](inst *Observer, attrs []attribute.KeyValue) *record { fp := fingerprintAttributes(attrs) rec := acquireRead(inst, fp, attrs) @@ -378,21 +435,10 @@ func acquireRecord[N number.Any](inst *Observer, attrs []attribute.KeyValue) *re return rec } - // Build the attribute set. Make a copy of the attribute list - // because we are keeping a copy in the record. - acpy := make([]attribute.KeyValue, len(attrs)) - copy(acpy, attrs) - tmp := sortableAttributesPool.Get().(*attribute.Sortable) - defer sortableAttributesPool.Put(tmp) - aset := attribute.NewSetWithSortable(acpy, tmp) - - // Note: the accumulator set below is created speculatively; - // it will be released if it is never returned. newRec := &record{ - refMapped: newRefcountMapped(), - accumulator: inst.compiled.NewAccumulator(aset), - attributeList: acpy, - attributeSet: aset, + inst: inst, + refMapped: newRefcountMapped(), + attrsUnsafe: attrs, } for { @@ -406,10 +452,6 @@ func acquireRecord[N number.Any](inst *Observer, attrs []attribute.KeyValue) *re continue } - if acquired != newRec { - // Release the speculative accumulator, since it was not used. - newRec.accumulator.SnapshotAndProcess(true) - } return acquired } } @@ -421,7 +463,7 @@ func acquireWrite(inst *Observer, fp uint64, newRec *record) (*record, bool) { for oldRec := inst.current[fp]; oldRec != nil; oldRec = oldRec.next { - if attributesEqual(oldRec.attributeList, newRec.attributeList) { + if inst.performance.IgnoreCollisions || attributesEqual(oldRec.readAttributes(), newRec.attrsUnsafe) { if oldRec.refMapped.ref() { return oldRec, true } diff --git a/lightstep/sdk/metric/internal/syncstate/sync_test.go b/lightstep/sdk/metric/internal/syncstate/sync_test.go index 47abff31..108c2200 100644 --- a/lightstep/sdk/metric/internal/syncstate/sync_test.go +++ b/lightstep/sdk/metric/internal/syncstate/sync_test.go @@ -50,6 +50,10 @@ var ( Last: middleTime, Now: endTime, } + + safePerf = sdkinstrument.Performance{ + IgnoreCollisions: false, + } ) func deltaUpdate[N number.Any](old, new N) N { @@ -142,7 +146,7 @@ func testSyncStateConcurrency[N number.Any, Traits number.Traits[N]](t *testing. pipes[vci], _ = vcs[vci].Compile(desc) } - inst := New(desc, nil, pipes) + inst := New(desc, safePerf, nil, pipes) require.NotNil(t, inst) ctx, cancel := context.WithCancel(context.Background()) @@ -241,7 +245,7 @@ func TestSyncStatePartialNoopInstrument(t *testing.T) { require.Nil(t, pipes[0]) require.NotNil(t, pipes[1]) - inst := New(desc, nil, pipes) + inst := New(desc, safePerf, nil, pipes) require.NotNil(t, inst) inst.ObserveFloat64(ctx, 1) @@ -309,7 +313,7 @@ func TestSyncStateFullNoopInstrument(t *testing.T) { require.Nil(t, pipes[0]) require.Nil(t, pipes[1]) - inst := New(desc, nil, pipes) + inst := New(desc, safePerf, nil, pipes) require.Nil(t, inst) inst.ObserveFloat64(ctx, 1) @@ -342,7 +346,7 @@ func TestOutOfRangeValues(t *testing.T) { pipes := make(pipeline.Register[viewstate.Instrument], 1) pipes[0], _ = vcs[0].Compile(desc) - inst := New(desc, nil, pipes) + inst := New(desc, safePerf, nil, pipes) require.NotNil(t, inst) var negOne aggregation.Aggregation @@ -442,7 +446,7 @@ func TestSyncGaugeDeltaInstrument(t *testing.T) { require.NotNil(t, pipes[0]) - inst := New(indesc, nil, pipes) + inst := New(indesc, safePerf, nil, pipes) require.NotNil(t, inst) inst.ObserveFloat64(ctx, 1) @@ -707,7 +711,7 @@ func TestFingerprintCollision(t *testing.T) { ) } -func TestDuplicateFingerprint(t *testing.T) { +func TestDuplicateFingerprintSafety(t *testing.T) { ctx := context.Background() lib := instrumentation.Library{ Name: "testlib", @@ -737,7 +741,7 @@ func TestDuplicateFingerprint(t *testing.T) { require.NotNil(t, pipes[0]) require.NotNil(t, pipes[1]) - inst := New(desc, nil, pipes) + inst := New(desc, safePerf, nil, pipes) require.NotNil(t, inst) attr1 := attribute.Int64(fpKey, fpInt1) @@ -903,3 +907,97 @@ func TestDuplicateFingerprint(t *testing.T) { ) } + +func TestDuplicateFingerprintCollisionIgnored(t *testing.T) { + ctx := context.Background() + lib := instrumentation.Library{ + Name: "testlib", + } + vcs := make([]*viewstate.Compiler, 1) + vcs[0] = viewstate.New(lib, view.New( + "test", + deltaSelector, + )) + + desc := test.Descriptor("c", sdkinstrument.SyncCounter, number.Float64Kind) + + pipes := make(pipeline.Register[viewstate.Instrument], 1) + pipes[0], _ = vcs[0].Compile(desc) + + require.NotNil(t, pipes[0]) + + inst := New(desc, sdkinstrument.Performance{ + // Do not check the collision. + IgnoreCollisions: true, + }, nil, pipes) + require.NotNil(t, inst) + + attr1 := attribute.Int64(fpKey, fpInt1) + attr2 := attribute.Int64(fpKey, fpInt2) + + // Because of the duplicate, the first attribute set wins. + inst.ObserveFloat64(ctx, 1, attr1) + inst.ObserveFloat64(ctx, 2, attr2) + + // collect reader + inst.SnapshotAndProcess() + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + test.Point(middleTime, endTime, + sum.NewMonotonicFloat64(3), // combined values + aggregation.DeltaTemporality, + attr1, // first attribute set observed + ), + ), + ) + + // There is 1 entry in memory + require.Equal(t, 1, vcs[0].Collectors()[0].Size()) + + // collect reader again + inst.SnapshotAndProcess() + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + ), + ) + + // There are 0 entries in memory + require.Equal(t, 0, vcs[0].Collectors()[0].Size()) + + // Use both attribute sets in the opposite order, collect + // reader again. + inst.ObserveFloat64(ctx, 6, attr2) + inst.ObserveFloat64(ctx, 5, attr1) + + inst.SnapshotAndProcess() + test.RequireEqualMetrics( + t, + test.CollectScope( + t, + vcs[0].Collectors(), + testSequence, + ), + test.Instrument( + desc, + test.Point(middleTime, endTime, + sum.NewMonotonicFloat64(11), // combined values + aggregation.DeltaTemporality, + attr2, // first attribute set observed + ), + ), + ) +} diff --git a/lightstep/sdk/metric/meter.go b/lightstep/sdk/metric/meter.go index b4c01fcd..3120fba5 100644 --- a/lightstep/sdk/metric/meter.go +++ b/lightstep/sdk/metric/meter.go @@ -118,6 +118,7 @@ func (mr *metricRegistration) Unregister() error { // package for the generalization used here to work. type instrumentConstructor[T any] func( instrument sdkinstrument.Descriptor, + performance sdkinstrument.Performance, opaque interface{}, compiled pipeline.Register[viewstate.Instrument], ) *T @@ -164,7 +165,7 @@ func configureInstrument[T any]( } // Build the new instrument, cache it, append to the list. - inst := ctor(desc, m, compiled) + inst := ctor(desc, m.provider.cfg.performance, m, compiled) err := conflicts.AsError() if inst != nil { diff --git a/lightstep/sdk/metric/sdkinstrument/performance.go b/lightstep/sdk/metric/sdkinstrument/performance.go new file mode 100644 index 00000000..2d60daaf --- /dev/null +++ b/lightstep/sdk/metric/sdkinstrument/performance.go @@ -0,0 +1,24 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package sdkinstrument + +// Performace configures features that allow the user to control +// performance. +type Performance struct { + // IgnoreCollisions indicates the user is willing to bypass an + // attributes-set comparison after finding a fingerprint + // match. + IgnoreCollisions bool +} diff --git a/pipelines/go.mod b/pipelines/go.mod index 4b99d411..c0703fd8 100644 --- a/pipelines/go.mod +++ b/pipelines/go.mod @@ -51,8 +51,8 @@ require ( ) require ( - github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.0 - github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.0 + github.com/lightstep/otel-launcher-go/lightstep/instrumentation v1.13.1 + github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.13.1 go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.35.0 )