
Commit 406dc17
Merge pull request #455 from answerbook/dominic/LOG-19797
fix(s3): multipart file consolidation newlines
dominic-mcallister-logdna committed May 1, 2024
2 parents 3de5ce9 + 14f17f6 commit 406dc17
Showing 2 changed files with 26 additions and 9 deletions.
29 changes: 23 additions & 6 deletions src/sinks/aws_s3/file_consolidation_processor.rs
@@ -273,10 +273,13 @@ impl<'a> FileConsolidationProcessor<'a> {
 
         upload_file_parts.reverse(); //reverse the list so we can pop off in the correct order
         while let Some(file) = upload_file_parts.pop() {
+            let consolidated_file_has_data =
+                !files_to_delete.is_empty() || !buf_files.is_empty();
+
             let (trim_open_bracket, trim_close_bracket, prepend_char) =
                 determine_download_properties(
                     self.output_format.clone(),
-                    &buf_files,
+                    consolidated_file_has_data,
                     &upload_file_parts,
                 );
 
@@ -693,8 +696,9 @@ async fn download_all_files_as_bytes(
 
     (*files).reverse(); // reverse the list so we can pop off it and maintain order
     while let Some(file) = (*files).pop() {
+        let consolidated_file_has_data = !files_to_delete.is_empty();
         let (trim_open_bracket, trim_close_bracket, prepend_char) =
-            determine_download_properties(output_format.clone(), files_to_delete, files);
+            determine_download_properties(output_format.clone(), consolidated_file_has_data, files);
 
         let b: Bytes = match download_file_as_bytes(
             client,
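
Both hunks above replace a collection-specific emptiness check with an explicit consolidated_file_has_data flag, so that data tracked in either collection counts as "already written" when deciding whether a separator is needed. A minimal sketch of the corrected predicate, with simplified types (the real code tracks ConsolidationFile values and S3 keys):

    // Sketch: the consolidated file already has data if any source files
    // were staged for deletion (their bytes were already uploaded) or if
    // any parts are still buffered locally.
    fn consolidated_file_has_data(files_to_delete: &[String], buf_files: &[String]) -> bool {
        !files_to_delete.is_empty() || !buf_files.is_empty()
    }

    fn main() {
        // Multipart case: earlier parts were uploaded and staged for
        // deletion, but nothing is buffered locally. A check on the
        // buffer alone would wrongly report "no data yet" here.
        assert!(consolidated_file_has_data(&["part-1.log".to_string()], &[]));
        assert!(!consolidated_file_has_data(&[], &[]));
    }
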
@@ -793,19 +797,32 @@ async fn download_file_as_bytes(
     Ok(buf.freeze())
 }
 
+/*
+    Determines how we should handle the file when downloading by
+    automatically appending commas or newlines between data as well
+    as trimming brackets to handle json files
+    @output_format: the type of file being written
+    @consolidated_file_has_data: indicates if we've already written data
+    @upload_file_parts: the remaining parts for the file
+    @@returns: tuple containing [
+        trim_open_bracket (bool),
+        trim_close_bracket (bool),
+        prepend_char (Option<char>)
+    ]
+*/
 fn determine_download_properties(
     output_format: String,
-    files_to_delete: &Vec<String>,
+    consolidated_file_has_data: bool,
     upload_file_parts: &Vec<ConsolidationFile>,
 ) -> (bool, bool, Option<char>) {
     let is_standard_json_file: bool = output_format == "json";
 
-    let trim_open_bracket = is_standard_json_file && !files_to_delete.is_empty();
+    let trim_open_bracket = is_standard_json_file && consolidated_file_has_data;
     let trim_close_bracket = is_standard_json_file && !upload_file_parts.is_empty();
 
-    let prepend_char: Option<char> = if is_standard_json_file && !files_to_delete.is_empty() {
+    let prepend_char: Option<char> = if is_standard_json_file && consolidated_file_has_data {
         Some(',')
-    } else if !files_to_delete.is_empty() {
+    } else if consolidated_file_has_data {
         Some('\n')
     } else {
         None
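
Putting the hunk together, here is a self-contained sketch of the fixed helper. The ConsolidationFile stub, the final return expression, and the example calls are assumptions for illustration; the diff does not show the function's tail:

    struct ConsolidationFile; // stub; the real struct carries S3 key/size metadata

    fn determine_download_properties(
        output_format: String,
        consolidated_file_has_data: bool,
        upload_file_parts: &Vec<ConsolidationFile>,
    ) -> (bool, bool, Option<char>) {
        let is_standard_json_file: bool = output_format == "json";

        // Strip the leading '[' of every json part after the first, and the
        // trailing ']' of every part that still has successors, so the parts
        // concatenate into one valid json array.
        let trim_open_bracket = is_standard_json_file && consolidated_file_has_data;
        let trim_close_bracket = is_standard_json_file && !upload_file_parts.is_empty();

        // Join json parts with ',' and plain-text parts with '\n'; the very
        // first part gets no separator at all.
        let prepend_char: Option<char> = if is_standard_json_file && consolidated_file_has_data {
            Some(',')
        } else if consolidated_file_has_data {
            Some('\n')
        } else {
            None
        };

        (trim_open_bracket, trim_close_bracket, prepend_char)
    }

    fn main() {
        // Text output with data already written: newline separator.
        assert_eq!(
            determine_download_properties("text".to_string(), true, &vec![ConsolidationFile]),
            (false, false, Some('\n'))
        );
        // Json output mid-stream: trim both brackets, comma separator.
        assert_eq!(
            determine_download_properties("json".to_string(), true, &vec![ConsolidationFile]),
            (true, true, Some(','))
        );
        // Very first part with no successors: nothing trimmed, no separator.
        assert_eq!(
            determine_download_properties("json".to_string(), false, &vec![]),
            (false, false, None)
        );
    }
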
6 changes: 3 additions & 3 deletions src/sinks/aws_s3/integration_tests_mezmo.rs
@@ -905,7 +905,7 @@ async fn s3_file_consolidation_large_files() {
     // 1 file of 60000 lines
     // newlines between each added file
     let response_lines = get_lines(obj).await;
-    assert_eq!(response_lines.len(), 675_025);
+    assert_eq!(response_lines.len(), 675_026);
 }
 
 #[tokio::test]
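
The expected line count rises by one, consistent with the fix now writing a separator newline after a part that already ends in '\n', which shows up as one extra (empty) line. A tiny sketch of that counting behavior, with illustrative strings only:

    fn main() {
        // Two newline-terminated chunks concatenated directly: 4 lines.
        assert_eq!("a\nb\nc\nd\n".lines().count(), 4);
        // With a separator '\n' prepended at the join point, the same
        // content yields one extra empty line, matching the +1 here.
        assert_eq!("a\nb\n\nc\nd\n".lines().count(), 5);
    }
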
@@ -965,11 +965,11 @@ async fn s3_file_consolidation_lots_of_10mb_files() {
         let obj = get_object(&bucket, k).await;
         assert_eq!(obj.content_encoding, Some("identity".to_string()));
         assert_eq!(obj.content_type, Some("text/x-log".to_string()));
-        assert_eq!(obj.content_length, 151_500_000);
+        assert_eq!(obj.content_length, 151_500_014);
 
         // 15 files of 100_000 lines that are all bashed together
         let response_lines = get_lines(obj).await;
-        assert_eq!(response_lines.len(), 1_500_000);
+        assert_eq!(response_lines.len(), 1_500_014);
     } else {
         panic!("did not find the merged file as expected");
     }
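
The new numbers here are consistent with the same reading: 15 parts have 14 join points, so both the byte length and the line count grow by 14. A quick check of the arithmetic (the 101 bytes-per-line figure is inferred from the totals, not stated in the test):

    fn main() {
        let parts: u64 = 15;
        let lines_per_part: u64 = 100_000;
        let bytes_per_line: u64 = 101; // 151_500_000 / 1_500_000
        let separators = parts - 1; // one '\n' per join point

        assert_eq!(parts * lines_per_part * bytes_per_line, 151_500_000);
        assert_eq!(parts * lines_per_part * bytes_per_line + separators, 151_500_014);
        assert_eq!(parts * lines_per_part + separators, 1_500_014);
    }
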
