Skip to content

Commit

Permalink
[yaml] Preserve windowing for windowed input when using FileIO Java providers (#32586)
Browse files Browse the repository at this point in the history
  • Loading branch information
Polber authored Sep 30, 2024
1 parent 543d2ce commit 9b85f71
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.apache.commons.csv.CSVFormat;

Expand Down Expand Up @@ -134,10 +135,18 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
if (configuration.getDelimiter() != null) {
format = format.withDelimiter(configuration.getDelimiter().charAt(0));
}
WriteFilesResult<?> result =
input
.get(INPUT_ROWS_TAG)
.apply(CsvIO.writeRows(configuration.getPath(), format).withSuffix(""));

// Preserve input windowing
CsvIO.Write<Row> writeTransform =
CsvIO.writeRows(configuration.getPath(), format).withSuffix("");
if (!input
.get(INPUT_ROWS_TAG)
.getWindowingStrategy()
.equals(WindowingStrategy.globalDefault())) {
writeTransform = writeTransform.withWindowedWrites();
}

WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;

/**
Expand Down Expand Up @@ -121,8 +122,16 @@ protected static class JsonWriteTransform extends SchemaTransform {

@Override
public PCollectionRowTuple expand(PCollectionRowTuple input) {
WriteFilesResult<?> result =
input.get(INPUT_ROWS_TAG).apply(JsonIO.writeRows(configuration.getPath()).withSuffix(""));
// Preserve input windowing
JsonIO.Write<Row> writeTransform = JsonIO.writeRows(configuration.getPath()).withSuffix("");
if (!input
.get(INPUT_ROWS_TAG)
.getWindowingStrategy()
.equals(WindowingStrategy.globalDefault())) {
writeTransform = writeTransform.withWindowedWrites();
}

WriteFilesResult<?> result = input.get(INPUT_ROWS_TAG).apply(writeTransform);
Schema outputSchema = Schema.of(Field.of("filename", FieldType.STRING));
return PCollectionRowTuple.of(
WRITE_RESULTS,
Expand Down

0 comments on commit 9b85f71

Please sign in to comment.