Skip to content

Commit

Permalink
Return proto status for OTLP receiver when failed (#1788)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Sep 15, 2020
1 parent e52703a commit e00fdab
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 18 deletions.
1 change: 1 addition & 0 deletions receiver/otlpreceiver/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func newOtlpReceiver(cfg *Config) (*otlpReceiver, error) {
OrigName: true,
}
r.gatewayMux = gatewayruntime.NewServeMux(
gatewayruntime.WithProtoErrorHandler(gatewayruntime.DefaultHTTPProtoErrorHandler),
gatewayruntime.WithMarshalerOption("application/x-protobuf", &xProtobufMarshaler{}),
gatewayruntime.WithMarshalerOption(gatewayruntime.MIMEWildcard, jsonpb),
)
Expand Down
80 changes: 62 additions & 18 deletions receiver/otlpreceiver/otlp_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,22 @@ import (
"compress/gzip"
"context"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net"
"net/http"
"testing"
"time"

"github.com/gogo/protobuf/proto"
gogoproto "github.com/gogo/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
spb "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -55,10 +58,11 @@ import (

const otlpReceiverName = "otlp_receiver_test"

func TestGrpcGateway_endToEnd(t *testing.T) {
func TestJsonHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "JSONUncompressed",
Expand All @@ -68,6 +72,16 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
name: "JSONGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -129,6 +143,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
default:
buf = bytes.NewBuffer(traceJSON)
}
sink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/json")
Expand All @@ -149,16 +164,21 @@ func TestGrpcGateway_endToEnd(t *testing.T) {
t.Errorf("Error closing response body, %v", err)
}

if resp.StatusCode != 200 {
t.Errorf("Unexpected status from trace grpc-gateway: %v", resp.StatusCode)
}

var respJSON map[string]interface{}
err = json.Unmarshal([]byte(respStr), &respJSON)
assert.NoError(t, err)

if len(respJSON) != 0 {
t.Errorf("Got unexpected response from trace grpc-gateway: %v", respStr)
if test.err == nil {
assert.Equal(t, 200, resp.StatusCode)
var respJSON map[string]interface{}
assert.NoError(t, json.Unmarshal([]byte(respStr), &respJSON))
assert.Len(t, respJSON, 0, "Got unexpected response from trace grpc-gateway")
} else {
errStatus := &spb.Status{}
assert.NoError(t, json.Unmarshal([]byte(respStr), errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

got := sink.AllTraces()[0]
Expand Down Expand Up @@ -203,6 +223,7 @@ func TestProtoHttp(t *testing.T) {
tests := []struct {
name string
encoding string
err error
}{
{
name: "ProtoUncompressed",
Expand All @@ -212,6 +233,16 @@ func TestProtoHttp(t *testing.T) {
name: "ProtoGzipCompressed",
encoding: "gzip",
},
{
name: "NotGRPCError",
encoding: "",
err: errors.New("my error"),
},
{
name: "GRPCError",
encoding: "",
err: status.New(codes.Internal, "").Err(),
},
}
addr := testutil.GetAvailableLocalAddress(t)

Expand Down Expand Up @@ -248,6 +279,7 @@ func TestProtoHttp(t *testing.T) {
default:
buf = bytes.NewBuffer(traceBytes)
}
tSink.SetConsumeTraceError(test.err)
req, err := http.NewRequest("POST", url, buf)
require.NoError(t, err, "Error creating trace POST request: %v", err)
req.Header.Set("Content-Type", "application/x-protobuf")
Expand All @@ -261,12 +293,24 @@ func TestProtoHttp(t *testing.T) {
require.NoError(t, err, "Error reading response from trace grpc-gateway")
require.NoError(t, resp.Body.Close(), "Error closing response body")

require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")
if test.err == nil {
require.Equal(t, 200, resp.StatusCode, "Unexpected return status")
tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
} else {
errStatus := &spb.Status{}
assert.NoError(t, proto.Unmarshal(respBytes, errStatus))
if s, ok := status.FromError(test.err); ok {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, s.Proto()))
} else {
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.True(t, proto.Equal(errStatus, &spb.Status{Code: int32(codes.Unknown), Message: "my error"}))
}
}

tmp := &collectortrace.ExportTraceServiceResponse{}
err = tmp.Unmarshal(respBytes)
require.NoError(t, err, "Unable to unmarshal response to ExportTraceServiceResponse proto")
require.Equal(t, "application/x-protobuf", resp.Header.Get("Content-Type"), "Unexpected response Content-Type")

gotOtlp := pdata.TracesToOtlp(tSink.AllTraces()[0])

Expand All @@ -279,7 +323,7 @@ func TestProtoHttp(t *testing.T) {

// assert.Equal doesn't work on protos, see:
// https://github.com/stretchr/testify/issues/758
if !proto.Equal(got, want) {
if !gogoproto.Equal(got, want) {
t.Errorf("Sending trace proto over http failed\nGot:\n%v\nWant:\n%v\n",
got.String(),
want.String())
Expand Down

0 comments on commit e00fdab

Please sign in to comment.