Skip to content

Commit

Permalink
Add integration and config testing to otlpmetricgrpc (#3126)
Browse files Browse the repository at this point in the history
* Add the GRPCCollector to otest

* Use otest to test otlpmetricgrpc Client

* Add WithHeaders and WithTimeout tests
  • Loading branch information
MrAlias authored Sep 12, 2022
1 parent a087b9f commit dc0eb59
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 40 deletions.
89 changes: 89 additions & 0 deletions exporters/otlp/otlpmetric/internal/otest/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@
package otest // import "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"

import (
"context"
"net"
"sync"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

collpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1"
mpb "go.opentelemetry.io/proto/otlp/metrics/v1"
)
Expand Down Expand Up @@ -56,3 +61,87 @@ func (s *Storage) dump() []*mpb.ResourceMetrics {
data, s.data = s.data, []*mpb.ResourceMetrics{}
return data
}

// GRPCCollector is an OTLP gRPC server that collects all requests it receives.
type GRPCCollector struct {
collpb.UnimplementedMetricsServiceServer

headersMu sync.Mutex
headers metadata.MD
storage *Storage

errCh <-chan error
listener net.Listener
srv *grpc.Server
}

// NewGRPCCollector returns a *GRPCCollector that is listening at the provided
// endpoint.
//
// If endpoint is an empty string, the returned collector will be listeing on
// the localhost interface at an OS chosen port.
//
// If errCh is not nil, the collector will respond to Export calls with errors
// sent on that channel. This means that if errCh is not nil Export calls will
// block until an error is received.
func NewGRPCCollector(endpoint string, errCh <-chan error) (*GRPCCollector, error) {
if endpoint == "" {
endpoint = "localhost:0"
}

c := &GRPCCollector{
storage: NewStorage(),
errCh: errCh,
}

var err error
c.listener, err = net.Listen("tcp", endpoint)
if err != nil {
return nil, err
}

c.srv = grpc.NewServer()
collpb.RegisterMetricsServiceServer(c.srv, c)
go func() { _ = c.srv.Serve(c.listener) }()

return c, nil
}

// Shutdown shuts down the gRPC server closing all open connections and
// listeners immediately.
func (c *GRPCCollector) Shutdown() { c.srv.Stop() }

// Addr returns the net.Addr c is listening at.
func (c *GRPCCollector) Addr() net.Addr {
return c.listener.Addr()
}

// Collect returns the Storage holding all collected requests.
func (c *GRPCCollector) Collect() *Storage {
return c.storage
}

// Headers returns the headers received for all requests.
func (c *GRPCCollector) Headers() map[string][]string {
// Makes a copy.
c.headersMu.Lock()
defer c.headersMu.Unlock()
return metadata.Join(c.headers)
}

// Export handles the export req.
func (c *GRPCCollector) Export(ctx context.Context, req *collpb.ExportMetricsServiceRequest) (*collpb.ExportMetricsServiceResponse, error) {
c.storage.Add(req)

if h, ok := metadata.FromIncomingContext(ctx); ok {
c.headersMu.Lock()
c.headers = metadata.Join(c.headers, h)
c.headersMu.Unlock()
}

var err error
if c.errCh != nil {
err = <-c.errCh
}
return &collpb.ExportMetricsServiceResponse{}, err
}
97 changes: 57 additions & 40 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,14 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/genproto/googleapis/rpc/errdetails"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"

"go.opentelemetry.io/otel/exporters/otlp/otlpmetric"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/internal/otest"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

func TestThrottleDuration(t *testing.T) {
Expand Down Expand Up @@ -127,51 +131,64 @@ func TestRetryable(t *testing.T) {
}
}

func TestClientHonorsContextErrors(t *testing.T) {
ctx := context.Background()
var emptyConn *grpc.ClientConn
t.Run("Shutdown", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient(ctx, WithGRPCConn(emptyConn))
func TestClient(t *testing.T) {
factory := func() (otlpmetric.Client, otest.Collector) {
coll, err := otest.NewGRPCCollector("", nil)
require.NoError(t, err)
return c.Shutdown
}))

t.Run("ForceFlush", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient(ctx, WithGRPCConn(emptyConn))
ctx := context.Background()
addr := coll.Addr().String()
client, err := newClient(ctx, WithEndpoint(addr), WithInsecure())
require.NoError(t, err)
return c.ForceFlush
}))
return client, coll
}

t.Run("UploadMetrics", testCtxErr(func(t *testing.T) func(context.Context) error {
c, err := newClient(ctx, WithGRPCConn(emptyConn))
require.NoError(t, err)
return func(ctx context.Context) error {
return c.UploadMetrics(ctx, nil)
}
}))
t.Run("Integration", otest.RunClientTests(factory))
}

func testCtxErr(factory func(*testing.T) func(context.Context) error) func(t *testing.T) {
return func(t *testing.T) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

t.Run("DeadlineExceeded", func(t *testing.T) {
innerCtx, innerCancel := context.WithTimeout(ctx, time.Nanosecond)
t.Cleanup(innerCancel)
<-innerCtx.Done()

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.DeadlineExceeded)
})

t.Run("Canceled", func(t *testing.T) {
innerCtx, innerCancel := context.WithCancel(ctx)
innerCancel()
func TestConfig(t *testing.T) {
factoryFunc := func(errCh <-chan error, o ...Option) (metric.Exporter, *otest.GRPCCollector) {
coll, err := otest.NewGRPCCollector("", errCh)
require.NoError(t, err)

f := factory(t)
assert.ErrorIs(t, f(innerCtx), context.Canceled)
})
ctx := context.Background()
opts := append([]Option{
WithEndpoint(coll.Addr().String()),
WithInsecure(),
}, o...)
exp, err := New(ctx, opts...)
require.NoError(t, err)
return exp, coll
}

t.Run("WithHeaders", func(t *testing.T) {
key := "my-custom-header"
headers := map[string]string{key: "custom-value"}
exp, coll := factoryFunc(nil, WithHeaders(headers))
t.Cleanup(coll.Shutdown)
ctx := context.Background()
require.NoError(t, exp.Export(ctx, metricdata.ResourceMetrics{}))
// Ensure everything is flushed.
require.NoError(t, exp.Shutdown(ctx))

got := coll.Headers()
require.Contains(t, got, key)
assert.Equal(t, got[key], []string{headers[key]})
})

t.Run("WithTimeout", func(t *testing.T) {
// Do not send on errCh so the Collector never responds to the client.
errCh := make(chan error)
t.Cleanup(func() { close(errCh) })
exp, coll := factoryFunc(
errCh,
WithTimeout(time.Millisecond),
WithRetry(RetryConfig{Enabled: false}),
)
t.Cleanup(coll.Shutdown)
ctx := context.Background()
t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
err := exp.Export(ctx, metricdata.ResourceMetrics{})
assert.ErrorContains(t, err, context.DeadlineExceeded.Error())
})
}
1 change: 1 addition & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/otel/sdk v1.9.0 // indirect
Expand Down
1 change: 1 addition & 0 deletions exporters/otlp/otlpmetric/otlpmetricgrpc/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
Expand Down

0 comments on commit dc0eb59

Please sign in to comment.