Support orc coalescing reading #2909
Conversation
build
Signed-off-by: Bobby Wang <wbo4958@gmail.com>
It would be good to update the description to say that the leak was fixed, and how you did it as well.
Did you run any tests on partitioned files? Also, did we compare to the CPU?
Made one pass through.
@@ -30,7 +30,9 @@ def read_orc_sql(data_path):
# test with original orc file reader, the multi-file parallel reader for cloud
original_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'PERFILE'}
multithreaded_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'MULTITHREADED'}
reader_opt_confs = [original_orc_file_reader_conf, multithreaded_orc_file_reader_conf]
coalescing_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'COALESCING'}
# reader_opt_confs = [original_orc_file_reader_conf, multithreaded_orc_file_reader_conf, coalescing_orc_file_reader_conf]
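The commented-out list in the hunk above hints at the intended end state. A hedged sketch of that end state follows — variable names are taken from the test file above, but the final list contents in the merged PR may differ:

```python
# Reader-type configs, mirroring the integration-test snippet above.
original_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'PERFILE'}
multithreaded_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'MULTITHREADED'}
coalescing_orc_file_reader_conf = {'spark.rapids.sql.format.orc.reader.type': 'COALESCING'}

# Parametrizing every ORC read test over all three reader types gives the
# new COALESCING reader the same coverage as the existing PERFILE and
# MULTITHREADED readers.
reader_opt_confs = [original_orc_file_reader_conf,
                    multithreaded_orc_file_reader_conf,
                    coalescing_orc_file_reader_conf]
```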
It looks like we need to add more tests — or were some failing for some reason?
Done.
Resolved review threads (outdated):
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuMultiFileReader.scala
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala (7 threads)
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuParquetScan.scala
 * @return Long, the estimated output size
 */
override def calculateEstimatedBlocksOutputSize(
    filesAndBlocks: LinkedHashMap[Path, ArrayBuffer[DataBlockBase]],
Maybe I'm missing it, but did we really need to change this interface to take the LinkedHashMap? It looks like you ignore the Path part of filesAndBlocks below in the foreach?
It is better to change it to the LinkedHashMap. When we estimate the initial file footer size, we can use the original file's footer size as the worst case for all stripes in that file. Otherwise, we would need to accumulate a footer size for every stripe, which over-estimates a bit.
Yes, I just ignore the Path, and I use stripes(0).ctx to estimate the footer size for the stripes in that Path.
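To make the worst-case footer estimate concrete, here is a hedged Python sketch of the idea — the dict field names `data_len` and `file_footer_len` are hypothetical stand-ins for the PR's actual Scala fields, and `stripes[0]` stands in for the `stripes(0).ctx` lookup mentioned above:

```python
from collections import OrderedDict

def estimate_output_size(files_and_blocks):
    """Worst-case output-size estimate for a coalesced ORC buffer.

    files_and_blocks maps each file path to the stripes selected from it,
    preserving insertion order like the Scala LinkedHashMap, so the incoming
    map needs no transformation.
    """
    total = 0
    for path, stripes in files_and_blocks.items():
        # Sum the stripe data itself.
        total += sum(s['data_len'] for s in stripes)
        # Count the original file's footer size once, as the worst case for
        # all stripes coming from this file, instead of accumulating one
        # footer estimate per stripe.
        total += stripes[0]['file_footer_len']
    return total
```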
I don't really understand this response. Why is it better to change to LinkedHashMap and pass parameters you aren't using? If the answer is that filesAndBlocks is already a LinkedHashMap, so we don't have to do any transformations, I'm fine with that — but you need to say that, and I would like to see it documented; otherwise someone is going to come along later and wonder about it.
Done, added some comments.
Done
Yes, I did two kinds of tests: the first is a performance test on non-partitioned files; the second is a comparison test between CPU and GPU on partitioned files. You can see them in the description. Is it necessary to run the performance test on partitioned files?
Yes, we should run something on partitioned files to make sure they are handled properly.
build
@tgravescs, I added the performance tests for partitioned and non-partitioned files and did the comparison tests for CPU vs. COALESCING on both. Please refer to the description.
Perhaps I'm missing it — do you have the perf results from reading some of these just on the CPU?
Resolved review thread (outdated):
sql-plugin/src/main/scala/com/nvidia/spark/rapids/GpuOrcScan.scala
Overall very close; a few nits and questions.
build
Hi @tgravescs, I added the perf test for CPU on Standalone to the description; we see a 2x speedup on the CPU with COALESCING reading.
Overall it's fine. I would like to look into the footer estimate more, though, as it seems like it really overestimated.
Signed-off-by: Bobby Wang wbo4958@gmail.com
This PR adds support for ORC coalescing reading.
The ORC coalescing reading works much the same as the Parquet coalescing reading.
Below is the performance test result.
Performance on Standalone
1 CPU 12 cores, and 1 TITAN V (12G memory)
Non-partitioned 5000 orc files, total 1.3G in LOCAL storage
Partitioned 4092 orc files, total 1.3G in LOCAL storage
Performance on Non-partitioned ORC files in Databricks
Non-partitioned 5000 orc files, total 1.3G in LOCAL storage
Non-partitioned 5000 orc files, total 1.3G in DBFS storage
Non-partitioned 2659 orc files, total 6.9M (basically 1 row per orc file), in LOCAL storage
Non-partitioned 2659 orc files, total 6.9M (basically 1 row per orc file), in DBFS storage
Performance on Partitioned ORC files in Databricks
Partitioned 5797 orc files, total 1.3G in DBFS storage
Partitioned 5797 orc files, total 1.3G in LOCAL storage
Result comparison for CPU and COALESCING
The results collected and sorted locally on the driver side are the same for CPU and COALESCING reading for both (Partitioned 561 orc files, total 375M) and (Non-partitioned 300 ORC files, total 354M)
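The comparison methodology above can be sketched as follows. This is a simplified stand-in — the real tests collect Spark rows from CPU and GPU runs; the point here is only why a local sort on the driver is needed before comparing:

```python
def results_match(cpu_rows, gpu_rows):
    # The order in which files/stripes are read is not guaranteed, and the
    # COALESCING reader may interleave files differently from the CPU
    # reader, so both result sets are sorted locally on the driver before
    # the element-wise comparison.
    return sorted(cpu_rows) == sorted(gpu_rows)

# Same contents in a different order should still match ...
assert results_match([(1, 'a'), (2, 'b')], [(2, 'b'), (1, 'a')])
# ... while differing contents should not.
assert not results_match([(1, 'a')], [(1, 'x')])
```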
Re-design OrcPartitionReaderContext to fix the ORC reader leak issue
The previous OrcPartitionReaderContext held the ORC readers for the whole Spark task, which caused side effects such as keeping the S3 connection pool occupied, so that others trying to acquire a connection timed out. See issue #2850.
I previously had a PR that fixed this issue for PERFILE by closing ORC readers once an ORC file has finished reading; see the merged PR #2881.
But the #2881 fix no longer applies to COALESCING. COALESCING needs to filter out all the stripes from all ORC files beforehand, which means it creates an OrcPartitionReaderContext for every ORC file, and we can't close them in time because we will coalesce them later. So once the number of created OrcPartitionReaderContext instances exceeds the max S3 pool size, issue #2850 happens again.
The fix is to remove the ORC readers from OrcPartitionReaderContext, which now keeps only the necessary information.
When reading from ORC is actually needed, we create the ORC reader, read, and destroy it. At the same time, a thread pool controls how many tasks run simultaneously, so as long as the number of concurrent ORC reads stays below the S3 pool size, it works.
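A hedged sketch of this read-then-close pattern combined with a bounded thread pool — plain local files stand in for ORC-over-S3 readers, and the pool-size constant is an assumption for illustration, not a value from the PR:

```python
from concurrent.futures import ThreadPoolExecutor

S3_POOL_SIZE = 16  # assumed connection-pool limit, not taken from the PR

def read_stripe(task):
    """Open a reader, read one stripe's byte range, and close immediately,
    so no connection is held for the lifetime of the Spark task."""
    path, offset, length = task
    with open(path, 'rb') as reader:  # stand-in for an ORC/S3 reader
        reader.seek(offset)
        return reader.read(length)

def coalescing_read(tasks, max_workers=S3_POOL_SIZE - 1):
    # The pool caps the number of in-flight readers, keeping concurrent
    # ORC reads below the S3 connection-pool size.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(read_stripe, tasks))
```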