Skip to content

Commit

Permalink
Port close_inactive* tests from test_input.py (elastic#24706)
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch authored Apr 1, 2021
1 parent 51a3317 commit ade9a88
Showing 1 changed file with 166 additions and 0 deletions.
166 changes: 166 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -351,3 +351,169 @@ func TestFilestreamCloseTimeout(t *testing.T) {

env.requireOffsetInRegistry(testlogName, len(testlines))
}

// test_close_inactive from test_input.py
func TestFilestreamCloseAfterInterval(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))
env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_removal from test_input.py
func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

env.mustRemoveFile(testlogName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_rotation_and_removal from test_input.py
func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)
env.mustRemoveFile(newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_rotation_and_removal2 from test_input.py
func TestFilestreamCloseAfterIntervalRotatedAndNewRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)

env.waitUntilHarvesterIsDone()

newTestlines := []byte("rotated first line\nrotated second line\nrotated third line\n")
env.mustWriteLinesToFile(testlogName, newTestlines)

env.waitUntilEventCount(6)

env.mustRemoveFile(newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

0 comments on commit ade9a88

Please sign in to comment.