[exporter/loadbalancing] Fix panic on a sub-exporter shutdown #31456

Merged · 1 commit · Mar 5, 2024
22 changes: 22 additions & 0 deletions .chloggen/fix-load-balancing-exp.yaml
@@ -0,0 +1,22 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/loadbalancing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix panic when a sub-exporter is shut down while still handling requests.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31410]

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
51 changes: 19 additions & 32 deletions exporter/loadbalancingexporter/loadbalancer.go
@@ -24,32 +24,24 @@ var (
errMultipleResolversProvided = errors.New("only one resolver should be specified")
)

var _ loadBalancer = (*loadBalancerImp)(nil)

type componentFactory func(ctx context.Context, endpoint string) (component.Component, error)

type loadBalancer interface {
component.Component
Endpoint(identifier []byte) string
Exporter(endpoint string) (component.Component, error)
}

type loadBalancerImp struct {
type loadBalancer struct {
logger *zap.Logger
host component.Host

res resolver
ring *hashRing

componentFactory componentFactory
exporters map[string]component.Component
exporters map[string]*wrappedExporter

stopped bool
updateLock sync.RWMutex
}

// Create new load balancer
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancerImp, error) {
func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, factory componentFactory) (*loadBalancer, error) {
oCfg := cfg.(*Config)

if oCfg.Resolver.DNS != nil && oCfg.Resolver.Static != nil {
@@ -90,21 +82,21 @@ func newLoadBalancer(params exporter.CreateSettings, cfg component.Config, facto
return nil, errNoResolver
}

return &loadBalancerImp{
return &loadBalancer{
logger: params.Logger,
res: res,
componentFactory: factory,
exporters: map[string]component.Component{},
exporters: map[string]*wrappedExporter{},
}, nil
}

func (lb *loadBalancerImp) Start(ctx context.Context, host component.Host) error {
func (lb *loadBalancer) Start(ctx context.Context, host component.Host) error {
lb.res.onChange(lb.onBackendChanges)
lb.host = host
return lb.res.start(ctx)
}

func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
func (lb *loadBalancer) onBackendChanges(resolved []string) {
newRing := newHashRing(resolved)

if !newRing.equal(lb.ring) {
@@ -122,7 +114,7 @@ func (lb *loadBalancerImp) onBackendChanges(resolved []string) {
}
}

func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) addMissingExporters(ctx context.Context, endpoints []string) {
for _, endpoint := range endpoints {
endpoint = endpointWithPort(endpoint)

@@ -132,12 +124,12 @@ func (lb *loadBalancerImp) addMissingExporters(ctx context.Context, endpoints []
lb.logger.Error("failed to create new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}

if err = exp.Start(ctx, lb.host); err != nil {
we := newWrappedExporter(exp)
if err = we.Start(ctx, lb.host); err != nil {
lb.logger.Error("failed to start new exporter for endpoint", zap.String("endpoint", endpoint), zap.Error(err))
continue
}
lb.exporters[endpoint] = exp
lb.exporters[endpoint] = we
}
}
}
Expand All @@ -149,7 +141,7 @@ func endpointWithPort(endpoint string) string {
return endpoint
}

func (lb *loadBalancerImp) removeExtraExporters(ctx context.Context, endpoints []string) {
func (lb *loadBalancer) removeExtraExporters(ctx context.Context, endpoints []string) {
endpointsWithPort := make([]string, len(endpoints))
for i, e := range endpoints {
endpointsWithPort[i] = endpointWithPort(e)
@@ -172,29 +164,24 @@ func endpointFound(endpoint string, endpoints []string) bool {
return false
}

func (lb *loadBalancerImp) Shutdown(context.Context) error {
func (lb *loadBalancer) Shutdown(context.Context) error {
lb.stopped = true
return nil
}

func (lb *loadBalancerImp) Endpoint(identifier []byte) string {
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()

return lb.ring.endpointFor(identifier)
}

func (lb *loadBalancerImp) Exporter(endpoint string) (component.Component, error) {
// exporterAndEndpoint returns the exporter and the endpoint for the given identifier.
func (lb *loadBalancer) exporterAndEndpoint(identifier []byte) (*wrappedExporter, string, error) {
// NOTE: make rolling updates of next tier of collectors work. currently, this may cause
// data loss because the latest batches sent to outdated backend will never find their way out.
// for details: https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/1690
lb.updateLock.RLock()
defer lb.updateLock.RUnlock()
endpoint := lb.ring.endpointFor(identifier)
exp, found := lb.exporters[endpointWithPort(endpoint)]
lb.updateLock.RUnlock()
if !found {
// something is really wrong... how come we couldn't find the exporter??
return nil, fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
return nil, "", fmt.Errorf("couldn't find the exporter for the endpoint %q", endpoint)
}

return exp, nil
return exp, endpoint, nil
}
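
The diff above references a `wrappedExporter` type and its `newWrappedExporter` constructor, which live in a file not shown in this changeset. A minimal sketch of the idea, assuming the wrapper embeds the wrapped `component.Component` and counts in-flight consume calls with a `sync.WaitGroup` (the forwarding `ConsumeLogs` method and its error message are illustrative assumptions, not the PR's exact code):

```go
package loadbalancingexporter

import (
	"context"
	"fmt"
	"sync"

	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/exporter"
	"go.opentelemetry.io/collector/pdata/plog"
)

// wrappedExporter pairs a sub-exporter with a WaitGroup so that Shutdown
// blocks until every in-flight consume call has returned, instead of
// tearing the exporter down underneath an active request.
type wrappedExporter struct {
	component.Component
	consumeWG sync.WaitGroup
}

func newWrappedExporter(exp component.Component) *wrappedExporter {
	return &wrappedExporter{Component: exp}
}

// Shutdown waits for in-flight requests, then stops the wrapped component.
func (we *wrappedExporter) Shutdown(ctx context.Context) error {
	we.consumeWG.Wait()
	return we.Component.Shutdown(ctx)
}

// ConsumeLogs forwards to the wrapped exporter; the type assertion stands in
// for the one deleted from consumeLog in the log_exporter.go diff below.
func (we *wrappedExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
	le, ok := we.Component.(exporter.Logs)
	if !ok {
		return fmt.Errorf("unexpected exporter type: expected exporter.Logs but got %T", we.Component)
	}
	return le.ConsumeLogs(ctx, ld)
}
```

Merging the former `Endpoint`/`Exporter` pair into a single `exporterAndEndpoint` call matters for the same reason: the ring lookup and the exporter-map lookup now both happen under one `updateLock.RLock()`, so a resolver update cannot swap the ring between the two steps.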
10 changes: 5 additions & 5 deletions exporter/loadbalancingexporter/loadbalancer_test.go
@@ -136,7 +136,7 @@ func TestWithDNSResolverNoEndpoints(t *testing.T) {
require.NoError(t, err)

// test
e := p.Endpoint([]byte{128, 128, 0, 0})
_, e, _ := p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Equal(t, "", e)
@@ -376,19 +376,19 @@ func TestFailedExporterInRing(t *testing.T) {

// test
// this trace ID will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte{128, 128, 0, 0}))
_, _, err = p.exporterAndEndpoint([]byte{128, 128, 0, 0})

// verify
assert.Error(t, err)

// test
// this service name will reach the endpoint-2 -- see the consistent hashing tests for more info
_, err = p.Exporter(p.Endpoint([]byte("get-recommendations-1")))
_, _, err = p.exporterAndEndpoint([]byte("get-recommendations-1"))

// verify
assert.Error(t, err)
}

func newNopMockExporter() component.Component {
return mockComponent{}
func newNopMockExporter() *wrappedExporter {
return newWrappedExporter(mockComponent{})
}
12 changes: 4 additions & 8 deletions exporter/loadbalancingexporter/log_exporter.go
@@ -5,7 +5,6 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry

import (
"context"
"fmt"
"math/rand"
"sync"
"time"
@@ -26,7 +25,7 @@ import (
var _ exporter.Logs = (*logExporterImp)(nil)

type logExporterImp struct {
loadBalancer loadBalancer
loadBalancer *loadBalancer

started bool
shutdownWg sync.WaitGroup
@@ -87,16 +86,13 @@ func (e *logExporterImp) consumeLog(ctx context.Context, ld plog.Logs) error {
balancingKey = random()
}

endpoint := e.loadBalancer.Endpoint(balancingKey[:])
exp, err := e.loadBalancer.Exporter(endpoint)
le, endpoint, err := e.loadBalancer.exporterAndEndpoint(balancingKey[:])
if err != nil {
return err
}

le, ok := exp.(exporter.Logs)
if !ok {
return fmt.Errorf("unable to export logs, unexpected exporter type: expected exporter.Logs but got %T", exp)
}
le.consumeWG.Add(1)
defer le.consumeWG.Done()

start := time.Now()
err = le.ConsumeLogs(ctx, ld)
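Continuing the wrapper sketch above: this is the caller-side half of the protocol, which the diff applies inline in `consumeLog`. The helper name here is hypothetical; the PR simply inlines the `Add`/`Done` pair around the consume call.

```go
// Hypothetical helper illustrating the pin-then-consume protocol from the
// diff above. Incrementing consumeWG before consuming means a concurrent
// resolver change cannot complete Shutdown while this request is in flight.
func consumeWithPin(ctx context.Context, we *wrappedExporter, ld plog.Logs) error {
	we.consumeWG.Add(1)       // Shutdown's consumeWG.Wait() now blocks on this request
	defer we.consumeWG.Done() // release the pin once the consume call returns
	return we.ConsumeLogs(ctx, ld)
}
```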
79 changes: 65 additions & 14 deletions exporter/loadbalancingexporter/log_exporter_test.go
@@ -287,6 +287,58 @@ func TestLogsWithoutTraceID(t *testing.T) {
assert.Len(t, sink.AllLogs(), 1)
}

// this test validates that the exporter can concurrently change the endpoints while consuming logs.
func TestConsumeLogs_ConcurrentResolverChange(t *testing.T) {
Member: This test passes on main as well; it might not trigger the error condition this PR is aiming to fix.

Member (Author): Not sure how it passed for you. Here is the draft PR with the tests only: #31566

Member: Thank you for the confirmation! Something certainly went wrong when backporting it to main on my side. LGTM!

consumeStarted := make(chan struct{})
consumeDone := make(chan struct{})

// imitate a slow exporter
te := &mockLogsExporter{Component: mockComponent{}}
te.consumelogsfn = func(ctx context.Context, td plog.Logs) error {
close(consumeStarted)
time.Sleep(50 * time.Millisecond)
return te.consumeErr
}
componentFactory := func(ctx context.Context, endpoint string) (component.Component, error) {
return te, nil
}
lb, err := newLoadBalancer(exportertest.NewNopCreateSettings(), simpleConfig(), componentFactory)
require.NotNil(t, lb)
require.NoError(t, err)

p, err := newLogsExporter(exportertest.NewNopCreateSettings(), simpleConfig())
require.NotNil(t, p)
require.NoError(t, err)

endpoints := []string{"endpoint-1"}
lb.res = &mockResolver{
triggerCallbacks: true,
onResolve: func(ctx context.Context) ([]string, error) {
return endpoints, nil
},
}
p.loadBalancer = lb

err = p.Start(context.Background(), componenttest.NewNopHost())
require.NoError(t, err)
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

go func() {
assert.NoError(t, p.ConsumeLogs(context.Background(), simpleLogs()))
close(consumeDone)
}()

// update endpoint while consuming logs
<-consumeStarted
endpoints = []string{"endpoint-2"}
endpoint, err := lb.res.resolve(context.Background())
require.NoError(t, err)
require.Equal(t, endpoints, endpoint)
<-consumeDone
}

func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {
t.Skip("Flaky Test - See https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/13331")

@@ -360,19 +412,17 @@ func TestRollingUpdatesWhenConsumeLogs(t *testing.T) {

counter1 := &atomic.Int64{}
counter2 := &atomic.Int64{}
defaultExporters := map[string]component.Component{
"127.0.0.1:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
defaultExporters := map[string]*wrappedExporter{
"127.0.0.1:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter1.Add(1)
// simulate an unreachable backend
time.Sleep(10 * time.Second)
return nil
},
),
"127.0.0.2:4317": newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
})),
"127.0.0.2:4317": newWrappedExporter(newMockLogsExporter(func(ctx context.Context, ld plog.Logs) error {
counter2.Add(1)
return nil
},
),
})),
}

// test
@@ -458,15 +508,21 @@ func simpleLogWithoutID() plog.Logs {
type mockLogsExporter struct {
component.Component
consumelogsfn func(ctx context.Context, ld plog.Logs) error
consumeErr error
}

func (e *mockLogsExporter) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

func (e *mockLogsExporter) Shutdown(context.Context) error {
e.consumeErr = errors.New("exporter is shut down")
return nil
}

func (e *mockLogsExporter) ConsumeLogs(ctx context.Context, ld plog.Logs) error {
if e.consumelogsfn == nil {
return nil
return e.consumeErr
}
return e.consumelogsfn(ctx, ld)
}
@@ -484,10 +540,5 @@ func newMockLogsExporter(consumelogsfn func(ctx context.Context, ld plog.Logs) e
}

func newNopMockLogsExporter() exporter.Logs {
return &mockLogsExporter{
Component: mockComponent{},
consumelogsfn: func(ctx context.Context, ld plog.Logs) error {
return nil
},
}
return &mockLogsExporter{Component: mockComponent{}}
}
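
The mock's new `consumeErr` field makes use-after-shutdown observable: `Shutdown` stores an error that later `ConsumeLogs` calls return when no custom consume function is set. A hypothetical check, assuming only the mock types from this diff (the test name is invented for illustration):

```go
// Hypothetical test: once the mock is shut down, any consume call that
// slips past the WaitGroup guard returns an error instead of panicking.
func TestMockLogsExporterRejectsAfterShutdown(t *testing.T) {
	exp := newNopMockLogsExporter()
	require.NoError(t, exp.Shutdown(context.Background()))
	assert.Error(t, exp.ConsumeLogs(context.Background(), plog.NewLogs()))
}
```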