Make the sync marker uniform for the Avro coalescing reader (#5428)
* Make the sync marker uniform for coalescing reader

Signed-off-by: Firestarman <firestarmanllc@gmail.com>
firestarman authored May 7, 2022
1 parent f7b4058 commit aa3dbab
Showing 5 changed files with 150 additions and 72 deletions.
24 changes: 23 additions & 1 deletion integration_tests/src/main/python/avro_test.py
@@ -13,7 +13,7 @@
# limitations under the License.
import os

from spark_session import with_cpu_session
from spark_session import with_cpu_session, with_gpu_session
import pytest

from asserts import assert_gpu_and_cpu_are_equal_collect
@@ -95,3 +95,25 @@ def test_avro_input_meta(spark_tmp_path, v1_enabled_list, reader_type):
'input_file_block_start()',
'input_file_block_length()'),
conf=all_confs)


# This is for https://github.com/NVIDIA/spark-rapids/issues/5312
@pytest.mark.parametrize('v1_enabled_list', ["avro", ""], ids=["v1", "v2"])
def test_coalescing_uniform_sync(spark_tmp_path, v1_enabled_list):
# Generate the data files
data_path = spark_tmp_path + '/AVRO_DATA'
with_cpu_session(
lambda spark: unary_op_df(spark, long_gen).repartition(coalescingPartitionNum)\
.write.format("avro").save(data_path))
# dump the coalesced files
dump_path = spark_tmp_path + '/AVRO_DUMP/'
all_confs = copy_and_update(_enable_all_types_conf, {
'spark.rapids.sql.format.avro.reader.type': 'COALESCING',
'spark.rapids.sql.avro.debug.dumpPrefix': dump_path,
'spark.sql.sources.useV1SourceList': v1_enabled_list})
with_gpu_session(
lambda spark: spark.read.format("avro").load(data_path).collect(),
conf=all_confs)
    # Read the coalesced files back on the CPU; this fails if the blocks in a
    # dumped file do not share a uniform sync marker.
with_cpu_session(
lambda spark: spark.read.format("avro").load(dump_path).collect())
@@ -561,6 +561,17 @@ trait SingleDataBlockInfo {
def extraInfo: ExtraInfo // extra information
}

/**
* A context that lives through the whole process of reading partitioned files
* into a batch buffer (i.e. a HostMemoryBuffer) to build an in-memory file.
* Children can extend this to add any extra fields they need.
* @param origChunkedBlocks mapping of file path to data blocks
* @param schema schema info
*/
class BatchContext(
val origChunkedBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
val schema: SchemaBase) {}
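
For illustration only (not code from this commit): a child reader that needs extra per-batch state can subclass BatchContext. A sketch for a format like Avro, where every block written into the coalesced file should carry the same 16-byte sync marker, might look like the following; UniformSyncBatchContext and syncMarker are made-up names, while DataBlockBase and SchemaBase are the types already used in this file.

import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import org.apache.hadoop.fs.Path

// Hypothetical subclass: carries a single sync marker that every block
// written into the coalesced in-memory file is expected to share.
class UniformSyncBatchContext(
    chunkedBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
    schema: SchemaBase,
    val syncMarker: Array[Byte])
  extends BatchContext(chunkedBlocks, schema)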

/**
* The abstracted multi-file coalescing reading class, which tries to coalesce small
* ColumnarBatch into a bigger ColumnarBatch according to maxReadBatchSizeRows,
@@ -632,12 +643,10 @@ abstract class MultiFileCoalescingPartitionReaderBase(
*
* Please note that the estimated size should be at least equal to the size of HEAD + Blocks + FOOTER
*
* @param blocks a map with file as the key, and its stripes as the value
* @param schema shema info
* @param batchContext the batch building context
* @return Long, the estimated output size
*/
def calculateEstimatedBlocksOutputSize(blocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
schema: SchemaBase): Long
def calculateEstimatedBlocksOutputSize(batchContext: BatchContext): Long

/**
* Calculate the final block output size which will be used to decide
@@ -652,11 +661,11 @@
*
* @param footerOffset footer offset
* @param blocks blocks to be evaluated
* @param schema schema info
* @param batchContext the batch building context
* @return the output size
*/
def calculateFinalBlocksOutputSize(footerOffset: Long, blocks: Seq[DataBlockBase],
schema: SchemaBase): Long
batchContext: BatchContext): Long

/**
* Get ThreadPoolExecutor to run the Callable.
@@ -679,6 +688,7 @@
* is in charge of closing it in sub-class
* @param blocks blocks meta info to specify which blocks to be read
* @param offset used as the offset adjustment
* @param batchContext the batch building context
* @return Callable[(Seq[DataBlockBase], Long)], which will be submitted to a
* ThreadPoolExecutor, and the Callable will return a tuple result and
* result._1 is block meta info with the offset adjusted
@@ -689,7 +699,8 @@
file: Path,
outhmb: HostMemoryBuffer,
blocks: ArrayBuffer[DataBlockBase],
offset: Long): Callable[(Seq[DataBlockBase], Long)]
offset: Long,
batchContext: BatchContext): Callable[(Seq[DataBlockBase], Long)]

/**
* File format short name used for logging and other things to uniquely identify
@@ -715,11 +726,11 @@
* Write a header for a specific file format. If there is no header for the file format,
* just ignore it and return 0
*
* @param paths the paths of files to be coalesced into a single batch
* @param buffer where the header will be written
* @param batchContext the batch building context
* @return how many bytes written
*/
def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long
def writeFileHeader(buffer: HostMemoryBuffer, batchContext: BatchContext): Long

/**
* Write a footer for a specific file format. If there is no footer for the file format,
@@ -733,11 +744,30 @@
* @param bufferSize The total buffer size which equals to size of (header + blocks + footer)
* @param footerOffset Where begin to write the footer
* @param blocks The data block meta info
* @param clippedSchema The clipped schema info
* @param batchContext The batch building context
* @return the buffer and the buffer size
*/
def writeFileFooter(buffer: HostMemoryBuffer, bufferSize: Long, footerOffset: Long,
blocks: Seq[DataBlockBase], clippedSchema: SchemaBase): (HostMemoryBuffer, Long)
blocks: Seq[DataBlockBase], batchContext: BatchContext): (HostMemoryBuffer, Long)

/**
* Return a batch context which will be shared during the process of building an in-memory
* file, that is, by the following APIs:
* - calculateEstimatedBlocksOutputSize
* - writeFileHeader
* - getBatchRunner
* - calculateFinalBlocksOutputSize
* - writeFileFooter
* It is useful when some state needs to be shared by some or all of the above APIs.
* Children can override this to return a customized batch context.
* @param chunkedBlocks mapping of file path to data blocks
* @param clippedSchema schema info
*/
protected def createBatchContext(
chunkedBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
clippedSchema: SchemaBase): BatchContext = {
new BatchContext(chunkedBlocks, clippedSchema)
}
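
Continuing the same hypothetical sketch (not the plugin's actual Avro implementation): a child reader could override createBatchContext to generate the shared state once per coalesced batch, so that writeFileHeader, getBatchRunner and writeFileFooter all observe the same marker.

// Hypothetical override: build the 16-byte sync marker once per batch so the
// header writer and every block-copy task reuse the same bytes.
override protected def createBatchContext(
    chunkedBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
    clippedSchema: SchemaBase): BatchContext = {
  val syncMarker = new Array[Byte](16)
  new java.security.SecureRandom().nextBytes(syncMarker)
  new UniformSyncBatchContext(chunkedBlocks, clippedSchema, syncMarker)
}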

override def next(): Boolean = {
batch.foreach(_.close())
@@ -840,14 +870,15 @@
}
val tasks = new java.util.ArrayList[Future[(Seq[DataBlockBase], Long)]]()

val batchContext = createBatchContext(filesAndBlocks, clippedSchema)
// First, estimate the output file size for the initial allocation.
// the estimated size should be >= size of HEAD + Blocks + FOOTER
val initTotalSize = calculateEstimatedBlocksOutputSize(filesAndBlocks, clippedSchema)
val initTotalSize = calculateEstimatedBlocksOutputSize(batchContext)

val (buffer, bufferSize, footerOffset, outBlocks) =
closeOnExcept(HostMemoryBuffer.allocate(initTotalSize)) { hmb =>
// Second, write header
var offset = writeFileHeader(filesAndBlocks.keys.toSeq, hmb)
var offset = writeFileHeader(hmb, batchContext)

val allOutputBlocks = scala.collection.mutable.ArrayBuffer[DataBlockBase]()
val tc = TaskContext.get
@@ -857,7 +888,7 @@
val outLocal = hmb.slice(offset, fileBlockSize)
// Third, copy the blocks for each file in parallel using background threads
tasks.add(getThreadPool(numThreads).submit(
getBatchRunner(tc, file, outLocal, blocks, offset)))
getBatchRunner(tc, file, outLocal, blocks, offset, batchContext)))
offset += fileBlockSize
}

@@ -869,7 +900,7 @@

// Fourth, calculate the final buffer size
val finalBufferSize = calculateFinalBlocksOutputSize(offset, allOutputBlocks,
clippedSchema)
batchContext)

(hmb, finalBufferSize, offset, allOutputBlocks)
}
@@ -908,7 +939,7 @@
// reason to do that.
// If you have to do this, please consider adding other abstract methods first.
val (finalBuffer, finalBufferSize) = writeFileFooter(buf, totalBufferSize, footerOffset,
outBlocks, clippedSchema)
outBlocks, batchContext)

closeOnExcept(finalBuffer) { _ =>
// triple check we didn't go over memory
23 changes: 12 additions & 11 deletions sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
@@ -1849,14 +1849,11 @@
*
* Please note that the estimated size should be at least equal to the size of HEAD + Blocks + FOOTER
*
* @param filesAndBlocks a map with file as the key, and its stripes as the value
* @param schema schema info
* @param batchContext the batch building context
* @return Long, the estimated output size
*/
override def calculateEstimatedBlocksOutputSize(
filesAndBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
schema: SchemaBase): Long = {

override def calculateEstimatedBlocksOutputSize(batchContext: BatchContext): Long = {
val filesAndBlocks = batchContext.origChunkedBlocks
// start with header magic
var size: Long = OrcFile.MAGIC.length

@@ -1903,13 +1900,13 @@
*
* @param footerOffset footer offset
* @param stripes stripes to be evaluated
* @param schema schema info
* @param batchContext the batch building context
* @return the output size
*/
override def calculateFinalBlocksOutputSize(
footerOffset: Long,
stripes: Seq[DataBlockBase],
schema: SchemaBase): Long = {
batchContext: BatchContext): Long = {

// In calculateEstimatedBlocksOutputSize, we have already got the true size of
// HEADER + All STRIPES plus the estimated FileFooter size for the worst case.
@@ -1941,6 +1938,7 @@
* is in charge of closing it in sub-class
* @param blocks blocks meta info to specify which blocks to be read
* @param offset used as the offset adjustment
* @param batchContext the batch building context
* @return Callable[(Seq[DataBlockBase], Long)], which will be submitted to a
* ThreadPoolExecutor, and the Callable will return a tuple result and
* result._1 is block meta info with the offset adjusted
@@ -1951,7 +1949,8 @@
file: Path,
outhmb: HostMemoryBuffer,
blocks: ArrayBuffer[DataBlockBase],
offset: Long): Callable[(Seq[DataBlockBase], Long)] = {
offset: Long,
batchContext: BatchContext): Callable[(Seq[DataBlockBase], Long)] = {
new OrcCopyStripesRunner(tc, file, outhmb, blocks, offset)
}

@@ -2007,9 +2006,10 @@
* just ignore it and return 0
*
* @param buffer where the header will be written
* @param batchContext the batch building context
* @return how many bytes written
*/
override def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long = {
override def writeFileHeader(buffer: HostMemoryBuffer, batchContext: BatchContext): Long = {
withResource(new HostMemoryOutputStream(buffer)) { out =>
withResource(new DataOutputStream(out)) { dataOut =>
dataOut.writeBytes(OrcFile.MAGIC)
@@ -2032,14 +2032,15 @@
* @param footerOffset Where begin to write the footer
* @param stripes The data block meta info
* @param clippedSchema The clipped schema info
* @param batchContext The batch building context
* @return the buffer and the buffer size
*/
override def writeFileFooter(
buffer: HostMemoryBuffer,
bufferSize: Long,
footerOffset: Long,
stripes: Seq[DataBlockBase],
clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = {
batchContext: BatchContext): (HostMemoryBuffer, Long) = {
val lenLeft = bufferSize - footerOffset
closeOnExcept(buffer) { _ =>
withResource(buffer.slice(footerOffset, lenLeft)) { finalizehmb =>
@@ -24,7 +24,7 @@ import java.util.concurrent._

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}
import scala.collection.mutable.ArrayBuffer
import scala.language.implicitConversions
import scala.math.max

@@ -1140,15 +1140,13 @@
false
}

override def calculateEstimatedBlocksOutputSize(
filesAndBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
schema: SchemaBase): Long = {
val allBlocks = filesAndBlocks.values.flatten.toSeq
override def calculateEstimatedBlocksOutputSize(batchContext: BatchContext): Long = {
val allBlocks = batchContext.origChunkedBlocks.values.flatten.toSeq
// Some Parquet versions sanity check the block metadata, and since the blocks could be from
// multiple files they will not pass the checks as they are.
val blockStartOffset = ParquetPartitionReader.PARQUET_MAGIC.length
val updatedBlocks = computeBlockMetaData(allBlocks, blockStartOffset)
calculateParquetOutputSize(updatedBlocks, schema, true)
calculateParquetOutputSize(updatedBlocks, batchContext.schema, true)
}

override def getThreadPool(numThreads: Int): ThreadPoolExecutor = {
@@ -1160,7 +1158,8 @@
file: Path,
outhmb: HostMemoryBuffer,
blocks: ArrayBuffer[DataBlockBase],
offset: Long): Callable[(Seq[DataBlockBase], Long)] = {
offset: Long,
batchContext: BatchContext): Callable[(Seq[DataBlockBase], Long)] = {
new ParquetCopyBlocksRunner(taskContext, file, outhmb, blocks, offset)
}

@@ -1196,30 +1195,30 @@
evolveSchemaIfNeededAndClose(table, splits.mkString(","), clippedSchema)
}

override def writeFileHeader(paths: Seq[Path], buffer: HostMemoryBuffer): Long = {
override def writeFileHeader(buffer: HostMemoryBuffer, bContext: BatchContext): Long = {
withResource(new HostMemoryOutputStream(buffer)) { out =>
out.write(ParquetPartitionReader.PARQUET_MAGIC)
out.getPos
}
}

override def calculateFinalBlocksOutputSize(footerOffset: Long,
blocks: Seq[DataBlockBase], schema: SchemaBase): Long = {
blocks: Seq[DataBlockBase], bContext: BatchContext): Long = {

val actualFooterSize = calculateParquetFooterSize(blocks, schema)
val actualFooterSize = calculateParquetFooterSize(blocks, bContext.schema)
// 4 + 4 is for writing size and the ending PARQUET_MAGIC.
footerOffset + actualFooterSize + 4 + 4
}

override def writeFileFooter(buffer: HostMemoryBuffer, bufferSize: Long, footerOffset: Long,
blocks: Seq[DataBlockBase], clippedSchema: SchemaBase): (HostMemoryBuffer, Long) = {
blocks: Seq[DataBlockBase], bContext: BatchContext): (HostMemoryBuffer, Long) = {

val lenLeft = bufferSize - footerOffset

val finalSize = closeOnExcept(buffer) { _ =>
withResource(buffer.slice(footerOffset, lenLeft)) { finalizehmb =>
withResource(new HostMemoryOutputStream(finalizehmb)) { footerOut =>
writeFooter(footerOut, blocks, clippedSchema)
writeFooter(footerOut, blocks, bContext.schema)
BytesUtils.writeIntLittleEndian(footerOut, footerOut.getPos.toInt)
footerOut.write(ParquetPartitionReader.PARQUET_MAGIC)
footerOffset + footerOut.getPos