Recompute Parquet block metadata when estimating footer from multiple file input (#3666)

Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Sep 27, 2021
1 parent f567574 commit 4ee8ed1
Showing 1 changed file with 43 additions and 17 deletions.
@@ -57,12 +57,11 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetReadSupport
import org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory
import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy
import org.apache.spark.sql.rapids.execution.TrampolineUtil
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.SerializableConfiguration


/**
@@ -596,23 +595,20 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
}

/**
* Copies the data corresponding to the clipped blocks in the original file and compute the
* block metadata for the output. The output blocks will contain the same column chunk
* metadata but with the file offsets updated to reflect the new position of the column data
* as written to the output.
* Computes new block metadata to reflect where the blocks and columns will appear in the
* computed Parquet file.
*
* @param in the input stream for the original Parquet file
* @param out the output stream to receive the data
* @return updated block metadata corresponding to the output
* @param blocks block metadata from the original file(s) that will appear in the computed file
* @param realStartOffset starting file offset of the first block
* @param copyRangesToUpdate optional buffer to update with ranges of column data to copy
* @return updated block metadata
*/
protected def copyBlocksData(
in: FSDataInputStream,
out: HostMemoryOutputStream,
protected def computeBlockMetaData(
blocks: Seq[BlockMetaData],
realStartOffset: Long): Seq[BlockMetaData] = {
realStartOffset: Long,
copyRangesToUpdate: Option[ArrayBuffer[CopyRange]] = None): Seq[BlockMetaData] = {
var totalRows: Long = 0
val outputBlocks = new ArrayBuffer[BlockMetaData](blocks.length)
val copyRanges = new ArrayBuffer[CopyRange]
var currentCopyStart = 0L
var currentCopyEnd = 0L
var totalBytesToCopy = 0L
@@ -645,7 +641,9 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics

if (currentCopyEnd != column.getStartingPos) {
if (currentCopyEnd != 0) {
copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
copyRangesToUpdate.foreach {
_.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
}
}
currentCopyStart = column.getStartingPos
currentCopyEnd = currentCopyStart
@@ -657,8 +655,32 @@ trait ParquetPartitionReaderBase extends Logging with Arm with ScanWithMetrics
}

if (currentCopyEnd != currentCopyStart) {
copyRanges.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
copyRangesToUpdate.foreach {
_.append(CopyRange(currentCopyStart, currentCopyEnd - currentCopyStart))
}
}
outputBlocks
}

/**
* Copies the data corresponding to the clipped blocks in the original file and computes the
* block metadata for the output. The output blocks will contain the same column chunk
* metadata but with the file offsets updated to reflect the new position of the column data
* as written to the output.
*
* @param in the input stream for the original Parquet file
* @param out the output stream to receive the data
* @param blocks block metadata from the original file that will appear in the computed file
* @param realStartOffset starting file offset of the first block
* @return updated block metadata corresponding to the output
*/
protected def copyBlocksData(
in: FSDataInputStream,
out: HostMemoryOutputStream,
blocks: Seq[BlockMetaData],
realStartOffset: Long): Seq[BlockMetaData] = {
val copyRanges = new ArrayBuffer[CopyRange]
val outputBlocks = computeBlockMetaData(blocks, realStartOffset, Some(copyRanges))
val copyBuffer = new Array[Byte](copyBufferSize)
copyRanges.foreach(copyRange => copyDataRange(copyRange, in, out, copyBuffer))
outputBlocks
@@ -998,7 +1020,11 @@ class MultiFileParquetPartitionReader(
filesAndBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
schema: SchemaBase): Long = {
val allBlocks = filesAndBlocks.values.flatten.toSeq
calculateParquetOutputSize(allBlocks, schema, true)
// Some Parquet versions sanity check the block metadata, and since the blocks could be from
// multiple files they will not pass the checks as they are.
val blockStartOffset = ParquetPartitionReader.PARQUET_MAGIC.length
val updatedBlocks = computeBlockMetaData(allBlocks, blockStartOffset)
calculateParquetOutputSize(updatedBlocks, schema, true)
}

override def getThreadPool(numThreads: Int): ThreadPoolExecutor = {
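
As a footnote to the diff: a minimal, self-contained sketch of the core idea, using simplified stand-ins rather than the plugin's real classes (SimpleColumnChunk, SimpleBlock, and relayout are illustrative names only). Blocks gathered from several input files all carry offsets relative to their own source file, so estimating a footer over the raw metadata can trip the sanity checks some Parquet versions apply; recomputing contiguous offsets that start just after the PAR1 magic, which is what the new computeBlockMetaData call in the multi-file size-estimation override provides, yields metadata that is consistent for the combined output.

object BlockRelayoutSketch {
  // Simplified stand-ins for Parquet's BlockMetaData / ColumnChunkMetaData.
  case class SimpleColumnChunk(startingPos: Long, totalSize: Long)
  case class SimpleBlock(columns: Seq[SimpleColumnChunk], rowCount: Long)

  // Length of the "PAR1" magic that opens every Parquet file.
  val ParquetMagicLength: Long = 4L

  // Rewrite column offsets so all blocks appear back to back in one output file.
  def relayout(blocks: Seq[SimpleBlock],
      startOffset: Long = ParquetMagicLength): Seq[SimpleBlock] = {
    var pos = startOffset
    blocks.map { block =>
      val newColumns = block.columns.map { col =>
        val updated = col.copy(startingPos = pos)
        pos += col.totalSize
        updated
      }
      block.copy(columns = newColumns)
    }
  }

  def main(args: Array[String]): Unit = {
    // Blocks taken from two different source files: each starts right after its own magic.
    val fromFileA = SimpleBlock(Seq(SimpleColumnChunk(4, 100), SimpleColumnChunk(104, 50)), rowCount = 10)
    val fromFileB = SimpleBlock(Seq(SimpleColumnChunk(4, 80)), rowCount = 7)

    // After relayout the chunks are contiguous and non-overlapping:
    // starting positions 4, 104, 154 instead of 4, 104, 4.
    relayout(Seq(fromFileA, fromFileB)).foreach(println)
  }
}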

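The other half of the refactor is that copyBlocksData now delegates to computeBlockMetaData and asks it to also collect the byte ranges to copy. A rough sketch of that range coalescing, again with illustrative names (the real CopyRange's field names are not shown in this diff): adjacent column chunks are merged into a single (offset, length) range so the data can later be moved in a small number of large copies.

import scala.collection.mutable.ArrayBuffer

object CopyRangeSketch {
  // Stand-in for the reader's CopyRange; the field names here are assumptions.
  case class CopyRange(offset: Long, length: Long)

  // Chunks are (startingPos, totalSize) pairs in file order.
  def coalesce(chunks: Seq[(Long, Long)]): Seq[CopyRange] = {
    val ranges = new ArrayBuffer[CopyRange]
    var start = 0L
    var end = 0L
    chunks.foreach { case (pos, size) =>
      if (end != pos) {
        // A gap before this chunk: close out the current range and start a new one.
        if (end != 0) ranges.append(CopyRange(start, end - start))
        start = pos
        end = pos
      }
      end += size // extend the current contiguous range
    }
    if (end != start) ranges.append(CopyRange(start, end - start))
    ranges.toSeq
  }

  def main(args: Array[String]): Unit = {
    // The first two chunks are contiguous and merge; the chunk at offset 300 starts a new range.
    println(coalesce(Seq((4L, 100L), (104L, 50L), (300L, 80L))))
    // List(CopyRange(4,150), CopyRange(300,80))
  }
}

In the diff above each range becomes one copyDataRange call, so fewer, larger ranges mean fewer passes through the copy buffer.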