Spark: Add the query ID to file names (#6569)
nastra committed Jan 17, 2023
1 parent ab1a19b commit 046a81a
Showing 6 changed files with 89 additions and 8 deletions.
23 changes: 19 additions & 4 deletions core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -30,7 +30,7 @@
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.encryption.EncryptionManager;

/** Factory responsible for generating unique but recognizable data file names. */
/** Factory responsible for generating unique but recognizable data/delete file names. */
public class OutputFileFactory {
private final PartitionSpec defaultSpec;
private final FileFormat format;
@@ -46,6 +46,7 @@ public class OutputFileFactory {
// with a recursive listing and grep.
private final String operationId;
private final AtomicInteger fileCount = new AtomicInteger(0);
private final String suffix;

/**
* Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be
@@ -60,6 +61,7 @@
* @param partitionId First part of the file name
* @param taskId Second part of the file name
* @param operationId Third part of the file name
* @param suffix Suffix part of the file name
*/
private OutputFileFactory(
PartitionSpec spec,
@@ -69,7 +71,8 @@ private OutputFileFactory(
EncryptionManager encryptionManager,
int partitionId,
long taskId,
String operationId) {
String operationId,
String suffix) {
this.defaultSpec = spec;
this.format = format;
this.locations = locations;
@@ -78,6 +81,7 @@ private OutputFileFactory(
this.partitionId = partitionId;
this.taskId = taskId;
this.operationId = operationId;
this.suffix = suffix;
}

public static Builder builderFor(Table table, int partitionId, long taskId) {
@@ -87,7 +91,12 @@ public static Builder builderFor(Table table, int partitionId, long taskId) {
private String generateFilename() {
return format.addExtension(
String.format(
"%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet()));
"%05d-%d-%s-%05d%s",
partitionId,
taskId,
operationId,
fileCount.incrementAndGet(),
null != suffix ? "-" + suffix : ""));
}

/** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */
@@ -115,6 +124,7 @@ public static class Builder {
private PartitionSpec defaultSpec;
private String operationId;
private FileFormat format;
private String suffix;

private Builder(Table table, int partitionId, long taskId) {
this.table = table;
Expand Down Expand Up @@ -143,12 +153,17 @@ public Builder format(FileFormat newFormat) {
return this;
}

public Builder suffix(String newSuffix) {
this.suffix = newSuffix;
return this;
}

public OutputFileFactory build() {
LocationProvider locations = table.locationProvider();
FileIO io = table.io();
EncryptionManager encryption = table.encryption();
return new OutputFileFactory(
defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId);
defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId, suffix);
}
}
}
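
With the new field in place, generateFilename() appends the suffix after the file counter, so names take the form partitionId-taskId-operationId-count[-suffix].ext, and a null suffix leaves names exactly as before. A minimal usage sketch (the table handle, the IDs, and "my-query-id" below are illustrative, not from this commit):

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.io.OutputFileFactory;

class SuffixedFileNames {
  // Builds a factory whose files carry an operation ID and a role suffix.
  static String firstFileLocation(Table table) {
    OutputFileFactory fileFactory =
        OutputFileFactory.builderFor(table, 1, 100) // partitionId, taskId
            .format(FileFormat.PARQUET)
            .operationId("my-query-id") // third part of the file name
            .suffix("deletes") // optional trailing part added by this commit
            .build();

    EncryptedOutputFile outputFile =
        fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
    // Location ends with: 00001-100-my-query-id-00001-deletes.parquet
    return outputFile.encryptingOutputFile().location();
  }
}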
@@ -26,6 +26,7 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.encryption.EncryptedOutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.assertj.core.api.Assertions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -76,4 +77,27 @@ public void testOutputFileFactoryWithMultipleSpecs() {
Assert.assertTrue(
partitionedFileLocation.endsWith("data_bucket=7/00001-100-append-00002.parquet"));
}

@Test
public void testWithCustomSuffix() {
OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
.operationId("append")
.suffix("suffix")
.build();

EncryptedOutputFile unpartitionedFile =
fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
String unpartitionedFileLocation = unpartitionedFile.encryptingOutputFile().location();
Assertions.assertThat(unpartitionedFileLocation)
.endsWith("data/00001-100-append-00001-suffix.parquet");

Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
partitionKey.partition(record);
EncryptedOutputFile partitionedFile = fileFactory.newOutputFile(table.spec(), partitionKey);
String partitionedFileLocation = partitionedFile.encryptingOutputFile().location();
Assertions.assertThat(partitionedFileLocation)
.endsWith("data_bucket=7/00001-100-append-00002-suffix.parquet");
}
}
@@ -335,10 +335,13 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
OutputFileFactory dataFileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(context.dataFileFormat())
.operationId(context.queryId())
.build();
OutputFileFactory deleteFileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(context.deleteFileFormat())
.operationId(context.queryId())
.suffix("deletes")
.build();

SparkFileWriterFactory writerFactory =
@@ -660,6 +663,7 @@ private static class Context implements Serializable {
private final FileFormat deleteFileFormat;
private final long targetDeleteFileSize;
private final boolean fanoutWriterEnabled;
private final String queryId;

Context(Schema dataSchema, SparkWriteConf writeConf, ExtendedLogicalWriteInfo info) {
this.dataSchema = dataSchema;
@@ -671,6 +675,7 @@ private static class Context implements Serializable {
this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
this.metadataSparkType = info.metadataSchema();
this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
this.queryId = info.queryId();
}

Schema dataSchema() {
@@ -708,5 +713,9 @@ long targetDeleteFileSize() {
boolean fanoutWriterEnabled() {
return fanoutWriterEnabled;
}

String queryId() {
return queryId;
}
}
}
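
Both factories above are built from the same [partitionId, taskId, queryId] triplet, and each keeps its own file counter, so without the "deletes" suffix the delete-file names would collide with the data-file names. A minimal sketch of the resulting names, assuming a plain Table handle and a made-up query ID:

import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.OutputFileFactory;

class DeltaFileNaming {
  // Mirrors the two factories above outside of Spark; all values are illustrative.
  static void demo(Table table) {
    String queryId = "query-1"; // Spark passes its real query ID here

    OutputFileFactory dataFileFactory =
        OutputFileFactory.builderFor(table, 1, 42) // partitionId, taskId
            .format(FileFormat.PARQUET)
            .operationId(queryId)
            .build();

    OutputFileFactory deleteFileFactory =
        OutputFileFactory.builderFor(table, 1, 42) // same triplet as above
            .format(FileFormat.PARQUET)
            .operationId(queryId)
            .suffix("deletes") // keeps the two name streams distinct
            .build();

    // First file from each factory:
    //   data:    .../00001-42-query-1-00001.parquet
    //   deletes: .../00001-42-query-1-00001-deletes.parquet
  }
}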
@@ -186,7 +186,13 @@ private WriterFactory createWriterFactory() {
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
return new WriterFactory(
tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
tableBroadcast,
queryId,
format,
targetFileSize,
writeSchema,
dsSchema,
partitionedFanoutEnabled);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -620,9 +626,11 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
private final Schema writeSchema;
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
private final String queryId;

protected WriterFactory(
Broadcast<Table> tableBroadcast,
String queryId,
FileFormat format,
long targetFileSize,
Schema writeSchema,
@@ -634,6 +642,7 @@ protected WriterFactory(
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
this.queryId = queryId;
}

@Override
@@ -648,7 +657,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
FileIO io = table.io();

OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(format)
.operationId(queryId)
.build();
SparkFileWriterFactory writerFactory =
SparkFileWriterFactory.builderFor(table)
.dataFileFormat(format)
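
Using Spark's query ID as the operationId means every file a query writes can be traced back from its name alone, matching the "recursive listing and grep" comment in OutputFileFactory. A hedged sketch of one way to do that from the table-scan side (the helper name and the substring match are illustrative, not part of this commit):

import java.io.IOException;
import java.io.UncheckedIOException;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

class FilesByQueryId {
  // Prints current data files whose names embed the given query ID.
  static void printFilesWrittenBy(Table table, String queryId) {
    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
      for (FileScanTask task : tasks) {
        String location = task.file().path().toString();
        // File names look like 00001-42-<queryId>-00001.parquet
        if (location.contains("-" + queryId + "-")) {
          System.out.println(location);
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}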
@@ -335,10 +335,13 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
OutputFileFactory dataFileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(context.dataFileFormat())
.operationId(context.queryId())
.build();
OutputFileFactory deleteFileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(context.deleteFileFormat())
.operationId(context.queryId())
.suffix("deletes")
.build();

SparkFileWriterFactory writerFactory =
@@ -660,6 +663,7 @@ private static class Context implements Serializable {
private final FileFormat deleteFileFormat;
private final long targetDeleteFileSize;
private final boolean fanoutWriterEnabled;
private final String queryId;

Context(Schema dataSchema, SparkWriteConf writeConf, ExtendedLogicalWriteInfo info) {
this.dataSchema = dataSchema;
@@ -671,6 +675,7 @@ private static class Context implements Serializable {
this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
this.metadataSparkType = info.metadataSchema();
this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
this.queryId = info.queryId();
}

Schema dataSchema() {
@@ -708,5 +713,9 @@ long targetDeleteFileSize() {
boolean fanoutWriterEnabled() {
return fanoutWriterEnabled;
}

String queryId() {
return queryId;
}
}
}
@@ -186,7 +186,13 @@ private WriterFactory createWriterFactory() {
Broadcast<Table> tableBroadcast =
sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
return new WriterFactory(
tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
tableBroadcast,
queryId,
format,
targetFileSize,
writeSchema,
dsSchema,
partitionedFanoutEnabled);
}

private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -633,9 +639,11 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
private final Schema writeSchema;
private final StructType dsSchema;
private final boolean partitionedFanoutEnabled;
private final String queryId;

protected WriterFactory(
Broadcast<Table> tableBroadcast,
String queryId,
FileFormat format,
long targetFileSize,
Schema writeSchema,
@@ -647,6 +655,7 @@ protected WriterFactory(
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.partitionedFanoutEnabled = partitionedFanoutEnabled;
this.queryId = queryId;
}

@Override
@@ -661,7 +670,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
FileIO io = table.io();

OutputFileFactory fileFactory =
OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
OutputFileFactory.builderFor(table, partitionId, taskId)
.format(format)
.operationId(queryId)
.build();
SparkFileWriterFactory writerFactory =
SparkFileWriterFactory.builderFor(table)
.dataFileFormat(format)
