diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 376970cacf9..67436446cde 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -351,3 +351,169 @@ func TestFilestreamCloseTimeout(t *testing.T) { env.requireOffsetInRegistry(testlogName, len(testlines)) } + +// test_close_inactive from test_input.py +func TestFilestreamCloseAfterInterval(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "100ms", + "close.on_state_change.inactive": "2s", + }) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() +} + +// test_close_inactive_file_removal from test_input.py +func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "10ms", + "close.on_state_change.inactive": "100ms", + // reader is not stopped when file is removed to see if the reader can still detect + // if the file has been inactive even if it have been removed in the meantime + "close.on_state_change.removed": "false", + }) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + env.mustRemoveFile(testlogName) + + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() +} + +func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "10ms", + "close.on_state_change.inactive": "100ms", + // reader is not stopped when file is removed to see if the reader can still detect + // if the file has been inactive even if it have been removed in the meantime + "close.on_state_change.removed": "false", + }) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + newFileName := "test_rotated.log" + env.mustRenameFile(testlogName, newFileName) + + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() +} + +// test_close_inactive_file_rotation_and_removal from test_input.py +func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "24h", + "close.on_state_change.check_interval": "10ms", + "close.on_state_change.inactive": "100ms", + // reader is not stopped when file is removed to see if the reader can still detect + // if the file has been inactive even if it have been removed in the meantime + "close.on_state_change.removed": "false", + }) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + newFileName := "test_rotated.log" + env.mustRenameFile(testlogName, newFileName) + env.mustRemoveFile(newFileName) + + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() +} + +// test_close_inactive_file_rotation_and_removal2 from test_input.py +func TestFilestreamCloseAfterIntervalRotatedAndNewRemoved(t *testing.T) { + env := newInputTestingEnvironment(t) + + testlogName := "test.log" + inp := env.mustCreateInput(map[string]interface{}{ + "paths": []string{env.abspath(testlogName)}, + "prospector.scanner.check_interval": "1ms", + "close.on_state_change.check_interval": "10ms", + "close.on_state_change.inactive": "100ms", + // reader is not stopped when file is removed to see if the reader can still detect + // if the file has been inactive even if it have been removed in the meantime + "close.on_state_change.removed": "false", + }) + + testlines := []byte("first line\nsecond line\nthird line\n") + env.mustWriteLinesToFile(testlogName, testlines) + + ctx, cancelInput := context.WithCancel(context.Background()) + env.startInput(ctx, inp) + + env.waitUntilEventCount(3) + env.requireOffsetInRegistry(testlogName, len(testlines)) + + newFileName := "test_rotated.log" + env.mustRenameFile(testlogName, newFileName) + + env.waitUntilHarvesterIsDone() + + newTestlines := []byte("rotated first line\nrotated second line\nrotated third line\n") + env.mustWriteLinesToFile(testlogName, newTestlines) + + env.waitUntilEventCount(6) + + env.mustRemoveFile(newFileName) + + env.waitUntilHarvesterIsDone() + + cancelInput() + env.waitUntilInputStops() +}