Skip to content

Commit

Permalink
fix: grpc reconnection (#1521)
Browse files Browse the repository at this point in the history
* fix: grpc reconnection fixed

* chore: changelog update

* fix: grpc reconnection issue - red test

* fix: grpc reconnection #1524

* fix: grpc reconnection issue cleanup
  • Loading branch information
kstasik authored Feb 17, 2021
1 parent 3bce9c9 commit 8da5299
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Create new modules: otel/metric, otel/trace, otel/oteltest, otel/sdk/export/metric, otel/sdk/metric (#1528)
- Move metric-related public global APIs from otel to otel/metric/global. (#1528)

## Fixed

- Fixed otlpgrpc reconnection issue.

## [0.16.0] - 2020-01-13

### Added
Expand Down
4 changes: 3 additions & 1 deletion exporters/otlp/otlpgrpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection {

func (c *connection) startConnection(ctx context.Context) {
c.stopCh = make(chan struct{})
c.disconnectedCh = make(chan bool)
c.disconnectedCh = make(chan bool, 1)
c.backgroundConnectionDoneCh = make(chan struct{})

if err := c.connect(ctx); err == nil {
Expand Down Expand Up @@ -155,6 +155,8 @@ func (c *connection) indefiniteBackgroundConnection() {
if err := c.connect(context.Background()); err == nil {
c.setStateConnected()
} else {
// this code is unreachable in most cases
// c.connect does not establish connection
c.setStateDisconnected(err)
}

Expand Down
66 changes: 66 additions & 0 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,72 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
}
}

func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 2 * time.Second // 2 second + jitter rest time after reconnection
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()

// first export, it will send disconnected message to the channel on export failure,
// trigger almost immediate reconnection
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)

// give it time for first reconnection
<-time.After(time.Millisecond * 20)

// second export, it will detect connection issue, change state of exporter to disconnected and
// send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel)
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused2",
mc.endpoint,
)

// as a result we have exporter in disconnected state waiting for disconnection message to reconnect

// resurrect collector
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
<-time.After(time.Second * 4)

n := 10
for i := 0; i < n; i++ {
// when disconnected exp.ExportSpans doesnt send disconnected messages again
// it just quits and return last connection error
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}}))
}

nmaSpans := nmc.getSpans()

// Expecting 10 SpanSnapshots that were sampled, given that
if g, w := len(nmaSpans), n; g != w {
t.Fatalf("Connected collector: spans: got %d want %d", g, w)
}

dSpans := mc.getSpans()
// Expecting 0 spans to have been received by the original but now dead collector
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}
}

func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)

Expand Down

0 comments on commit 8da5299

Please sign in to comment.