From 818c72d4341d3e1b87d3467b436b73a9fc045a28 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Tue, 22 Aug 2023 12:15:26 +0200 Subject: [PATCH] Fix race conditions in ManagerV2 tests (#36185) This commit fixes a number of race conditions in the ManagerV2 tests. Most of them were due to the use of Testify's Eventually function to read some values while some callbacks from the manager would also modify those values. The simplest solution was to use the atomic values on those cases. One test (TestInputReload) had a race condition between the test and the manager itself, so it was removed. There is an integration tests that covers the same functionality. Closes https://github.com/elastic/beats/issues/36192 --- CHANGELOG-developer.next.asciidoc | 1 + .../tests/integration/managerV2_test.go | 7 +- .../libbeat/management/input_reload_test.go | 208 ------------------ x-pack/libbeat/management/managerV2_test.go | 40 ++-- 4 files changed, 25 insertions(+), 231 deletions(-) delete mode 100644 x-pack/libbeat/management/input_reload_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 9afe4ff5857..baa6ca3ec95 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -84,6 +84,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip HTTPJSON flakey test. {issue}34929[34929] {pull}35138[35138] - Fix ingest pipeline for panw module to parse url scheme correctly {pull}35757[35757] - Renamed an httpjson input metric to follow naming conventions. `httpjson_interval_pages_total` was renamed to `httpjson_interval_pages` because the `_total` suffix is reserved for counters. {issue}35933[35933] {pull}36169[36169] +- Fixed some race conditions in tests {pull}36185[36185] ==== Added diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 5e3111a0e09..3332d549fa2 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -10,6 +10,7 @@ import ( "fmt" "os" "path/filepath" + "sync/atomic" "testing" "time" @@ -265,7 +266,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { "../../filebeat.test", ) - finalStateReached := false + finalStateReached := atomic.Bool{} var units = []*proto.UnitExpected{ { Id: "output-unit-borken", @@ -319,7 +320,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { // So we wait until the state matches the desired state CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { if management.DoesStateMatch(observed, units, 0) { - finalStateReached = true + finalStateReached.Store(true) } return &proto.CheckinExpected{ @@ -337,7 +338,7 @@ func TestFailedOutputReportsUnhealthy(t *testing.T) { ) require.Eventually(t, func() bool { - return finalStateReached + return finalStateReached.Load() }, 30*time.Second, 100*time.Millisecond, "Output unit did not report unhealthy") t.Cleanup(server.Stop) diff --git a/x-pack/libbeat/management/input_reload_test.go b/x-pack/libbeat/management/input_reload_test.go deleted file mode 100644 index 8d79f685da7..00000000000 --- a/x-pack/libbeat/management/input_reload_test.go +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -package management - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/joeshaw/multierror" - "github.com/stretchr/testify/require" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" - - "github.com/elastic/beats/v7/libbeat/common" - "github.com/elastic/beats/v7/libbeat/common/reload" - "github.com/elastic/beats/v7/libbeat/tests/integration" - "github.com/elastic/elastic-agent-client/v7/pkg/client" - "github.com/elastic/elastic-agent-client/v7/pkg/proto" -) - -func TestInputReload(t *testing.T) { - // Uncomment the line below to see the debug logs for this test - // logp.DevelopmentSetup(logp.WithLevel(logp.DebugLevel), logp.WithSelectors("*", "centralmgmt.V2-manager")) - r := reload.NewRegistry() - - output := &reloadable{} - r.MustRegisterOutput(output) - - reloadCallCount := 0 - inputs := &reloadableListMock{ - ReloadImpl: func(configs []*reload.ConfigWithMeta) error { - reloadCallCount++ - if reloadCallCount == 1 { - e1 := multierror.Errors{fmt.Errorf("%w", &common.ErrInputNotFinished{ - State: "", - File: "/tmp/foo.log", - })} - return e1.Err() - } - - return nil - }, - } - r.MustRegisterInput(inputs) - - configIdx := -1 - onObserved := func(observed *proto.CheckinObserved, currentIdx int) { - configIdx = currentIdx - } - - srv := integration.NewMockServer([][]*proto.UnitExpected{ - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_STARTING, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "log-input", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-1", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - { - { - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "elasticsearch", - Name: "elasticsearch", - }, - }, - { - Id: "input-unit-1", - Type: proto.UnitType_INPUT, - ConfigStateIdx: 1, - State: proto.State_HEALTHY, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "log-input-2", - Type: "log", - Name: "log", - Streams: []*proto.Stream{ - { - Id: "log-input-2", - Source: requireNewStruct(t, map[string]interface{}{ - "paths": []interface{}{"/tmp/foo.log"}, - }), - }, - }, - }, - }, - }, - }, - []uint64{1, 1}, - []*proto.Features{ - nil, - nil, - }, - onObserved, - 500*time.Millisecond, - ) - require.NoError(t, srv.Start()) - defer srv.Stop() - - client := client.NewV2(fmt.Sprintf(":%d", srv.Port), "", client.VersionInfo{ - Name: "program", - Version: "v1.0.0", - Meta: map[string]string{ - "key": "value", - }, - }, grpc.WithTransportCredentials(insecure.NewCredentials())) - - m, err := NewV2AgentManagerWithClient( - &Config{ - Enabled: true, - }, - r, - client, - WithChangeDebounce(300*time.Millisecond), - WithForceReloadDebounce(800*time.Millisecond), - ) - require.NoError(t, err) - - mm := m.(*BeatV2Manager) - - err = m.Start() - require.NoError(t, err) - defer m.Stop() - - forceReloadStates := []bool{false, true, false} - forceReloadStateIdx := 0 - forceReloadLastState := true // starts on true so the first iteration is already a change - - eventuallyCheck := func() bool { - forceReload := mm.forceReload - // That detects a state change, we only count/advance steps - // on state changes - if forceReload != forceReloadLastState { - forceReloadLastState = forceReload - if forceReload == forceReloadStates[forceReloadStateIdx] { - // Set to the next state - forceReloadStateIdx++ - } - - // If we went through all states, then succeed - if forceReloadStateIdx == len(forceReloadStates) { - // If we went through all states - if configIdx == 1 { - return true - } - } - } - - return false - } - - require.Eventually(t, eventuallyCheck, 20*time.Second, 300*time.Millisecond, - "the expected changes on forceReload did not happen") -} - -type reloadableListMock struct { - mx sync.Mutex - configs []*reload.ConfigWithMeta - ReloadImpl func(configs []*reload.ConfigWithMeta) error -} - -func (r *reloadableListMock) Reload(configs []*reload.ConfigWithMeta) error { - r.mx.Lock() - defer r.mx.Unlock() - return r.ReloadImpl(configs) -} - -func (r *reloadableListMock) Configs() []*reload.ConfigWithMeta { - r.mx.Lock() - defer r.mx.Unlock() - return r.configs -} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 3aec77bac91..9fe238605b4 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -37,45 +37,45 @@ func TestManagerV2(t *testing.T) { inputs := &reloadableList{} r.MustRegisterInput(inputs) - configsSet := false - configsCleared := false - logLevelSet := false - fqdnEnabled := false - allStopped := false + configsSet := atomic.Bool{} + configsCleared := atomic.Bool{} + logLevelSet := atomic.Bool{} + fqdnEnabled := atomic.Bool{} + allStopped := atomic.Bool{} onObserved := func(observed *proto.CheckinObserved, currentIdx int) { if currentIdx == 1 { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg != nil && len(iCfgs) == 3 { - configsSet = true - t.Logf("output and inputs configuration set") + configsSet.Store(true) + t.Log("output and inputs configuration set") } } else if currentIdx == 2 { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg == nil || len(iCfgs) != 3 { // should not happen (config no longer set) - configsSet = false - t.Logf("output and inputs configuration cleared (should not happen)") + configsSet.Store(false) + t.Log("output and inputs configuration cleared (should not happen)") } } else { oCfg := output.Config() iCfgs := inputs.Configs() if oCfg == nil && len(iCfgs) == 0 { - configsCleared = true + configsCleared.Store(true) } if len(observed.Units) == 0 { - allStopped = true - t.Logf("output and inputs configuration cleared (stopping)") + allStopped.Store(true) + t.Log("output and inputs configuration cleared (stopping)") } } if logp.GetLevel() == zapcore.DebugLevel { - logLevelSet = true - t.Logf("debug log level set") + logLevelSet.Store(true) + t.Log("debug log level set") } - fqdnEnabled = features.FQDN() - t.Logf("FQDN feature flag set to %v", fqdnEnabled) + fqdnEnabled.Store(features.FQDN()) + t.Logf("FQDN feature flag set to %v", fqdnEnabled.Load()) } srv := integration.NewMockServer([][]*proto.UnitExpected{ @@ -221,7 +221,7 @@ func TestManagerV2(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return configsSet && configsCleared && logLevelSet && fqdnEnabled && allStopped + return configsSet.Load() && configsCleared.Load() && logLevelSet.Load() && fqdnEnabled.Load() && allStopped.Load() }, 15*time.Second, 300*time.Millisecond) } @@ -245,7 +245,7 @@ func TestOutputError(t *testing.T) { } r.MustRegisterInput(inputs) - stateReached := false + stateReached := atomic.Bool{} units := []*proto.UnitExpected{ { Id: "output-unit", @@ -303,7 +303,7 @@ func TestOutputError(t *testing.T) { server := &mock.StubServerV2{ CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { if DoesStateMatch(observed, desiredState, 0) { - stateReached = true + stateReached.Store(true) } return &proto.CheckinExpected{ Units: units, @@ -348,7 +348,7 @@ func TestOutputError(t *testing.T) { defer m.Stop() require.Eventually(t, func() bool { - return stateReached + return stateReached.Load() }, 10*time.Second, 100*time.Millisecond, "desired state, output failed, was not reached") }