Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add admission control to traces sdk #743

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ require (
github.com/HdrHistogram/hdrhistogram-go v1.1.2 // indirect
github.com/apache/arrow/go/v16 v16.1.0 // indirect
github.com/axiomhq/hyperloglog v0.0.0-20230201085229-3ddf4bad03dc // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect
Expand Down Expand Up @@ -47,6 +49,7 @@ require (
github.com/lightstep/otel-launcher-go/lightstep/sdk/metric v1.30.0 // indirect
github.com/lightstep/otel-launcher-go/lightstep/sdk/trace v1.29.0 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -63,6 +66,7 @@ require (
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/x448/float16 v0.8.4 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 h1:bkypFPDjIYGfCYD5mRBvpqxfYX1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0/go.mod h1:P+Lt/0by1T8bfcF3z737NnSbmxQAppXMRziHUxPOC8k=
github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKeRZfjY=
github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
Expand Down
96 changes: 60 additions & 36 deletions lightstep/sdk/trace/exporters/otlp/otelcol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/otelarrowexporter"
"github.com/open-telemetry/otel-arrow/collector/admission"
"github.com/open-telemetry/otel-arrow/collector/processor/concurrentbatchprocessor"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
Expand Down Expand Up @@ -50,10 +51,12 @@ type Option func(*Config)
// TODO: Config, Option, and the option impls are duplicated between
// this package and the metric exporter. Fix this.
type Config struct {
SelfMetrics bool
SelfSpans bool
Batcher concurrentbatchprocessor.Config
Exporter otelarrowexporter.Config
SelfMetrics bool
SelfSpans bool
AdmissionLimitMiB int64
WaiterLimit int64
Batcher concurrentbatchprocessor.Config
Exporter otelarrowexporter.Config
}

type ExporterOptions struct {
Expand All @@ -76,46 +79,63 @@ func WithMeterProvider(mp metric.MeterProvider) func(*ExporterOptions) {
type client struct {
internal.ResourceMap

exporter exporter.Traces
batcher processor.Traces
settings exporter.Settings
tracer traceapi.Tracer
exporter exporter.Traces
batcher processor.Traces
settings exporter.Settings
tracer traceapi.Tracer
boundedQueue *admission.BoundedQueue
}

func (c *client) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) error {
func (c *client) ExportSpans(ctx context.Context, spans []trace.ReadOnlySpan) (retErr error) {
count := 0
ctx, span := c.tracer.Start(
ctx,
"otelsdk_export_traces",
)
defer span.End()
defer func() {
success := retErr == nil
var state string
if success {
state = "ok"
} else if errors.Is(retErr, context.Canceled) {
state = "canceled"
} else if errors.Is(retErr, context.DeadlineExceeded) {
state = "timeout"
} else {
state = "error"
}

converted := c.d2pd(spans)
count := converted.SpanCount()

err := c.batcher.ConsumeTraces(ctx, converted)
success := err == nil
var state string
if success {
state = "ok"
} else if errors.Is(err, context.Canceled) {
state = "canceled"
} else if errors.Is(err, context.DeadlineExceeded) {
state = "timeout"
} else {
state = "error"
}
var attrs = []attribute.KeyValue{
attribute.Bool("success", success),
attribute.String("state", state),
}
span.SetAttributes(append(attrs, attribute.Int("num_spans", count))...)
if retErr == nil {
span.SetStatus(otelcodes.Ok, state)
} else {
span.SetStatus(otelcodes.Error, state)
}
}()

var attrs = []attribute.KeyValue{
attribute.Bool("success", success),
attribute.String("state", state),
spanSz := 0
for _, span := range spans {
spanSz += sizeOfROSpan(span)
}
span.SetAttributes(append(attrs, attribute.Int("num_spans", count))...)
if err == nil {
span.SetStatus(otelcodes.Ok, state)
} else {
span.SetStatus(otelcodes.Error, state)
retErr = c.boundedQueue.Acquire(ctx, int64(spanSz))
if retErr != nil {
return
}
return err

defer func() {
retErr = c.boundedQueue.Release(int64(spanSz))
}()

converted := c.d2pd(spans)
count = converted.SpanCount()

retErr = c.batcher.ConsumeTraces(ctx, converted)
return
}

func (c *client) Shutdown(ctx context.Context) error {
Expand All @@ -131,8 +151,10 @@ func (c *client) Shutdown(ctx context.Context) error {

func NewDefaultConfig() Config {
cfg := Config{
SelfMetrics: true,
SelfSpans: true,
SelfMetrics: true,
SelfSpans: true,
AdmissionLimitMiB: 64,
WaiterLimit: 1000,
Batcher: concurrentbatchprocessor.Config{
Timeout: time.Second,
SendBatchSize: 1000,
Expand Down Expand Up @@ -212,7 +234,9 @@ func NewExporter(ctx context.Context, cfg Config, opts ...func(options *Exporter
opt(&options)
}

c := &client{}
c := &client{
boundedQueue: admission.NewBoundedQueue(cfg.AdmissionLimitMiB<<20, cfg.WaiterLimit),
}

if !cfg.Exporter.Arrow.Disabled {
c.settings.ID = component.NewID(component.MustNewType("otel_sdk_trace_arrow"))
Expand Down
62 changes: 56 additions & 6 deletions lightstep/sdk/trace/exporters/otlp/otelcol/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ package otelcol
import (
"context"
"encoding/hex"
"fmt"
"net"
"strconv"
"testing"
"time"

"github.com/google/go-cmp/cmp"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/otelarrowreceiver"
// "github.com/open-telemetry/otel-arrow/collector/admission"
"github.com/stretchr/testify/suite"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/confignet"
Expand All @@ -43,7 +45,6 @@ import (
resourcev1 "go.opentelemetry.io/proto/otlp/resource/v1"
tracev1 "go.opentelemetry.io/proto/otlp/trace/v1"

// "google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -200,15 +201,15 @@ func (t *clientTestSuite) TestSpan() {
{
Resource: &resourcev1.Resource{
Attributes: []*commonpb.KeyValue{
&commonpb.KeyValue{
{
Key: "property",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
StringValue: "value",
},
},
},
&commonpb.KeyValue{
{
Key: "service.name",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
Expand All @@ -219,12 +220,12 @@ func (t *clientTestSuite) TestSpan() {
},
},
ScopeSpans: []*tracev1.ScopeSpans{
&tracev1.ScopeSpans{
{
Scope: &commonpb.InstrumentationScope{
Name: "test-tracer",
},
Spans: []*tracev1.Span{
&tracev1.Span{
{
SpanId: []byte(unqSpanID),
TraceId: []byte(unqTraceID),
Kind: tracev1.Span_SPAN_KIND_SERVER,
Expand All @@ -234,7 +235,7 @@ func (t *clientTestSuite) TestSpan() {
Message: "failed",
},
Attributes: []*commonpb.KeyValue{
&commonpb.KeyValue{
{
Key: "test-attribute-1",
Value: &commonpb.AnyValue{
Value: &commonpb.AnyValue_StringValue{
Expand Down Expand Up @@ -344,3 +345,52 @@ func (t *clientTestSuite) TestD2PD() {
sattr, _ = childSpan.Attributes().Get("test-attribute-2")
t.Equal("test-value-2", sattr.Str())
}

func (t *clientTestSuite) TestSpanSizeTooLarge() {
ctx := context.Background()
cfg := NewConfig(
WithInsecure(),
WithEndpoint(t.addr),
WithHeaders(map[string]string{"lightstep-access-token": "${TOKEN}"}),
)
cfg.AdmissionLimitMiB = 0
exp, err := NewExporter(
ctx,
cfg,
)
t.NoError(err)

t.sdk = sdktrace.NewTracerProvider(
sdktrace.WithResource(
resource.NewSchemaless(testResourceAttrs...),
),
sdktrace.WithBatcher(exp),
)

tracer := t.sdk.Tracer("test-tracer")
_, span := tracer.Start(ctx, "ExecuteRequest", trace.WithSpanKind(trace.SpanKindClient))
for i := 0; i < 100000; i++ {
key := fmt.Sprintf("test-attribute-%d", i)
val := fmt.Sprintf("test-value-%d", i)
span.SetAttributes(attribute.String(key, val))
}
span.AddEvent("test event", trace.WithAttributes(attribute.String("test-event-attribute-1", "test-event-value-1")))
span.SetStatus(codes.Ok, "this is suppressed")

_, child := tracer.Start(ctx, "child", trace.WithSpanKind(trace.SpanKindInternal))
child.SetAttributes(attribute.String("test-attribute-2", "test-value-2"))
child.AddEvent("child test event", trace.WithAttributes(attribute.String("test-child-event-attribute-2", "test-child-event-value-2")))
child.End()

span.End()

t.NoError(t.sdk.Shutdown(ctx))

// AdmissionLimitMiB is 0 so we should have no traces arriving.
t.Equal(0, len(t.sink.AllTraces()))
t.assertTimestamps()

roSpans := []sdktrace.ReadOnlySpan{span.(sdktrace.ReadOnlySpan), child.(sdktrace.ReadOnlySpan)}
err = exp.ExportSpans(ctx, roSpans)
t.ErrorContains(err, "request size larger than configured limit")
}
Loading
Loading