[SPARK-6554] [SQL] Don't push down predicates which reference partition column(s)

There are two cases to consider for the new Parquet data source:

1. Partition columns exist in the Parquet data files.

   We don't need to push down these predicates, since partition pruning already handles them.

2. Partition columns don't exist in the Parquet data files.

   We can't push down these predicates, since the columns they reference don't exist in the files and Parquet would reject them as invalid.
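
A minimal sketch of the partition-column check this commit adds, using hypothetical simplified stand-ins (`Attribute`, `Predicate`) for Catalyst's attribute and expression types; the real change applies the same set-intersection test to `predicates` and `partitionColumns` inside `ParquetRelation2`:

```scala
object PartitionFilterSketch {
  // Hypothetical stand-ins for Catalyst's AttributeReference and Expression.
  case class Attribute(name: String)
  case class Predicate(sql: String, references: Seq[Attribute])

  // True if the predicate references at least one partition column.
  def referencesPartitionColumn(pred: Predicate, partitionColNames: Set[String]): Boolean =
    pred.references.map(_.name).toSet.intersect(partitionColNames).nonEmpty

  def main(args: Array[String]): Unit = {
    val partitionColNames = Set("part")
    val predicates = Seq(
      Predicate("a = 1", Seq(Attribute("a"))),
      Predicate("part = 1", Seq(Attribute("part"))))

    // Only "a = 1" remains a push-down candidate; "part = 1" is left to
    // partition pruning (case 1) or would be invalid to Parquet (case 2).
    val pushDownCandidates =
      predicates.filterNot(referencesPartitionColumn(_, partitionColNames))
    println(pushDownCandidates.map(_.sql)) // List(a = 1)
  }
}
```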


Author: Cheng Lian <lian@databricks.com>

Closes #5210 from liancheng/spark-6554 and squashes the following commits:

4f7ec03 [Cheng Lian] Adds comments
e134ced [Cheng Lian] Don't push down predicates which reference partition column(s)
liancheng authored and marmbrus committed on Mar 26, 2015
1 parent 784fcd5 · commit 71a0d40
Showing 2 changed files with 29 additions and 5 deletions.
@@ -435,11 +435,18 @@ private[sql] case class ParquetRelation2(
     // Push down filters when possible. Notice that not all filters can be converted to Parquet
     // filter predicate. Here we try to convert each individual predicate and only collect those
     // convertible ones.
-    predicates
-      .flatMap(ParquetFilters.createFilter)
-      .reduceOption(FilterApi.and)
-      .filter(_ => sqlContext.conf.parquetFilterPushDown)
-      .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    if (sqlContext.conf.parquetFilterPushDown) {
+      predicates
+        // Don't push down predicates which reference partition columns
+        .filter { pred =>
+          val partitionColNames = partitionColumns.map(_.name).toSet
+          val referencedColNames = pred.references.map(_.name).toSet
+          referencedColNames.intersect(partitionColNames).isEmpty
+        }
+        .flatMap(ParquetFilters.createFilter)
+        .reduceOption(FilterApi.and)
+        .foreach(ParquetInputFormat.setFilterPredicate(jobConf, _))
+    }
 
     if (isPartitioned) {
       logInfo {
@@ -321,6 +321,23 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
   override protected def afterAll(): Unit = {
     sqlContext.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, originalConf.toString)
   }
+
+  test("SPARK-6554: don't push down predicates which reference partition columns") {
+    import sqlContext.implicits._
+
+    withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part=1"
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+
+        // If the "part = 1" filter gets pushed down, this query will throw an exception since
+        // "part" is not a valid column in the actual Parquet file
+        checkAnswer(
+          sqlContext.parquetFile(path).filter("part = 1"),
+          (1 to 3).map(i => Row(i, i.toString, 1)))
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with BeforeAndAfterAll {
