Recompute Parquet block metadata when estimating footer from multiple file input #3666

Merged 1 commit on Sep 27, 2021.

Changes from all commits:

@@ -57,12 +57,11 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
 import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
 import org.apache.spark.sql.rapids.execution.TrampolineUtil
 import org.apache.spark.sql.sources.Filter
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.vectorized.ColumnarBatch
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 
 /**
@@ -596,23 +595,20 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
   }
 
   /**
-   * Copies the data corresponding to the clipped blocks in the original file and compute the
-   * block metadata for the output. The output blocks will contain the same column chunk
-   * metadata but with the file offsets updated to reflect the new position of the column data
-   * as written to the output.
+   * Computes new block metadata to reflect where the blocks and columns will appear in the
+   * computed Parquet file.
    *
-   * @param in the input stream for the original Parquet file
-   * @param out the output stream to receive the data
-   * @return updated block metadata corresponding to the output
+   * @param blocks block metadata from the original file(s) that will appear in the computed file
+   * @param realStartOffset starting file offset of the first block
+   * @param copyRangesToUpdate optional buffer to update with ranges of column data to copy
+   * @return updated block metadata
    */
-  protected def copyBlocksData(
-      in: FSDataInputStream,
-      out: HostMemoryOutputStream,
+  protected def computeBlockMetaData(
       blocks: Seq[BlockMetaData],
-      realStartOffset: Long): Seq[BlockMetaData] = {
+      realStartOffset: Long,
+      copyRangesToUpdate: Option[ArrayBuffer[CopyRange]] = None): Seq[BlockMetaData] = {
     var totalRows: Long = 0
     val outputBlocks = new ArrayBuffer[BlockMetaData](blocks.length)
-    val copyRanges = new ArrayBuffer[CopyRange]
     var currentCopyStart = 0L
     var currentCopyEnd = 0L
     var totalBytesToCopy = 0L
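
For readers skimming the diff: the copy-range bookkeeping that computeBlockMetaData inherits from the old method coalesces contiguous column chunks into a single CopyRange, and only a gap in the source offsets starts a new range. A minimal standalone sketch of that coalescing idea, using hypothetical (startingPos, totalSize) pairs rather than the plugin's real column chunk metadata:

    import scala.collection.mutable.ArrayBuffer

    case class CopyRange(offset: Long, length: Long)

    // Coalesce (startingPos, totalSize) chunk pairs into contiguous copy ranges,
    // mirroring the currentCopyStart/currentCopyEnd bookkeeping in the diff above.
    def coalesce(chunks: Seq[(Long, Long)]): Seq[CopyRange] = {
      val ranges = new ArrayBuffer[CopyRange]
      var currentCopyStart = 0L
      var currentCopyEnd = 0L
      chunks.foreach { case (startingPos, totalSize) =>
        if (currentCopyEnd != startingPos) {
          // A gap in the source offsets: close out the in-progress range, if any,
          // and start a new one at this chunk.
          if (currentCopyEnd != 0) {
            ranges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
          }
          currentCopyStart = startingPos
          currentCopyEnd = startingPos
        }
        currentCopyEnd += totalSize
      }
      if (currentCopyEnd != currentCopyStart) {
        ranges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
      }
      ranges.toSeq
    }

    // coalesce(Seq((4L, 100L), (104L, 200L), (400L, 50L)))
    // == Seq(CopyRange(4, 300), CopyRange(400, 50))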
@@ -645,7 +641,9 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics

       if (currentCopyEnd != column.getStartingPos) {
         if (currentCopyEnd != 0) {
-          copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
+          copyRangesToUpdate.foreach {
+            _.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
+          }
         }
         currentCopyStart = column.getStartingPos
         currentCopyEnd = currentCopyStart
@@ -657,8 +655,32 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
     }
 
     if (currentCopyEnd != currentCopyStart) {
-      copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
+      copyRangesToUpdate.foreach {
+        _.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
+      }
     }
+    outputBlocks
+  }
+
+  /**
+   * Copies the data corresponding to the clipped blocks in the original file and computes the
+   * block metadata for the output. The output blocks will contain the same column chunk
+   * metadata but with the file offsets updated to reflect the new position of the column data
+   * as written to the output.
+   *
+   * @param in the input stream for the original Parquet file
+   * @param out the output stream to receive the data
+   * @param blocks block metadata from the original file that will appear in the computed file
+   * @param realStartOffset starting file offset of the first block
+   * @return updated block metadata corresponding to the output
+   */
+  protected def copyBlocksData(
+      in: FSDataInputStream,
+      out: HostMemoryOutputStream,
+      blocks: Seq[BlockMetaData],
+      realStartOffset: Long): Seq[BlockMetaData] = {
+    val copyRanges = new ArrayBuffer[CopyRange]
+    val outputBlocks = computeBlockMetaData(blocks, realStartOffset, Some(copyRanges))
     val copyBuffer = new Array[Byte](copyBufferSize)
     copyRanges.foreach(copyRange => copyDataRange(copyRange, in, out, copyBuffer))
     outputBlocks
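
The net effect of the refactor is a two-phase split: offset rewriting is now callable on its own, and the byte copy becomes an optional second step. A hedged sketch of the two call patterns this enables, assuming in, out, blocks, realStartOffset, and copyBufferSize are in scope as in the methods above (these call sites are illustrative, not from the PR):

    // 1) Size estimation only: rewrite block/column offsets, copy nothing.
    val estimatedBlocks = computeBlockMetaData(blocks, realStartOffset)

    // 2) Actual copy: record the ranges while rewriting, then copy the bytes,
    //    which is what the new copyBlocksData does.
    val ranges = new ArrayBuffer[CopyRange]
    val rewrittenBlocks = computeBlockMetaData(blocks, realStartOffset, Some(ranges))
    val buffer = new Array[Byte](copyBufferSize)
    ranges.foreach(r => copyDataRange(r, in, out, buffer))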
@@ -998,7 +1020,11 @@ class MultiFileParquetPartitionReader(
       filesAndBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
       schema: SchemaBase): Long = {
     val allBlocks = filesAndBlocks.values.flatten.toSeq
-    calculateParquetOutputSize(allBlocks, schema, true)
+    // Some Parquet versions sanity check the block metadata, and since the blocks could be from
+    // multiple files they will not pass the checks as they are.
+    val blockStartOffset = ParquetPartitionReader.PARQUET_MAGIC.length
+    val updatedBlocks = computeBlockMetaData(allBlocks, blockStartOffset)
+    calculateParquetOutputSize(updatedBlocks, schema, true)
   }

   override def getThreadPool(numThreads: Int): ThreadPoolExecutor = {
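
Why the rebase in the multi-file size estimate above matters: blocks gathered from several source files each carry column chunk offsets relative to their own file, so when the blocks are combined the metadata is internally inconsistent and stricter Parquet versions reject it. Rewriting the offsets as if every block were laid out back-to-back after the 4-byte PAR1 magic restores consistency before the footer size is estimated. A small arithmetic sketch (values illustrative):

    // Given column chunk lengths pulled from multiple files, assign consecutive
    // offsets starting right after the magic bytes ("PAR1" is 4 bytes).
    val chunkLengths = Seq(100L, 200L, 50L) // e.g. chunks from two different files
    val magicLength = 4L
    val rebasedOffsets = chunkLengths.scanLeft(magicLength)(_ + _).init
    // rebasedOffsets == Seq(4, 104, 304): every chunk now has an offset consistent
    // with a single concatenated file, so the metadata sanity checks pass.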