Skip to content

Commit

Permalink
Fix race conditions in ManagerV2 tests (#36185)
Browse files Browse the repository at this point in the history
This commit fixes a number of race conditions in the ManagerV2
tests. Most of them were due to the use of Testify's Eventually
function to read some values while some callbacks from the manager
would also modify those values. The simplest solution was to use the
atomic values on those cases.

One test (TestInputReload) had a race condition between the test and
the manager itself, so it was removed. There is an integration tests
that covers the same functionality.

Closes #36192
  • Loading branch information
belimawr committed Aug 22, 2023
1 parent 39b5834 commit 818c72d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 231 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138]
- Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757]
- Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169]
- Fixed some race conditions in tests {pull}36185[36185]

==== Added

Expand Down
7 changes: 4 additions & 3 deletions x-pack/filebeat/tests/integration/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"fmt"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -265,7 +266,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
"../../filebeat.test",
)

finalStateReached := false
finalStateReached := atomic.Bool{}
var units = []*proto.UnitExpected{
{
Id: "output-unit-borken",
Expand Down Expand Up @@ -319,7 +320,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
// So we wait until the state matches the desired state
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if management.DoesStateMatch(observed, units, 0) {
finalStateReached = true
finalStateReached.Store(true)
}

return &proto.CheckinExpected{
Expand All @@ -337,7 +338,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) {
)

require.Eventually(t, func() bool {
return finalStateReached
return finalStateReached.Load()
}, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy")

t.Cleanup(server.Stop)
Expand Down
208 changes: 0 additions & 208 deletions x-pack/libbeat/management/input_reload_test.go

This file was deleted.

40 changes: 20 additions & 20 deletions x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,45 +37,45 @@ func TestManagerV2(t *testing.T) {
inputs := &reloadableList{}
r.MustRegisterInput(inputs)

configsSet := false
configsCleared := false
logLevelSet := false
fqdnEnabled := false
allStopped := false
configsSet := atomic.Bool{}
configsCleared := atomic.Bool{}
logLevelSet := atomic.Bool{}
fqdnEnabled := atomic.Bool{}
allStopped := atomic.Bool{}
onObserved := func(observed *proto.CheckinObserved, currentIdx int) {
if currentIdx == 1 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg != nil && len(iCfgs) == 3 {
configsSet = true
t.Logf("output and inputs configuration set")
configsSet.Store(true)
t.Log("output and inputs configuration set")
}
} else if currentIdx == 2 {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil || len(iCfgs) != 3 {
// should not happen (config no longer set)
configsSet = false
t.Logf("output and inputs configuration cleared (should not happen)")
configsSet.Store(false)
t.Log("output and inputs configuration cleared (should not happen)")
}
} else {
oCfg := output.Config()
iCfgs := inputs.Configs()
if oCfg == nil && len(iCfgs) == 0 {
configsCleared = true
configsCleared.Store(true)
}
if len(observed.Units) == 0 {
allStopped = true
t.Logf("output and inputs configuration cleared (stopping)")
allStopped.Store(true)
t.Log("output and inputs configuration cleared (stopping)")
}
}
if logp.GetLevel() == zapcore.DebugLevel {
logLevelSet = true
t.Logf("debug log level set")
logLevelSet.Store(true)
t.Log("debug log level set")
}

fqdnEnabled = features.FQDN()
t.Logf("FQDN feature flag set to %v", fqdnEnabled)
fqdnEnabled.Store(features.FQDN())
t.Logf("FQDN feature flag set to %v", fqdnEnabled.Load())
}

srv := integration.NewMockServer([][]*proto.UnitExpected{
Expand Down Expand Up @@ -221,7 +221,7 @@ func TestManagerV2(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped
return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load()
}, 15*time.Second, 300*time.Millisecond)
}

Expand All @@ -245,7 +245,7 @@ func TestOutputError(t *testing.T) {
}
r.MustRegisterInput(inputs)

stateReached := false
stateReached := atomic.Bool{}
units := []*proto.UnitExpected{
{
Id: "output-unit",
Expand Down Expand Up @@ -303,7 +303,7 @@ func TestOutputError(t *testing.T) {
server := &mock.StubServerV2{
CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
if DoesStateMatch(observed, desiredState, 0) {
stateReached = true
stateReached.Store(true)
}
return &proto.CheckinExpected{
Units: units,
Expand Down Expand Up @@ -348,7 +348,7 @@ func TestOutputError(t *testing.T) {
defer m.Stop()

require.Eventually(t, func() bool {
return stateReached
return stateReached.Load()
}, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached")
}

Expand Down

0 comments on commit 818c72d

Please sign in to comment.