Skip to content

Commit

Permalink
Merge branch 'new_sdk/main' into prevent-duplicate-reader-reg
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias authored May 21, 2022
2 parents afcea7b + 9e45f76 commit 9a5c1f0
Show file tree
Hide file tree
Showing 13 changed files with 552 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ env:
# Path to where test results will be saved.
TEST_RESULTS: /tmp/test-results
# Default minimum version of Go to support.
DEFAULT_GO_VERSION: 1.16
DEFAULT_GO_VERSION: 1.17
jobs:
lint:
runs-on: ubuntu-latest
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

// config contains configuration options for a MeterProvider.
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/export/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

// TODO: NOTE this is a temporary space, it may be moved following the
// discussion of #2813, or #2841

Expand Down
61 changes: 61 additions & 0 deletions sdk/metric/exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"

"go.opentelemetry.io/otel/sdk/metric/export"
)

// ErrExporterShutdown is returned if Export or Shutdown are called after an
// Exporter has been Shutdown.
var ErrExporterShutdown = fmt.Errorf("exporter is shutdown")

// Exporter handles the delivery of metric data to external receivers. This is
// the final component in the metric push pipeline.
type Exporter interface {
// Export serializes and transmits metric data to a receiver.
//
// This is called synchronously, there is no concurrency safety
// requirement. Because of this, it is critical that all timeouts and
// cancellations of the passed context be honored.
//
// All retry logic must be contained in this function. The SDK does not
// implement any retry logic. All errors returned by this function are
// considered unrecoverable and will be reported to a configured error
// Handler.
Export(context.Context, export.Metrics) error

// ForceFlush flushes any metric data held by an exporter.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
ForceFlush(context.Context) error

// Shutdown flushes all metric data held by an exporter and releases any
// held computational resources.
//
// The deadline or cancellation of the passed context must be honored. An
// appropriate error should be returned in these situations.
//
// After Shutdown is called, calls to Export will perform no operation and
// instead will return an error indicating the shutdown state.
Shutdown(context.Context) error
}
12 changes: 3 additions & 9 deletions sdk/metric/manual_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"

"go.opentelemetry.io/otel/internal/global"
Expand Down Expand Up @@ -81,11 +83,3 @@ func (mr *manualReader) Collect(ctx context.Context) (export.Metrics, error) {
}
return mr.producer.produce(ctx)
}

// ErrReaderNotRegistered is returned if Collect or Shutdown are called before
// the reader is registered with a MeterProvider.
var ErrReaderNotRegistered = fmt.Errorf("reader is not registered")

// ErrReaderShutdown is returned if Collect or Shutdown are called after a
// reader has been Shutdown once.
var ErrReaderShutdown = fmt.Errorf("reader is shutdown")
3 changes: 3 additions & 0 deletions sdk/metric/manual_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric/reader"

import (
Expand Down
3 changes: 3 additions & 0 deletions sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
Expand Down
230 changes: 230 additions & 0 deletions sdk/metric/periodic_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build go1.17
// +build go1.17

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/metric/export"
)

// Default periodic reader timing.
const (
defaultTimeout = time.Millisecond * 30000
defaultInterval = time.Millisecond * 60000
)

// periodicReaderConfig contains configuration options for a PeriodicReader.
type periodicReaderConfig struct {
interval time.Duration
timeout time.Duration
}

// newPeriodicReaderConfig returns a periodicReaderConfig configured with
// options.
func newPeriodicReaderConfig(options []PeriodicReaderOption) periodicReaderConfig {
c := periodicReaderConfig{
interval: defaultInterval,
timeout: defaultTimeout,
}
for _, o := range options {
c = o.apply(c)
}
return c
}

// PeriodicReaderOption applies a configuration option value to a PeriodicReader.
type PeriodicReaderOption interface {
apply(periodicReaderConfig) periodicReaderConfig
}

// periodicReaderOptionFunc applies a set of options to a periodicReaderConfig.
type periodicReaderOptionFunc func(periodicReaderConfig) periodicReaderConfig

// apply returns a periodicReaderConfig with option(s) applied.
func (o periodicReaderOptionFunc) apply(conf periodicReaderConfig) periodicReaderConfig {
return o(conf)
}

// WithTimeout configures the time a PeriodicReader waits for an export to
// complete before canceling it.
//
// If this option is not used or d is less than or equal to zero, 30 seconds
// is used as the default.
func WithTimeout(d time.Duration) PeriodicReaderOption {
return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
if d <= 0 {
return conf
}
conf.timeout = d
return conf
})
}

// WithInterval configures the intervening time between exports for a
// PeriodicReader.
//
// If this option is not used or d is less than or equal to zero, 60 seconds
// is used as the default.
func WithInterval(d time.Duration) PeriodicReaderOption {
return periodicReaderOptionFunc(func(conf periodicReaderConfig) periodicReaderConfig {
if d <= 0 {
return conf
}
conf.interval = d
return conf
})
}

// NewPeriodicReader returns a Reader that collects and exports metric data to
// the exporter at a defined interval. By default, the returned Reader will
// collect and export data every 60 seconds, and will cancel export attempts
// that exceed 30 seconds. The export time is not counted towards the interval
// between attempts.
//
// The Collect method of the returned Reader continues to gather and return
// metric data to the user. It will not automatically send that data to the
// exporter. That is left to the user to accomplish.
func NewPeriodicReader(exporter Exporter, options ...PeriodicReaderOption) Reader {
conf := newPeriodicReaderConfig(options)
ctx, cancel := context.WithCancel(context.Background())
r := &periodicReader{
timeout: conf.timeout,
exporter: exporter,
cancel: cancel,
}

r.wg.Add(1)
go func() {
defer r.wg.Done()
r.run(ctx, conf.interval)
}()

return r
}

// periodicReader is a Reader that continuously collects and exports metric
// data at a set interval.
type periodicReader struct {
producer atomic.Value

timeout time.Duration
exporter Exporter

wg sync.WaitGroup
cancel context.CancelFunc
shutdownOnce sync.Once
}

// newTicker allows testing override.
var newTicker = time.NewTicker

// run continuously collects and exports metric data at the specified
// interval. This will run until ctx is canceled or times out.
func (r *periodicReader) run(ctx context.Context, interval time.Duration) {
ticker := newTicker(interval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
m, err := r.Collect(ctx)
if err == nil {
c, cancel := context.WithTimeout(ctx, r.timeout)
err = r.exporter.Export(c, m)
cancel()
}
if err != nil {
otel.Handle(err)
}
case <-ctx.Done():
return
}
}
}

// register registers p as the producer of this reader.
func (r *periodicReader) register(p producer) {
// Only register once. If producer is already set, do nothing.
r.producer.CompareAndSwap(nil, produceHolder{produce: p.produce})
}

// Collect gathers and returns all metric data related to the Reader from
// the SDK. The returned metric data is not exported to the configured
// exporter, it is left to the caller to handle that if desired.
//
// An error is returned if this is called after Shutdown.
func (r *periodicReader) Collect(ctx context.Context) (export.Metrics, error) {
p := r.producer.Load()
if p == nil {
return export.Metrics{}, ErrReaderNotRegistered
}

ph, ok := p.(produceHolder)
if !ok {
// The atomic.Value is entirely in the periodicReader's control so
// this should never happen. In the unforeseen case that this does
// happen, return an error instead of panicking so a users code does
// not halt in the processes.
err := fmt.Errorf("periodic reader: invalid producer: %T", p)
return export.Metrics{}, err
}
return ph.produce(ctx)
}

// ForceFlush flushes the Exporter.
func (r *periodicReader) ForceFlush(ctx context.Context) error {
return r.exporter.ForceFlush(ctx)
}

// Shutdown stops the export pipeline.
func (r *periodicReader) Shutdown(ctx context.Context) error {
err := ErrReaderShutdown
r.shutdownOnce.Do(func() {
// Stop the run loop.
r.cancel()
r.wg.Wait()

// Any future call to Collect will now return ErrReaderShutdown.
r.producer.Store(produceHolder{
produce: shutdownProducer{}.produce,
})

err = r.exporter.Shutdown(ctx)
})
return err
}

// produceHolder is used as an atomic.Value to wrap the non-concrete producer
// type.
type produceHolder struct {
produce func(context.Context) (export.Metrics, error)
}

// shutdownProducer produces an ErrReaderShutdown error always.
type shutdownProducer struct{}

// produce returns an ErrReaderShutdown error.
func (p shutdownProducer) produce(context.Context) (export.Metrics, error) {
return export.Metrics{}, ErrReaderShutdown
}
Loading

0 comments on commit 9a5c1f0

Please sign in to comment.