From 046a81aa734dc4b61b66c3214ba6888a72d68bc9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Eduard=20Tudenh=C3=B6fner?=
Date: Tue, 17 Jan 2023 17:49:24 +0100
Subject: [PATCH] Spark: Add the query ID to file names (#6569)

---
 .../apache/iceberg/io/OutputFileFactory.java  | 23 ++++++++++++++----
 .../iceberg/io/TestOutputFileFactory.java     | 24 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java |  9 +++++++
 .../iceberg/spark/source/SparkWrite.java      | 16 +++++++++++--
 .../spark/source/SparkPositionDeltaWrite.java |  9 +++++++
 .../iceberg/spark/source/SparkWrite.java      | 16 +++++++++++--
 6 files changed, 89 insertions(+), 8 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
index 812c3d05427d..473272635df0 100644
--- a/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
+++ b/core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java
@@ -30,7 +30,7 @@
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.encryption.EncryptionManager;
 
-/** Factory responsible for generating unique but recognizable data file names. */
+/** Factory responsible for generating unique but recognizable data/delete file names. */
 public class OutputFileFactory {
   private final PartitionSpec defaultSpec;
   private final FileFormat format;
@@ -46,6 +46,7 @@ public class OutputFileFactory {
   // with a recursive listing and grep.
   private final String operationId;
   private final AtomicInteger fileCount = new AtomicInteger(0);
+  private final String suffix;
 
   /**
    * Constructor with specific operationId. The [partitionId, taskId, operationId] triplet has to be
@@ -60,6 +61,7 @@ public class OutputFileFactory {
    * @param partitionId First part of the file name
    * @param taskId Second part of the file name
    * @param operationId Third part of the file name
+   * @param suffix Suffix part of the file name
    */
   private OutputFileFactory(
       PartitionSpec spec,
@@ -69,7 +71,8 @@ private OutputFileFactory(
       EncryptionManager encryptionManager,
       int partitionId,
       long taskId,
-      String operationId) {
+      String operationId,
+      String suffix) {
     this.defaultSpec = spec;
     this.format = format;
     this.locations = locations;
@@ -78,6 +81,7 @@ private OutputFileFactory(
     this.partitionId = partitionId;
     this.taskId = taskId;
     this.operationId = operationId;
+    this.suffix = suffix;
   }
 
   public static Builder builderFor(Table table, int partitionId, long taskId) {
@@ -87,7 +91,12 @@ public static Builder builderFor(Table table, int partitionId, long taskId) {
   private String generateFilename() {
     return format.addExtension(
         String.format(
-            "%05d-%d-%s-%05d", partitionId, taskId, operationId, fileCount.incrementAndGet()));
+            "%05d-%d-%s-%05d%s",
+            partitionId,
+            taskId,
+            operationId,
+            fileCount.incrementAndGet(),
+            null != suffix ? "-" + suffix : ""));
   }
 
   /** Generates an {@link EncryptedOutputFile} for unpartitioned writes. */
@@ -115,6 +124,7 @@ public static class Builder {
     private PartitionSpec defaultSpec;
     private String operationId;
     private FileFormat format;
+    private String suffix;
 
     private Builder(Table table, int partitionId, long taskId) {
       this.table = table;
@@ -143,12 +153,17 @@ public Builder format(FileFormat newFormat) {
       return this;
     }
 
+    public Builder suffix(String newSuffix) {
+      this.suffix = newSuffix;
+      return this;
+    }
+
    public OutputFileFactory build() {
       LocationProvider locations = table.locationProvider();
       FileIO io = table.io();
       EncryptionManager encryption = table.encryption();
       return new OutputFileFactory(
-          defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId);
+          defaultSpec, format, locations, io, encryption, partitionId, taskId, operationId, suffix);
     }
   }
 }
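Note (not part of the patch): a minimal sketch of what the new format string in generateFilename() produces, using invented values (partition 1, task 100, operation ID "append", first file):

    // Invented values, mirroring the "%05d-%d-%s-%05d%s" format above.
    String withSuffix = String.format("%05d-%d-%s-%05d%s", 1, 100L, "append", 1, "-deletes");
    // -> "00001-100-append-00001-deletes"; addExtension() then appends e.g. ".parquet"
    String withoutSuffix = String.format("%05d-%d-%s-%05d%s", 1, 100L, "append", 1, "");
    // -> "00001-100-append-00001", identical to the names produced before this change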
diff --git a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
index 616a42791dd6..f7c81ae879c9 100644
--- a/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
+++ b/core/src/test/java/org/apache/iceberg/io/TestOutputFileFactory.java
@@ -26,6 +26,7 @@
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.encryption.EncryptedOutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.assertj.core.api.Assertions;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -76,4 +77,27 @@ public void testOutputFileFactoryWithMultipleSpecs() {
     Assert.assertTrue(
         partitionedFileLocation.endsWith("data_bucket=7/00001-100-append-00002.parquet"));
   }
+
+  @Test
+  public void testWithCustomSuffix() {
+    OutputFileFactory fileFactory =
+        OutputFileFactory.builderFor(table, PARTITION_ID, TASK_ID)
+            .operationId("append")
+            .suffix("suffix")
+            .build();
+
+    EncryptedOutputFile unpartitionedFile =
+        fileFactory.newOutputFile(PartitionSpec.unpartitioned(), null);
+    String unpartitionedFileLocation = unpartitionedFile.encryptingOutputFile().location();
+    Assertions.assertThat(unpartitionedFileLocation)
+        .endsWith("data/00001-100-append-00001-suffix.parquet");
+
+    Record record = GenericRecord.create(table.schema()).copy(ImmutableMap.of("data", "aaa"));
+    PartitionKey partitionKey = new PartitionKey(table.spec(), table.schema());
+    partitionKey.partition(record);
+    EncryptedOutputFile partitionedFile = fileFactory.newOutputFile(table.spec(), partitionKey);
+    String partitionedFileLocation = partitionedFile.encryptingOutputFile().location();
+    Assertions.assertThat(partitionedFileLocation)
+        .endsWith("data_bucket=7/00001-100-append-00002-suffix.parquet");
+  }
 }
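A minimal usage sketch of the new builder option, not part of the patch (assumes table, partitionId, taskId and queryId are already in scope):

    OutputFileFactory deleteFileFactory =
        OutputFileFactory.builderFor(table, partitionId, taskId)
            .format(FileFormat.PARQUET)
            .operationId(queryId) // embeds the query ID in every generated file name
            .suffix("deletes")    // optional; appended after the file count
            .build();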
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index f819cd31fd5a..1f65184925b9 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -335,10 +335,13 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
       OutputFileFactory dataFileFactory =
           OutputFileFactory.builderFor(table, partitionId, taskId)
               .format(context.dataFileFormat())
+              .operationId(context.queryId())
               .build();
       OutputFileFactory deleteFileFactory =
           OutputFileFactory.builderFor(table, partitionId, taskId)
               .format(context.deleteFileFormat())
+              .operationId(context.queryId())
+              .suffix("deletes")
               .build();
 
       SparkFileWriterFactory writerFactory =
@@ -660,6 +663,7 @@ private static class Context implements Serializable {
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
     private final boolean fanoutWriterEnabled;
+    private final String queryId;
 
     Context(Schema dataSchema, SparkWriteConf writeConf, ExtendedLogicalWriteInfo info) {
       this.dataSchema = dataSchema;
@@ -671,6 +675,7 @@ private static class Context implements Serializable {
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
       this.metadataSparkType = info.metadataSchema();
       this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+      this.queryId = info.queryId();
     }
 
     Schema dataSchema() {
@@ -708,5 +713,9 @@ long targetDeleteFileSize() {
     boolean fanoutWriterEnabled() {
       return fanoutWriterEnabled;
     }
+
+    String queryId() {
+      return queryId;
+    }
   }
 }
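Note (not part of the patch): since the data and delete factories above share the query ID as operationId, a data file and a position-delete file written by the same task differ only in the suffix. With an invented query ID the names would look like:

    // data file:   00001-100-<queryId>-00001.parquet
    // delete file: 00001-100-<queryId>-00001-deletes.parquet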
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 52e43d3484a6..e84442a00afd 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -186,7 +186,13 @@ private WriterFactory createWriterFactory() {
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     return new WriterFactory(
-        tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
+        tableBroadcast,
+        queryId,
+        format,
+        targetFileSize,
+        writeSchema,
+        dsSchema,
+        partitionedFanoutEnabled);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -620,9 +626,11 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
+    private final String queryId;
 
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
+        String queryId,
         FileFormat format,
         long targetFileSize,
         Schema writeSchema,
@@ -634,6 +642,7 @@ protected WriterFactory(
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
       this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.queryId = queryId;
     }
 
     @Override
@@ -648,7 +657,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
       FileIO io = table.io();
 
       OutputFileFactory fileFactory =
-          OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .build();
       SparkFileWriterFactory writerFactory =
           SparkFileWriterFactory.builderFor(table)
               .dataFileFormat(format)
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 32d603d5a794..6c278e131d74 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -335,10 +335,13 @@ public DeltaWriter<InternalRow> createWriter(int partitionId, long taskId) {
       OutputFileFactory dataFileFactory =
           OutputFileFactory.builderFor(table, partitionId, taskId)
               .format(context.dataFileFormat())
+              .operationId(context.queryId())
               .build();
       OutputFileFactory deleteFileFactory =
           OutputFileFactory.builderFor(table, partitionId, taskId)
               .format(context.deleteFileFormat())
+              .operationId(context.queryId())
+              .suffix("deletes")
               .build();
 
       SparkFileWriterFactory writerFactory =
@@ -660,6 +663,7 @@ private static class Context implements Serializable {
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
     private final boolean fanoutWriterEnabled;
+    private final String queryId;
 
     Context(Schema dataSchema, SparkWriteConf writeConf, ExtendedLogicalWriteInfo info) {
       this.dataSchema = dataSchema;
@@ -671,6 +675,7 @@ private static class Context implements Serializable {
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
       this.metadataSparkType = info.metadataSchema();
       this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+      this.queryId = info.queryId();
     }
 
     Schema dataSchema() {
@@ -708,5 +713,9 @@ long targetDeleteFileSize() {
     boolean fanoutWriterEnabled() {
       return fanoutWriterEnabled;
     }
+
+    String queryId() {
+      return queryId;
+    }
   }
 }
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index f77d96da7f0d..9ae7280ea87e 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -186,7 +186,13 @@ private WriterFactory createWriterFactory() {
     Broadcast<Table> tableBroadcast =
         sparkContext.broadcast(SerializableTableWithSize.copyOf(table));
     return new WriterFactory(
-        tableBroadcast, format, targetFileSize, writeSchema, dsSchema, partitionedFanoutEnabled);
+        tableBroadcast,
+        queryId,
+        format,
+        targetFileSize,
+        writeSchema,
+        dsSchema,
+        partitionedFanoutEnabled);
   }
 
   private void commitOperation(SnapshotUpdate<?> operation, String description) {
@@ -633,9 +639,11 @@ private static class WriterFactory implements DataWriterFactory, StreamingDataWr
     private final Schema writeSchema;
     private final StructType dsSchema;
     private final boolean partitionedFanoutEnabled;
+    private final String queryId;
 
     protected WriterFactory(
         Broadcast<Table> tableBroadcast,
+        String queryId,
         FileFormat format,
         long targetFileSize,
         Schema writeSchema,
@@ -647,6 +655,7 @@ protected WriterFactory(
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
       this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.queryId = queryId;
    }
 
     @Override
@@ -661,7 +670,10 @@ public DataWriter<InternalRow> createWriter(int partitionId, long taskId, long e
       FileIO io = table.io();
 
       OutputFileFactory fileFactory =
-          OutputFileFactory.builderFor(table, partitionId, taskId).format(format).build();
+          OutputFileFactory.builderFor(table, partitionId, taskId)
+              .format(format)
+              .operationId(queryId)
+              .build();
       SparkFileWriterFactory writerFactory =
           SparkFileWriterFactory.builderFor(table)
               .dataFileFormat(format)
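Note (not part of the patch): with the query ID embedded in file names, the "recursive listing and grep" mentioned in OutputFileFactory becomes practical; a hypothetical helper (class name and path handling invented):

    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.stream.Stream;

    class FindFilesByQueryId {
      // Prints every file under dataDir whose name contains the given Spark query ID.
      static void printFiles(Path dataDir, String queryId) throws IOException {
        try (Stream<Path> files = Files.walk(dataDir)) {
          files
              .filter(Files::isRegularFile)
              .filter(path -> path.getFileName().toString().contains(queryId))
              .forEach(System.out::println);
        }
      }
    }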