Fix GpuFileFormatDataWriter failing to stat file after commit (#5107)
Signed-off-by: Jason Lowe <jlowe@nvidia.com>
jlowe authored Apr 4, 2022
1 parent 48ce369 commit d678a66
Showing 6 changed files with 105 additions and 51 deletions.
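At a high level, the change moves per-file statting from task-commit time to file-close time: previously the stats tracker stat'd the "current" file lazily, possibly only in getFinalStats() after the commit protocol had already relocated the file, so getFileStatus could fail. Writers now expose their output path, the data writer notifies the trackers via closeFile() as each writer is released, and the tracker stats the file while it still exists at its task-attempt location. The sketch below is a minimal, self-contained illustration of that stat-at-close idea; the names StatAtCloseSketch and FileStats are hypothetical and not part of the plugin.

```scala
import java.nio.file.{Files, Path}

// Toy illustration only: capture a file's size when the writer closes it,
// before any commit protocol renames or moves it away.
object StatAtCloseSketch {
  final case class FileStats(var numFiles: Int = 0, var numBytes: Long = 0L)

  // Called right after an output file is closed and before task commit.
  def closeFile(path: Path, stats: FileStats): Unit = {
    if (Files.exists(path)) {          // still at its task-attempt location
      stats.numFiles += 1
      stats.numBytes += Files.size(path)
    }
  }
}
```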
@@ -61,7 +61,7 @@ abstract class ColumnarOutputWriterFactory extends Serializable {
* must provide a zero-argument constructor. This is the columnar version of
* `org.apache.spark.sql.execution.datasources.OutputWriter`.
*/
abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
abstract class ColumnarOutputWriter(context: TaskAttemptContext,
dataSchema: StructType, rangeName: String) extends HostBufferConsumer with Arm {

val tableWriter: TableWriter
@@ -165,6 +165,11 @@ abstract class ColumnarOutputWriter(path: String, context: TaskAttemptContext,
writeBufferedData()
outputStream.close()
}

/**
* The file path to write. Invoked on the executor side.
*/
def path(): String
}

object ColumnarOutputWriter {
@@ -259,13 +259,13 @@ class GpuParquetFileFormat extends ColumnarFileFormat with Logging {
}

class GpuParquetWriter(
path: String,
override val path: String,
dataSchema: StructType,
compressionType: CompressionType,
dateRebaseException: Boolean,
timestampRebaseException: Boolean,
context: TaskAttemptContext)
extends ColumnarOutputWriter(path, context, dataSchema, "Parquet") {
extends ColumnarOutputWriter(context, dataSchema, "Parquet") {

val outputTimestampType = conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)

@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -17,9 +17,12 @@
package org.apache.spark.sql.rapids

import java.io.FileNotFoundException
import java.nio.charset.StandardCharsets

import scala.collection.mutable

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkContext, TaskContext}
import org.apache.spark.internal.Logging
@@ -53,11 +56,11 @@ class BasicColumnarWriteTaskStatsTracker(
extends ColumnarWriteTaskStatsTracker with Logging {
private[this] var numPartitions: Int = 0
private[this] var numFiles: Int = 0
private[this] var submittedFiles: Int = 0
private[this] var numSubmittedFiles: Int = 0
private[this] var numBytes: Long = 0L
private[this] var numRows: Long = 0L

private[this] var curFile: Option[String] = None
private[this] val submittedFiles = mutable.HashSet[String]()

/**
* Get the size of the file expected to have been written by a worker.
@@ -67,37 +70,86 @@
private def getFileSize(filePath: String): Option[Long] = {
val path = new Path(filePath)
val fs = path.getFileSystem(hadoopConf)
getFileSize(fs, path)
}

/**
* Get the size of the file expected to have been written by a worker.
* This supports the XAttr in HADOOP-17414 when the "magic committer" adds
* a custom HTTP header to the zero byte marker.
* If the length of the output file as returned by getFileStatus is > 0, that
* length is returned. For zero-byte files, the (optional) Hadoop FS API
* getXAttr() is invoked. If a parseable, non-negative length can be
* retrieved, it is returned instead.
* @return the file size or None if the file was not found.
*/
private def getFileSize(fs: FileSystem, path: Path): Option[Long] = {
// the normal file status probe.
try {
Some(fs.getFileStatus(path).getLen())
val len = fs.getFileStatus(path).getLen
if (len > 0) {
return Some(len)
}
} catch {
case e: FileNotFoundException =>
// may arise against eventually consistent object stores
// may arise against eventually consistent object stores.
logDebug(s"File $path is not yet visible", e)
None
return None
}

// Output File Size is 0. Look to see if it has an attribute
// declaring a future-file-length.
// Failure of API call, parsing, invalid value all return the
// 0 byte length.

var len = 0L
try {
val attr = fs.getXAttr(path, BasicColumnarWriteJobStatsTracker.FILE_LENGTH_XATTR)
if (attr != null && attr.nonEmpty) {
val str = new String(attr, StandardCharsets.UTF_8)
logDebug(s"File Length statistics for $path retrieved from XAttr: $str")
// a non-empty header was found. parse to a long via the java class
val l = java.lang.Long.parseLong(str)
if (l > 0) {
len = l
} else {
logDebug("Ignoring negative value in XAttr file length")
}
}
} catch {
case e: NumberFormatException =>
// warn but don't dump the whole stack
logInfo(s"Failed to parse" +
s" ${BasicColumnarWriteJobStatsTracker.FILE_LENGTH_XATTR}:$e;" +
s" bytes written may be under-reported");
case e: UnsupportedOperationException =>
// this is not unusual; ignore
logDebug(s"XAttr not supported on path $path", e);
case e: Exception =>
// Something else. Log at debug and continue.
logDebug(s"XAttr processing failure on $path", e);
}
Some(len)
}

override def newPartition(/*partitionValues: InternalRow*/): Unit = {
numPartitions += 1
}

override def newBucket(bucketId: Int): Unit = {
// currently unhandled
override def newFile(filePath: String): Unit = {
submittedFiles += filePath
numSubmittedFiles += 1
}

override def newFile(filePath: String): Unit = {
statCurrentFile()
curFile = Some(filePath)
submittedFiles += 1
override def closeFile(filePath: String): Unit = {
updateFileStats(filePath)
submittedFiles.remove(filePath)
}

private def statCurrentFile(): Unit = {
curFile.foreach { path =>
getFileSize(path).foreach { len =>
numBytes += len
numFiles += 1
}
curFile = None
private def updateFileStats(filePath: String): Unit = {
getFileSize(filePath).foreach { len =>
numBytes += len
numFiles += 1
}
}

@@ -106,16 +158,17 @@ class BasicColumnarWriteTaskStatsTracker(
}

override def getFinalStats(taskCommitTime: Long): WriteTaskStats = {
statCurrentFile()
submittedFiles.foreach(updateFileStats)
submittedFiles.clear()

// Reports bytesWritten and recordsWritten to the Spark output metrics.
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics =>
outputMetrics.setBytesWritten(numBytes)
outputMetrics.setRecordsWritten(numRows)
}

if (submittedFiles != numFiles) {
logWarning(s"Expected $submittedFiles files, but only saw $numFiles. " +
if (numSubmittedFiles != numFiles) {
logWarning(s"Expected $numSubmittedFiles files, but only saw $numFiles. " +
"This could be due to the output format not writing empty files, " +
"or files being not immediately visible in the filesystem.")
}
@@ -181,6 +234,8 @@ object BasicColumnarWriteJobStatsTracker {
private val NUM_PARTS_KEY = "numParts"
val TASK_COMMIT_TIME = "taskCommitTime"
val JOB_COMMIT_TIME = "jobCommitTime"
/** XAttr key of the data length header added in HADOOP-17414. */
val FILE_LENGTH_XATTR = "header.x-hadoop-s3a-magic-data-length"

def metrics: Map[String, SQLMetric] = {
val sparkContext = SparkContext.getActive.get
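For zero-length results, getFileSize above falls back to the HADOOP-17414 XAttr that the S3A "magic committer" attaches to its zero-byte marker object. Below is a standalone sketch of that probe against the public Hadoop FileSystem API; the helper object MagicCommitterLength and its minimal error handling are illustrative assumptions, not plugin code.

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical helper, not part of the plugin: resolve the real length of a
// file written through the S3A magic committer. A positive getFileStatus
// length wins; otherwise the HADOOP-17414 XAttr on the zero-byte marker is
// consulted.
object MagicCommitterLength {
  private val FileLengthXAttr = "header.x-hadoop-s3a-magic-data-length"

  def apply(fs: FileSystem, path: Path): Long = {
    val statusLen = fs.getFileStatus(path).getLen
    if (statusLen > 0) {
      statusLen
    } else {
      val attr = fs.getXAttr(path, FileLengthXAttr)
      if (attr == null || attr.isEmpty) 0L
      else java.lang.Long.parseLong(new String(attr, StandardCharsets.UTF_8))
    }
  }
}
```

As in the tracker above, getXAttr may throw UnsupportedOperationException on filesystems without XAttr support, which a robust caller would catch and treat as a zero length.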
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, NVIDIA CORPORATION.
* Copyright (c) 2019-2022, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,20 +24,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch
* processed by a single write task in [[GpuFileFormatDataWriter]] - i.e. there should be one
* instance per executor.
*
* This trait is coupled with the way [[GpuFileFormatWriter]] works, in the sense that its methods
* will be called according to how column batches are being written out to disk, namely in
* sorted order according to partitionValue(s), then bucketId.
*
* As such, a typical call scenario is:
*
* newPartition -> newBucket -> newFile -> newRow -.
* ^ |______^___________^ ^ ^____|
* | | |______________|
* | |____________________________|
* |____________________________________________|
*
* newPartition and newBucket events are only triggered if the relation to be written out is
* partitioned and/or bucketed, respectively.
* newPartition event is only triggered if the relation to be written out is partitioned.
*/
trait ColumnarWriteTaskStatsTracker {

@@ -50,19 +37,18 @@ trait ColumnarWriteTaskStatsTracker {
*/
def newPartition(/*partitionValues: InternalRow*/): Unit

/**
* Process the fact that a new bucket is about to written.
* Only triggered when the relation is bucketed by a (non-empty) sequence of columns.
* @param bucketId The bucket number.
*/
def newBucket(bucketId: Int): Unit

/**
* Process the fact that a new file is about to be written.
* @param filePath Path of the file into which future rows will be written.
*/
def newFile(filePath: String): Unit

/**
* Process the fact that a file has finished being written and has been closed.
* @param filePath Path of the file.
*/
def closeFile(filePath: String): Unit

/**
* Process a new column batch to update the tracked statistics accordingly.
* The batch will be written to the most recently witnessed file (via `newFile`).
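The revised contract pairs newFile() (a path is about to be written) with closeFile() (the writer for that path has been closed), and getFinalStats() sweeps up any file that was never explicitly closed. A minimal sketch of that pairing follows, using a hypothetical standalone class rather than the real trait (which also carries partition and batch callbacks):

```scala
import scala.collection.mutable

// Illustrative only: tracks how many submitted files were actually closed.
class FileLifecycleTracker {
  private val openFiles = mutable.HashSet[String]()
  private var numSubmitted = 0
  private var numClosed = 0

  def newFile(filePath: String): Unit = {
    openFiles += filePath
    numSubmitted += 1
  }

  def closeFile(filePath: String): Unit = {
    if (openFiles.remove(filePath)) numClosed += 1
  }

  // Files never explicitly closed are accounted for at task end.
  def finalStats(): (Int, Int) = {
    numClosed += openFiles.size
    openFiles.clear()
    (numSubmitted, numClosed)
  }
}
```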
@@ -57,16 +57,24 @@ abstract class GpuFileFormatDataWriter(
protected val statsTrackers: Seq[ColumnarWriteTaskStatsTracker] =
description.statsTrackers.map(_.newTaskInstance())

protected def releaseResources(): Unit = {
/** Release resources of `currentWriter`. */
protected def releaseCurrentWriter(): Unit = {
if (currentWriter != null) {
try {
currentWriter.close()
statsTrackers.foreach(_.closeFile(currentWriter.path()))
} finally {
currentWriter = null
}
}
}

/** Release all resources. */
protected def releaseResources(): Unit = {
// Call `releaseCurrentWriter()` by default, as this is the only resource to be released.
releaseCurrentWriter()
}

/** Writes a columnar batch of records */
def write(batch: ColumnarBatch): Unit

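The ordering in releaseCurrentWriter() above carries the fix: the writer is closed first so the on-disk length is final, and the trackers are notified while the file is still at its task-attempt path, i.e. before task commit. A simplified sketch of that ordering with hypothetical types (ClosableWriter, ReleaseWriter, and the onClose callback are illustrative, not the plugin's actual signatures):

```scala
// Illustrative ordering only: close first so the length is final, then let
// the stats trackers stat the file while it still exists pre-commit.
trait ClosableWriter {
  def path(): String
  def close(): Unit
}

object ReleaseWriter {
  def apply(writer: ClosableWriter, onClose: String => Unit): Unit = {
    if (writer != null) {
      writer.close()          // flush buffers and finalize the output file
      onClose(writer.path())  // e.g. trackers.foreach(_.closeFile(path))
    }
  }
}
```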
@@ -156,10 +156,10 @@ class GpuOrcFileFormat extends ColumnarFileFormat with Logging {
}
}

class GpuOrcWriter(path: String,
class GpuOrcWriter(override val path: String,
dataSchema: StructType,
context: TaskAttemptContext)
extends ColumnarOutputWriter(path, context, dataSchema, "ORC") {
extends ColumnarOutputWriter(context, dataSchema, "ORC") {

override val tableWriter: TableWriter = {
val builder = SchemaUtils
