Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Propagate errors from exporters to receivers #7486

Closed
16 changes: 16 additions & 0 deletions .chloggen/propagate-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: otlpreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Propagate HTTP errors from the exporter back to the caller

# One or more tracking issues or pull requests related to the change
issues: [7486]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: The Collector can now propagate errors from backends to the client when used as a proxy.
10 changes: 4 additions & 6 deletions exporter/otlphttpexporter/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@ import (

"go.uber.org/zap"
"google.golang.org/genproto/googleapis/rpc/status"
gstatus "google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/internal/colerrs"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/collector/pdata/pmetric"
Expand Down Expand Up @@ -142,13 +144,9 @@ func (e *baseExporter) export(ctx context.Context, url string, request []byte) e
// Format the error message. Use the status if it is present in the response.
var formattedErr error
if respStatus != nil {
formattedErr = fmt.Errorf(
"error exporting items, request to %s responded with HTTP Status Code %d, Message=%s, Details=%v",
url, resp.StatusCode, respStatus.Message, respStatus.Details)
formattedErr = gstatus.ErrorProto(respStatus)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know that the gRPC code in the response body more accurately describes the error than the HTTP status code returned by the backend?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so. The code for readResponse from a few lines above states:

		// Request failed. Read the body. OTLP spec says:
		// "Response body for all HTTP 4xx and HTTP 5xx responses MUST be a
		// Protobuf-encoded Status message that describes the problem."

Given that our OTLP receiver and exporter are sending and receiving proto payloads, I would say that the gRPC status codes are generally better at describing what happened.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a fair point. However looking at the Failures section of the OTLP spec, it also says the following:

This specification does not use Status.code field and the server MAY omit Status.code field. The clients are not expected to alter their behavior based on Status.code field but MAY record it for troubleshooting purposes.

The way that reads to me implies that we may not want to use the code field for functional purposes like this. If we do use it, I think we should at least check if the code is a zero value given that it's optional, and we should probably also call this out in the exporter readme so the behavior is documented.

} else {
formattedErr = fmt.Errorf(
"error exporting items, request to %s responded with HTTP Status Code %d",
url, resp.StatusCode)
formattedErr = colerrs.NewRequestError(resp.StatusCode, fmt.Errorf("error exporting items to %q", url))
}

if isRetryableStatusCode(resp.StatusCode) {
Expand Down
15 changes: 7 additions & 8 deletions exporter/otlphttpexporter/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,6 @@ func startAndCleanup(t *testing.T, cmp component.Component) {

func TestErrorResponses(t *testing.T) {
addr := testutil.GetAvailableLocalAddress(t)
errMsgPrefix := fmt.Sprintf("error exporting items, request to http://%s/v1/traces responded with HTTP Status Code ", addr)

tests := []struct {
name string
Expand Down Expand Up @@ -425,11 +424,11 @@ func TestErrorResponses(t *testing.T) {
isPermErr: true,
},
{
name: "419",
name: "429",
responseStatus: http.StatusTooManyRequests,
responseBody: status.New(codes.InvalidArgument, "Quota exceeded"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"429, Message=Quota exceeded, Details=[]"),
status.New(codes.InvalidArgument, "Quota exceeded").Err(),
time.Duration(0)*time.Second),
},
{
Expand All @@ -443,15 +442,15 @@ func TestErrorResponses(t *testing.T) {
responseStatus: http.StatusBadGateway,
responseBody: status.New(codes.InvalidArgument, "Bad gateway"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"502, Message=Bad gateway, Details=[]"),
status.New(codes.InvalidArgument, "Bad gateway").Err(),
time.Duration(0)*time.Second),
},
{
name: "503",
responseStatus: http.StatusServiceUnavailable,
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
status.New(codes.InvalidArgument, "Server overloaded").Err(),
time.Duration(0)*time.Second),
},
{
Expand All @@ -460,15 +459,15 @@ func TestErrorResponses(t *testing.T) {
responseBody: status.New(codes.InvalidArgument, "Server overloaded"),
headers: map[string]string{"Retry-After": "30"},
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"503, Message=Server overloaded, Details=[]"),
status.New(codes.InvalidArgument, "Server overloaded").Err(),
time.Duration(30)*time.Second),
},
{
name: "504",
responseStatus: http.StatusGatewayTimeout,
responseBody: status.New(codes.InvalidArgument, "Gateway timeout"),
err: exporterhelper.NewThrottleRetry(
errors.New(errMsgPrefix+"504, Message=Gateway timeout, Details=[]"),
status.New(codes.InvalidArgument, "Gateway timeout").Err(),
time.Duration(0)*time.Second),
},
}
Expand Down Expand Up @@ -521,7 +520,7 @@ func TestErrorResponses(t *testing.T) {
if test.isPermErr {
assert.True(t, consumererror.IsPermanent(err))
} else {
assert.EqualValues(t, test.err, err)
assert.EqualValues(t, test.err.Error(), err.Error())
}

srv.Close()
Expand Down
21 changes: 21 additions & 0 deletions internal/colerrs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Internal collector errors

This package includes helpers for propagating errors from exporters or processors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behaviour around error propagation for synchronous pipelines should be documented in a place that's more visible than the package's readme :)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs/design.md perhaps, on a new "Errors" section? Would it be OK to do it on a follow-up PR, as I think this section would need more than just information about errors in sync pipelines. I should add a word or two about errors in async pipelines as well.

back to receivers. Error propagation is helpful in a setup where the Collector
acts as a simple gateway between clients and other Collector instances or
backends.

Error propagation only works when there's synchronous processing of a pipeline.
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
Concretely, this means the pipeline should contain no batch processors, and
sending queues for exporters, as well as potentially other resiliency
mechanisms, should be disabled. When the pipeline is async, as is the case when
the batch processor is part of it, the errors returned by the backends used by
the exporters arrive only after the Collector's client has already received its
response.

Processors doing data transformation should be treated with care, perhaps even
avoided for sync pipelines. The reason is that when a failure happens _because_
of those transformations, the Collector's client isn't at fault: a 400 returned
by a backend should NOT propagate back to the client.

Shared receivers might also fail in different ways than the outcome of the
exporters.
32 changes: 32 additions & 0 deletions internal/colerrs/request.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package colerrs // import "go.opentelemetry.io/collector/internal/colerrs"

// Compile-time check that *RequestError satisfies the error interface.
var _ error = (*RequestError)(nil)

// RequestError represents an error returned during HTTP client operations.
// It pairs the HTTP status code of the failed request with the underlying
// error.
type RequestError struct {
	statusCode int
	wrapped    error
}

// NewRequestError creates a new HTTP client request error that carries the
// given status code and wraps the given error.
func NewRequestError(statusCode int, wrapped error) RequestError {
	return RequestError{
		wrapped:    wrapped,
		statusCode: statusCode,
	}
}

// Error returns the message of the wrapped error. When there is no wrapped
// error (the zero value, or a nil error passed to NewRequestError) a generic
// message is returned instead of dereferencing the nil error.
func (r RequestError) Error() string {
	if r.wrapped == nil {
		return "request failed"
	}
	return r.wrapped.Error()
}

// Unwrap returns the wrapped error, or nil when there is none.
func (r RequestError) Unwrap() error {
	return r.wrapped
}

// StatusCode returns the HTTP status code stored in the error.
func (r RequestError) StatusCode() int {
	return r.statusCode
}
26 changes: 26 additions & 0 deletions internal/colerrs/request_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package colerrs

import (
"errors"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
)

// TestNewRequestError checks that a value built by NewRequestError is a
// non-nil error and exposes the wrapped error, the status code and the
// wrapped error's message.
func TestNewRequestError(t *testing.T) {
	cause := errors.New("boo")

	got := NewRequestError(http.StatusInternalServerError, cause)

	assert.Error(t, got)
	assert.Equal(t, cause, got.Unwrap())
	assert.Equal(t, http.StatusInternalServerError, got.StatusCode())
	assert.Equal(t, "boo", got.Error())
}
50 changes: 50 additions & 0 deletions receiver/otlpreceiver/erroradapter.go
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package otlpreceiver // import "go.opentelemetry.io/collector/receiver/otlpreceiver"

import (
"net/http"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func gRPCToHTTP(s *status.Status) int {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an official guideline around translation from one protocol to another?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to find one, or at least a library we could use, but I couldn't.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, after looking at it again, I recognized it as being my source of truth :-) I couldn't find a lib to reuse though. In any case, I should have documented the source of the data so that we are able to sync it in the future if we ever get new status codes.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the record: I'm missing the mapping for 429.

switch s.Code() {
case codes.Aborted:
return http.StatusInternalServerError
case codes.AlreadyExists:
return http.StatusConflict
case codes.Canceled:
return http.StatusInternalServerError
case codes.DataLoss:
return http.StatusInternalServerError
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
case codes.DeadlineExceeded:
return http.StatusRequestTimeout
case codes.FailedPrecondition:
return http.StatusPreconditionFailed
case codes.Internal:
return http.StatusInternalServerError
case codes.InvalidArgument:
return http.StatusBadRequest
case codes.NotFound:
return http.StatusNotFound
case codes.OutOfRange:
return http.StatusBadRequest
case codes.PermissionDenied:
return http.StatusForbidden
case codes.ResourceExhausted:
return http.StatusInternalServerError
case codes.Unauthenticated:
return http.StatusUnauthorized
case codes.Unavailable:
return http.StatusServiceUnavailable
case codes.Unimplemented:
return http.StatusNotFound
case codes.Unknown:
return http.StatusInternalServerError
default:
return http.StatusInternalServerError
}
}
56 changes: 50 additions & 6 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ import (
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/colerrs"
"go.opentelemetry.io/collector/internal/testdata"
"go.opentelemetry.io/collector/internal/testutil"
"go.opentelemetry.io/collector/obsreport/obsreporttest"
Expand Down Expand Up @@ -415,9 +417,10 @@ func testHTTPJSONRequest(t *testing.T, url string, sink *errOrSinkConsumer, enco

func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
name string
encoding string
err error
statusCode int
}{
{
name: "ProtoUncompressed",
Expand All @@ -437,6 +440,12 @@ func TestProtoHttp(t *testing.T) {
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
{
name: "Invalid argument becomes bad request",
encoding: "",
err: status.New(codes.InvalidArgument, "").Err(),
statusCode: http.StatusBadRequest,
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand All @@ -460,7 +469,7 @@ func TestProtoHttp(t *testing.T) {
t.Run(test.name, func(t *testing.T) {
url := fmt.Sprintf("http://%s/v1/traces", addr)
tSink.Reset()
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td)
testHTTPProtobufRequest(t, url, tSink, test.encoding, traceBytes, test.err, td, test.statusCode)
})
}
}
Expand Down Expand Up @@ -495,7 +504,12 @@ func testHTTPProtobufRequest(
traceBytes []byte,
expectedErr error,
wantData ptrace.Traces,
httpStatusCode int,
) {
if httpStatusCode == 0 {
httpStatusCode = http.StatusInternalServerError
}

tSink.SetConsumeError(expectedErr)

req := createHTTPProtobufRequest(t, url, encoding, traceBytes)
Expand Down Expand Up @@ -524,10 +538,10 @@ func testHTTPProtobufRequest(
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(expectedErr); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, httpStatusCode, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Equal(t, httpStatusCode, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
require.Len(t, allTraces, 0)
Expand Down Expand Up @@ -922,6 +936,36 @@ func TestHTTPMaxRequestBodySize_TooLarge(t *testing.T) {
testHTTPMaxRequestBodySizeJSON(t, traceJSON, len(traceJSON)-1, 400)
}

// TestDetermineHTTPStatusOnError is a table-driven test asserting the HTTP
// status code that determineHTTPStatus returns for each listed error.
func TestDetermineHTTPStatusOnError(t *testing.T) {
	type scenario struct {
		desc     string
		err      error
		expected int
	}

	scenarios := []scenario{
		{"permanent error yields 500", consumererror.NewPermanent(errors.New("boo")), http.StatusInternalServerError},
		{"same as gRPC", status.Error(codes.AlreadyExists, "boo"), http.StatusConflict},
		{"same as HTTP", colerrs.NewRequestError(http.StatusTeapot, errors.New("not coffee")), http.StatusTeapot},
	}

	for _, sc := range scenarios {
		t.Run(sc.desc, func(t *testing.T) {
			assert.Equal(t, sc.expected, determineHTTPStatus(sc.err))
		})
	}
}

func newGRPCReceiver(t *testing.T, endpoint string, tc consumer.Traces, mc consumer.Metrics) component.Component {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
Expand Down
Loading