OTel-Arrow components: trace Arrow stream send/recv and propagate trace context (#139)
Adds tracing support. This will enable diagnosis of pipeline stalls when
they happen; we believe these stalls are caused by a downstream component
that we will be able to identify once traces are enabled.

TESTED=manual & unittests

Tested with a configuration like the following:
```
receivers:
  otelarrow/1:
    protocols:
      grpc:
        endpoint: 127.0.0.1:4317
  otelarrow/2:
    protocols:
      grpc:
        endpoint: 127.0.0.1:4319

exporters:
  otelarrow:
    endpoint: 127.0.0.1:4319
    wait_for_ready: true
    tls:
      insecure: true
    sending_queue:
      enabled: false

  debug:
    # Uncomment this to print actual data.
    verbosity: detailed

service:
  pipelines:
    traces/1:
      receivers: [otelarrow/1]
      processors: []
      exporters: [otelarrow]
    traces/2:
      receivers: [otelarrow/2]
      processors: []
      exporters: [debug]

  telemetry:
    resource:
      "service.name": "simple-gateway"
    traces:
      propagators: [tracecontext]
      processors:
        - batch:
            exporter:
              otlp:
                protocol: grpc/protobuf
                endpoint: 127.0.0.1:4319
    logs:
      level: info
```
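In this configuration, `service::telemetry::traces::propagators: [tracecontext]` installs the W3C TraceContext propagator as the collector's global propagator; the exporter changes below pick it up through `otel.GetTextMapPropagator()`. In plain OpenTelemetry-Go terms, the effect is roughly this sketch (an illustration, not collector code):

```
package main

import (
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
)

func main() {
	// Roughly what `propagators: [tracecontext]` arranges at startup:
	// make W3C TraceContext the process-wide default text-map propagator.
	otel.SetTextMapPropagator(propagation.TraceContext{})
}
```

Note also that the exporter disables the sending queue (`sending_queue: enabled: false`), so export happens on the caller's goroutine and the incoming request context, with its span, reaches the Arrow stream writer intact; that is likely why this test configuration turns the queue off.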
jmacd authored Jan 17, 2024
1 parent c9bd21a commit 4f7fedc
Showing 4 changed files with 302 additions and 98 deletions.
```
@@ -29,6 +29,9 @@ import (
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/trace"
 )

 const defaultMaxStreamLifetime = 11 * time.Second
@@ -575,6 +578,77 @@ func TestArrowExporterHeaders(t *testing.T) {
 	require.NoError(t, tc.exporter.Shutdown(bg))
 }

+// TestArrowExporterIsTraced tests whether trace and span ID are
+// propagated.
+func TestArrowExporterIsTraced(t *testing.T) {
+	otel.SetTextMapPropagator(propagation.TraceContext{})
+	tc := newSingleStreamTestCase(t)
+	channel := newHealthyTestChannel()
+
+	tc.traceCall.AnyTimes().DoAndReturn(tc.returnNewStream(channel))
+
+	bg := context.Background()
+	require.NoError(t, tc.exporter.Start(bg))
+
+	var expectOutput []metadata.MD
+	var actualOutput []metadata.MD
+
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		md := metadata.MD{}
+		hpd := hpack.NewDecoder(4096, func(f hpack.HeaderField) {
+			md[f.Name] = append(md[f.Name], f.Value)
+		})
+		for data := range channel.sent {
+			if len(data.Headers) == 0 {
+				actualOutput = append(actualOutput, nil)
+			} else {
+				_, err := hpd.Write(data.Headers)
+				require.NoError(t, err)
+				actualOutput = append(actualOutput, md)
+				md = metadata.MD{}
+			}
+			channel.recv <- statusOKFor(data.BatchId)
+		}
+	}()
+
+	for times := 0; times < 10; times++ {
+		input := testdata.GenerateTraces(2)
+		ctx := context.Background()
+
+		if times%2 == 1 {
+			ctx = trace.ContextWithSpanContext(ctx,
+				trace.NewSpanContext(trace.SpanContextConfig{
+					TraceID: [16]byte{byte(times), 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf},
+					SpanID:  [8]byte{byte(times), 1, 2, 3, 4, 5, 6, 7},
+				}),
+			)
+			expectMap := map[string]string{}
+			propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(expectMap))
+
+			md := metadata.MD{
+				"traceparent": []string{expectMap["traceparent"]},
+			}
+			expectOutput = append(expectOutput, md)
+		} else {
+			expectOutput = append(expectOutput, nil)
+		}
+
+		sent, err := tc.exporter.SendAndWait(ctx, input)
+		require.NoError(t, err)
+		require.True(t, sent)
+	}
+	// Stop the test conduit started above.  If the sender were
+	// still sending, it would panic on a closed channel.
+	close(channel.sent)
+	wg.Wait()
+
+	require.Equal(t, expectOutput, actualOutput)
+	require.NoError(t, tc.exporter.Shutdown(bg))
+}
+
 func TestAddJitter(t *testing.T) {
 	require.Equal(t, time.Duration(0), addJitter(0))
 
```
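The new test hinges on the W3C `traceparent` header that `propagation.TraceContext` writes. Here is a minimal, self-contained sketch of that injection, using only the OpenTelemetry-Go API (the fixed IDs are illustrative; this is not collector code):

```
package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
)

func main() {
	// Build a context carrying a fixed SpanContext, as the test does.
	ctx := trace.ContextWithSpanContext(context.Background(),
		trace.NewSpanContext(trace.SpanContextConfig{
			TraceID: [16]byte{1, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0xa, 0xb, 0xc, 0xd, 0xe, 0xf},
			SpanID:  [8]byte{1, 1, 2, 3, 4, 5, 6, 7},
		}))

	// Inject writes a single "traceparent" key into the carrier, formatted
	// as 00-<32 hex trace ID>-<16 hex span ID>-<2 hex flags>.
	carrier := propagation.MapCarrier{}
	propagation.TraceContext{}.Inject(ctx, carrier)
	fmt.Println(carrier["traceparent"])
	// Prints: 00-010102030405060708090a0b0c0d0e0f-0101020304050607-00
}
```

On odd iterations the test expects exactly this one header to appear in the stream's hpack-encoded headers; on even iterations, where the context carries no span, it expects no headers at all.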
collector/exporter/otelarrowexporter/internal/arrow/stream.go (83 additions, 43 deletions)
```
@@ -30,6 +30,9 @@ import (
 	"go.opentelemetry.io/collector/pdata/plog"
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/collector/pdata/ptrace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagation"
+	"go.opentelemetry.io/otel/trace"
 )

 // Stream is 1:1 with gRPC stream.
@@ -52,6 +55,9 @@ type Stream struct {
 	// telemetry are a copy of the exporter's telemetry settings
 	telemetry component.TelemetrySettings

+	// tracer is used to create a span describing the export.
+	tracer trace.Tracer
+
 	// client uses the exporter's grpc.ClientConn. this is
 	// initially nil only set when ArrowStream() calls meaning the
 	// endpoint recognizes OTel-Arrow.
@@ -86,6 +92,8 @@ type writeItem struct {
 	// uncompSize is computed by the appropriate sizer (in the
 	// caller's goroutine)
 	uncompSize int
+	// parent will be used to create a span around the stream request.
+	parent context.Context
 }
@@ -96,11 +104,13 @@ func newStream(
 	perRPCCredentials credentials.PerRPCCredentials,
 	netReporter netstats.Interface,
 ) *Stream {
+	tracer := telemetry.TracerProvider.Tracer("otel-arrow-exporter")
 	return &Stream{
 		producer:          producer,
 		prioritizer:       prioritizer,
 		perRPCCredentials: perRPCCredentials,
 		telemetry:         telemetry,
+		tracer:            tracer,
 		toWrite:           make(chan writeItem, 1),
 		waiters:           map[int64]chan error{},
 		netReporter:       netReporter,
@@ -315,59 +325,87 @@ func (s *Stream) write(ctx context.Context) error {
 			s.prioritizer.removeReady(s)
 			return ctx.Err()
 		}
-		// Note: For the two return statements below there is no potential
-		// sender race because the stream is not available, as indicated by
-		// the successful <-stream.toWrite.

-		batch, err := s.encode(wri.records)
+		err := s.encodeAndSend(wri, &hdrsBuf, hdrsEnc)
 		if err != nil {
-			// This is some kind of internal error.  We will restart the
-			// stream and mark this record as a permanent one.
-			err = fmt.Errorf("encode: %w", err)
-			wri.errCh <- consumererror.NewPermanent(err)
+			// Note: For the return statement below, there is no potential
+			// sender race because the stream is not available, as indicated by
+			// the successful <-stream.toWrite above.
 			return err
 		}
+	}
+}

-		// Optionally include outgoing metadata, if present.
-		if len(wri.md) != 0 {
-			hdrsBuf.Reset()
-			for key, val := range wri.md {
-				err := hdrsEnc.WriteField(hpack.HeaderField{
-					Name:  key,
-					Value: val,
-				})
-				if err != nil {
-					// This case is like the encode-failure case
-					// above, we will restart the stream but consider
-					// this a permanent error.
-					err = fmt.Errorf("hpack: %w", err)
-					wri.errCh <- consumererror.NewPermanent(err)
-					return err
-				}
-			}
-			batch.Headers = hdrsBuf.Bytes()
-		}
-
-		// Let the receiver know what to look for.
-		s.setBatchChannel(batch.BatchId, wri.errCh)
-
-		// The netstats code knows that uncompressed size is
-		// unreliable for arrow transport, so we instrument it
-		// directly here.  Only the primary direction of transport
-		// is instrumented this way.
-		if wri.uncompSize != 0 {
-			var sized netstats.SizesStruct
-			sized.Method = s.method
-			sized.Length = int64(wri.uncompSize)
-			s.netReporter.CountSend(ctx, sized)
-		}
-
-		if err := s.client.Send(batch); err != nil {
-			// The error will be sent to errCh during cleanup for this stream.
-			// Note: do not wrap this error, it may contain a Status.
-			return err
-		}
+func (s *Stream) encodeAndSend(wri writeItem, hdrsBuf *bytes.Buffer, hdrsEnc *hpack.Encoder) error {
+	ctx, span := s.tracer.Start(wri.parent, "otel_arrow_stream_send")
+	defer span.End()
+
+	// Get the global propagator, to inject context.  When there
+	// are no fields, it's a no-op propagator implementation and
+	// we can skip the allocations inside this block.
+	prop := otel.GetTextMapPropagator()
+	if len(prop.Fields()) > 0 {
+		// When the incoming context carries nothing, the map
+		// will be nil.  Allocate, if necessary.
+		if wri.md == nil {
+			wri.md = map[string]string{}
+		}
+		// Use the global propagator to inject trace context.  Note that
+		// the OpenTelemetry Collector will set a global propagator from
+		// the service::telemetry::traces configuration.
+		otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(wri.md))
+	}
+
+	batch, err := s.encode(wri.records)
+	if err != nil {
+		// This is some kind of internal error.  We will restart the
+		// stream and mark this record as a permanent one.
+		err = fmt.Errorf("encode: %w", err)
+		wri.errCh <- consumererror.NewPermanent(err)
+		return err
+	}
+
+	// Optionally include outgoing metadata, if present.
+	if len(wri.md) != 0 {
+		hdrsBuf.Reset()
+		for key, val := range wri.md {
+			err := hdrsEnc.WriteField(hpack.HeaderField{
+				Name:  key,
+				Value: val,
+			})
+			if err != nil {
+				// This case is like the encode-failure case
+				// above, we will restart the stream but consider
+				// this a permanent error.
+				err = fmt.Errorf("hpack: %w", err)
+				wri.errCh <- consumererror.NewPermanent(err)
+				return err
+			}
+		}
+		batch.Headers = hdrsBuf.Bytes()
+	}
+
+	// Let the receiver know what to look for.
+	s.setBatchChannel(batch.BatchId, wri.errCh)
+
+	// The netstats code knows that uncompressed size is
+	// unreliable for arrow transport, so we instrument it
+	// directly here.  Only the primary direction of transport
+	// is instrumented this way.
+	if wri.uncompSize != 0 {
+		var sized netstats.SizesStruct
+		sized.Method = s.method
+		sized.Length = int64(wri.uncompSize)
+		s.netReporter.CountSend(ctx, sized)
+	}
+
+	if err := s.client.Send(batch); err != nil {
+		// The error will be sent to errCh during cleanup for this stream.
+		// Note: do not wrap this error, it may contain a Status.
+		return err
 	}
+
+	return nil
 }
@@ -485,6 +523,7 @@ func (s *Stream) SendAndWait(ctx context.Context, records interface{}) error {
 			return err
 		}
 	}
+
 	// Note that the uncompressed size as measured by the receiver
 	// will be different than uncompressed size as measured by the
 	// exporter, because of the optimization phase performed in the
@@ -509,6 +548,7 @@ func (s *Stream) SendAndWait(ctx context.Context, records interface{}) error {
 		md:         md,
 		uncompSize: uncompSize,
 		errCh:      errCh,
+		parent:     ctx,
 	}

 	// Note this ensures the caller's timeout is respected.
```
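The receive side of this change (in the two remaining changed files, which this view does not show) has to reverse the process: decode the hpack-encoded headers and extract the remote span context before starting its own span. The following end-to-end sketch shows both directions under that assumption; the helper `extractContext` and the standalone setup are hypothetical, not the repository's actual code:

```
package main

import (
	"bytes"
	"context"
	"fmt"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
	"golang.org/x/net/http2/hpack"
)

// extractContext is a hypothetical receiver-side helper: it decodes a
// batch's hpack-encoded headers and recovers the sender's span context
// via the globally configured propagator.
func extractContext(headerBytes []byte) (context.Context, error) {
	carrier := propagation.MapCarrier{}
	dec := hpack.NewDecoder(4096, func(f hpack.HeaderField) {
		carrier[f.Name] = f.Value // last value wins; enough for traceparent
	})
	if _, err := dec.Write(headerBytes); err != nil {
		return nil, err
	}
	return otel.GetTextMapPropagator().Extract(context.Background(), carrier), nil
}

func main() {
	otel.SetTextMapPropagator(propagation.TraceContext{})

	// Exporter direction, mirroring Stream.encodeAndSend above: inject
	// the current span context into a string map, then hpack-encode it.
	ctx := trace.ContextWithSpanContext(context.Background(),
		trace.NewSpanContext(trace.SpanContextConfig{
			TraceID: [16]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
			SpanID:  [8]byte{1, 2, 3, 4, 5, 6, 7, 8},
		}))
	md := map[string]string{}
	otel.GetTextMapPropagator().Inject(ctx, propagation.MapCarrier(md))

	var buf bytes.Buffer
	enc := hpack.NewEncoder(&buf)
	for k, v := range md {
		if err := enc.WriteField(hpack.HeaderField{Name: k, Value: v}); err != nil {
			panic(err)
		}
	}

	// Receiver direction: decode the headers and pick up the remote parent.
	ctx2, err := extractContext(buf.Bytes())
	if err != nil {
		panic(err)
	}
	fmt.Println(trace.SpanContextFromContext(ctx2).TraceID())
	// Prints: 0102030405060708090a0b0c0d0e0f10
}
```

Because the receiver's spans then become children of the exporter's `otel_arrow_stream_send` span, a stalled downstream component shows up directly in the assembled trace, which is the diagnosis this commit is after.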
