Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: grpc reconnection #1521

Merged
merged 6 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Rename project default branch from `master` to `main`.
- Reverse order in which `Resource` attributes are merged, per change in spec. (#1501)

## Fixed

- Fixed otlpgrpc reconnection issue.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
- Fixed otlpgrpc reconnection issue.
- Fixed otlpgrpc reconnection issue. (#1521)


## [0.16.0] - 2020-01-13

### Added
Expand Down
4 changes: 3 additions & 1 deletion exporters/otlp/otlpgrpc/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func newConnection(cfg config, handler func(cc *grpc.ClientConn)) *connection {

func (c *connection) startConnection(ctx context.Context) {
c.stopCh = make(chan struct{})
c.disconnectedCh = make(chan bool)
c.disconnectedCh = make(chan bool, 1)
c.backgroundConnectionDoneCh = make(chan struct{})

if err := c.connect(ctx); err == nil {
Expand Down Expand Up @@ -155,6 +155,8 @@ func (c *connection) indefiniteBackgroundConnection() {
if err := c.connect(context.Background()); err == nil {
c.setStateConnected()
} else {
// this code is unreachable in most cases
// c.connect does not establish connection
c.setStateDisconnected(err)
}

Expand Down
66 changes: 66 additions & 0 deletions exporters/otlp/otlpgrpc/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,72 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
}
}

func TestNewExporter_collectorConnectionDiesThenReconnectsWhenInRestMode(t *testing.T) {
mc := runMockCollector(t)

reconnectionPeriod := 2 * time.Second // 2 second + jitter rest time after reconnection
ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.endpoint,
otlpgrpc.WithReconnectionPeriod(reconnectionPeriod))
defer func() {
_ = exp.Shutdown(ctx)
}()

// We'll now stop the collector right away to simulate a connection
// dying in the midst of communication or even not existing before.
_ = mc.stop()

// first export, it will send disconnected message to the channel on export failure,
// trigger almost immediate reconnection
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused",
mc.endpoint,
)

// give it time for first reconnection
<-time.After(time.Millisecond * 20)

// second export, it will detect connection issue, change state of exporter to disconnected and
// send message to disconnected channel but this time reconnection gouroutine will be in (rest mode, not listening to the disconnected channel)
require.Error(
t,
exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "in the midst"}}),
"transport: Error while dialing dial tcp %s: connect: connection refused2",
mc.endpoint,
)

// as a result we have exporter in disconnected state waiting for disconnection message to reconnect

// resurrect collector
nmc := runMockCollectorAtEndpoint(t, mc.endpoint)

// make sure reconnection loop hits beginning and goes back to waiting mode
// after hitting beginning of the loop it should reconnect
<-time.After(time.Second * 4)

n := 10
for i := 0; i < n; i++ {
// when disconnected exp.ExportSpans doesnt send disconnected messages again
// it just quits and return last connection error
require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanSnapshot{{Name: "Resurrected"}}))
}

nmaSpans := nmc.getSpans()

// Expecting 10 SpanSnapshots that were sampled, given that
if g, w := len(nmaSpans), n; g != w {
t.Fatalf("Connected collector: spans: got %d want %d", g, w)
}

dSpans := mc.getSpans()
// Expecting 0 spans to have been received by the original but now dead collector
if g, w := len(dSpans), 0; g != w {
t.Fatalf("Disconnected collector: spans: got %d want %d", g, w)
}
}

func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) {
mc := runMockCollector(t)

Expand Down