Skip to content

Commit

Permalink
Port close_inactive* tests from test_input.py (elastic#24706) (elasti…
Browse files Browse the repository at this point in the history
…c#24884)

(cherry picked from commit ade9a88)
  • Loading branch information
kvch authored Apr 1, 2021
1 parent bed2ad7 commit f2d4c16
Showing 1 changed file with 198 additions and 0 deletions.
198 changes: 198 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,201 @@ func TestFilestreamUTF16BOMs(t *testing.T) {
})
}
}

// test_close_timeout from test_harvester.py
func TestFilestreamCloseTimeout(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.reader.after_interval": "500ms",
})

testlines := []byte("first line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(1)
env.requireOffsetInRegistry(testlogName, len(testlines))
env.waitUntilHarvesterIsDone()

env.mustWriteLinesToFile(testlogName, []byte("first line\nsecond log line\n"))

env.waitUntilEventCount(1)

cancelInput()
env.waitUntilInputStops()

env.requireOffsetInRegistry(testlogName, len(testlines))
}

// test_close_inactive from test_input.py
func TestFilestreamCloseAfterInterval(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "100ms",
"close.on_state_change.inactive": "2s",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))
env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_removal from test_input.py
func TestFilestreamCloseAfterIntervalRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

env.mustRemoveFile(testlogName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

func TestFilestreamCloseAfterIntervalRenamed(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_rotation_and_removal from test_input.py
func TestFilestreamCloseAfterIntervalRotatedAndRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "24h",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)
env.mustRemoveFile(newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

// test_close_inactive_file_rotation_and_removal2 from test_input.py
func TestFilestreamCloseAfterIntervalRotatedAndNewRemoved(t *testing.T) {
env := newInputTestingEnvironment(t)

testlogName := "test.log"
inp := env.mustCreateInput(map[string]interface{}{
"paths": []string{env.abspath(testlogName)},
"prospector.scanner.check_interval": "1ms",
"close.on_state_change.check_interval": "10ms",
"close.on_state_change.inactive": "100ms",
// reader is not stopped when file is removed to see if the reader can still detect
// if the file has been inactive even if it have been removed in the meantime
"close.on_state_change.removed": "false",
})

testlines := []byte("first line\nsecond line\nthird line\n")
env.mustWriteLinesToFile(testlogName, testlines)

ctx, cancelInput := context.WithCancel(context.Background())
env.startInput(ctx, inp)

env.waitUntilEventCount(3)
env.requireOffsetInRegistry(testlogName, len(testlines))

newFileName := "test_rotated.log"
env.mustRenameFile(testlogName, newFileName)

env.waitUntilHarvesterIsDone()

newTestlines := []byte("rotated first line\nrotated second line\nrotated third line\n")
env.mustWriteLinesToFile(testlogName, newTestlines)

env.waitUntilEventCount(6)

env.mustRemoveFile(newFileName)

env.waitUntilHarvesterIsDone()

cancelInput()
env.waitUntilInputStops()
}

0 comments on commit f2d4c16

Please sign in to comment.