From dc0eb59e82d608c7f41744bf752b4f454a5d5f72 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Mon, 12 Sep 2022 11:11:05 -0700 Subject: [PATCH] Add integration and config testing to otlpmetricgrpc (#3126) * Add the GRPCCollector to otest * Use otest to test otlpmetricgrpc Client * Add WithHeaders and WithTimeout tests --- .../otlpmetric/internal/otest/collector.go | 89 +++++++++++++++++ .../otlpmetric/otlpmetricgrpc/client_test.go | 97 +++++++++++-------- .../otlp/otlpmetric/otlpmetricgrpc/go.mod | 1 + .../otlp/otlpmetric/otlpmetricgrpc/go.sum | 1 + 4 files changed, 148 insertions(+), 40 deletions(-) diff --git a/exporters/otlp/otlpmetric/internal/otest/collector.go b/exporters/otlp/otlpmetric/internal/otest/collector.go index a18bc1b4249..4bc8ba0b7db 100644 --- a/exporters/otlp/otlpmetric/internal/otest/collector.go +++ b/exporters/otlp/otlpmetric/internal/otest/collector.go @@ -18,8 +18,13 @@ package otest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" import ( + "context" + "net" "sync" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" mpb "go.opentelemetry.io/proto/otlp/metrics/v1" ) @@ -56,3 +61,87 @@ func (s *Storage) dump() []*mpb.ResourceMetrics { data, s.data = s.data, []*mpb.ResourceMetrics{} return data } + +// GRPCCollector is an OTLP gRPC server that collects all requests it receives. +type GRPCCollector struct { + collpb.UnimplementedMetricsServiceServer + + headersMu sync.Mutex + headers metadata.MD + storage *Storage + + errCh <-chan error + listener net.Listener + srv *grpc.Server +} + +// NewGRPCCollector returns a *GRPCCollector that is listening at the provided +// endpoint. +// +// If endpoint is an empty string, the returned collector will be listeing on +// the localhost interface at an OS chosen port. +// +// If errCh is not nil, the collector will respond to Export calls with errors +// sent on that channel. This means that if errCh is not nil Export calls will +// block until an error is received. +func NewGRPCCollector(endpoint string, errCh <-chan error) (*GRPCCollector, error) { + if endpoint == "" { + endpoint = "localhost:0" + } + + c := &GRPCCollector{ + storage: NewStorage(), + errCh: errCh, + } + + var err error + c.listener, err = net.Listen("tcp", endpoint) + if err != nil { + return nil, err + } + + c.srv = grpc.NewServer() + collpb.RegisterMetricsServiceServer(c.srv, c) + go func() { _ = c.srv.Serve(c.listener) }() + + return c, nil +} + +// Shutdown shuts down the gRPC server closing all open connections and +// listeners immediately. +func (c *GRPCCollector) Shutdown() { c.srv.Stop() } + +// Addr returns the net.Addr c is listening at. +func (c *GRPCCollector) Addr() net.Addr { + return c.listener.Addr() +} + +// Collect returns the Storage holding all collected requests. +func (c *GRPCCollector) Collect() *Storage { + return c.storage +} + +// Headers returns the headers received for all requests. +func (c *GRPCCollector) Headers() map[string][]string { + // Makes a copy. + c.headersMu.Lock() + defer c.headersMu.Unlock() + return metadata.Join(c.headers) +} + +// Export handles the export req. +func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsServiceRequest) (*collpb.ExportMetricsServiceResponse, error) { + c.storage.Add(req) + + if h, ok := metadata.FromIncomingContext(ctx); ok { + c.headersMu.Lock() + c.headers = metadata.Join(c.headers, h) + c.headersMu.Unlock() + } + + var err error + if c.errCh != nil { + err = <-c.errCh + } + return &collpb.ExportMetricsServiceResponse{}, err +} diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go index 567439578e1..e78e91f5396 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go @@ -25,10 +25,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/genproto/googleapis/rpc/errdetails" - "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" + + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest" + "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" ) func TestThrottleDuration(t *testing.T) { @@ -127,51 +131,64 @@ func TestRetryable(t *testing.T) { } } -func TestClientHonorsContextErrors(t *testing.T) { - ctx := context.Background() - var emptyConn *grpc.ClientConn - t.Run("Shutdown", testCtxErr(func(t *testing.T) func(context.Context) error { - c, err := newClient(ctx, WithGRPCConn(emptyConn)) +func TestClient(t *testing.T) { + factory := func() (otlpmetric.Client, otest.Collector) { + coll, err := otest.NewGRPCCollector("", nil) require.NoError(t, err) - return c.Shutdown - })) - t.Run("ForceFlush", testCtxErr(func(t *testing.T) func(context.Context) error { - c, err := newClient(ctx, WithGRPCConn(emptyConn)) + ctx := context.Background() + addr := coll.Addr().String() + client, err := newClient(ctx, WithEndpoint(addr), WithInsecure()) require.NoError(t, err) - return c.ForceFlush - })) + return client, coll + } - t.Run("UploadMetrics", testCtxErr(func(t *testing.T) func(context.Context) error { - c, err := newClient(ctx, WithGRPCConn(emptyConn)) - require.NoError(t, err) - return func(ctx context.Context) error { - return c.UploadMetrics(ctx, nil) - } - })) + t.Run("Integration", otest.RunClientTests(factory)) } -func testCtxErr(factory func(*testing.T) func(context.Context) error) func(t *testing.T) { - return func(t *testing.T) { - t.Helper() - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - t.Run("DeadlineExceeded", func(t *testing.T) { - innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond) - t.Cleanup(innerCancel) - <-innerCtx.Done() - - f := factory(t) - assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded) - }) - - t.Run("Canceled", func(t *testing.T) { - innerCtx, innerCancel := context.WithCancel(ctx) - innerCancel() +func TestConfig(t *testing.T) { + factoryFunc := func(errCh <-chan error, o ...Option) (metric.Exporter, *otest.GRPCCollector) { + coll, err := otest.NewGRPCCollector("", errCh) + require.NoError(t, err) - f := factory(t) - assert.ErrorIs(t, f(innerCtx), context.Canceled) - }) + ctx := context.Background() + opts := append([]Option{ + WithEndpoint(coll.Addr().String()), + WithInsecure(), + }, o...) + exp, err := New(ctx, opts...) + require.NoError(t, err) + return exp, coll } + + t.Run("WithHeaders", func(t *testing.T) { + key := "my-custom-header" + headers := map[string]string{key: "custom-value"} + exp, coll := factoryFunc(nil, WithHeaders(headers)) + t.Cleanup(coll.Shutdown) + ctx := context.Background() + require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{})) + // Ensure everything is flushed. + require.NoError(t, exp.Shutdown(ctx)) + + got := coll.Headers() + require.Contains(t, got, key) + assert.Equal(t, got[key], []string{headers[key]}) + }) + + t.Run("WithTimeout", func(t *testing.T) { + // Do not send on errCh so the Collector never responds to the client. + errCh := make(chan error) + t.Cleanup(func() { close(errCh) }) + exp, coll := factoryFunc( + errCh, + WithTimeout(time.Millisecond), + WithRetry(RetryConfig{Enabled: false}), + ) + t.Cleanup(coll.Shutdown) + ctx := context.Background() + t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) }) + err := exp.Export(ctx, metricdata.ResourceMetrics{}) + assert.ErrorContains(t, err, context.DeadlineExceeded.Error()) + }) } diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod index 9c534e4a51c..71ea8704bb5 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod @@ -21,6 +21,7 @@ require ( github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/protobuf v1.5.2 // indirect + github.com/google/go-cmp v0.5.8 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/otel/sdk v1.9.0 // indirect diff --git a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum index c8a15d98b71..c066ceeecc1 100644 --- a/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum +++ b/exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum @@ -113,6 +113,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg= +github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=