
Option 'es.read.field.include' with Spark SQL fails to push down field or _source filtering #2244

cpeterp opened this issue Jul 10, 2024 · 0 comments

cpeterp commented Jul 10, 2024

What kind of an issue is this?

  • Bug report. If you’ve found a bug, please provide a code snippet or test to reproduce it below.
    The easier it is to track down the bug, the faster it is solved.
  • Feature Request. Start by telling us what problem you’re trying to solve.
    Often a solution already exists! Don’t send pull requests to implement new features without
    first getting our support. Sometimes we leave features out on purpose to keep the project small.

Issue description

According to the documentation, the preferred method for selecting a subset of fields in a query through Spark SQL is the 'es.read.field.include' option (see Reading DataFrames - Controlling the DataFrame schema). The docs also state that filtering options should be pushed down to the Elasticsearch query. However, when this option is used on its own, the actual queries sent to Elasticsearch DO NOT include a source filtering option. Instead, all fields are queried and the specified fields are only selected after the data is returned.

Using built-in DataFrame methods such as DataFrame.select(<field>) does not modify the underlying query sent to Elasticsearch either.

Adding a "_source" parameter to the query supplied via DataFrameReader.option('es.query', <query>) does not get passed to the underlying query either, although the contents of the "query" key are.

Finally, using the 'es.read.source.filter' option does modify the query sent to Elasticsearch (by adding a "_source" parameter), but using it results in an error when the DataFrame is operated on:

User specified source filters were found [name,timestamp], but the connector is executing in a state where it has provided its own source filtering [name,timestamp,location.address]. Please clear the user specified source fields under the [es.read.source.filter] property to continue. Bailing out...

which is addressed in the docs here.
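
For reference, a minimal sketch of how that failing run was configured (boilerplate_options and the field names are placeholders; the fields match the ones quoted in the error above):

sdf_4 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(**boilerplate_options) \
  .option('es.read.source.filter', "name,timestamp") \
  .load()
sdf_4.count()  # fails with the "User specified source filters were found..." error quoted above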

Steps to reproduce

Code:

import json
from pyspark.sql.session import SparkSession

spark = SparkSession.builder.getOrCreate()

# boilerplate_options holds the usual connection settings (nodes, port,
# credentials, resource); omitted here

# Method 1: subset fields via 'es.read.field.include' only
sdf_1 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(**boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .load()
sdf_1.count()

# Method 2: additionally select the fields on the loaded DataFrame
sdf_2 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(**boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .load()
sdf_2 = sdf_2.select("field.a", "field.b")
sdf_2.count()

# Method 3: additionally pass a "_source" filter through 'es.query'
query = {
  "_source": ["field.a", "field.b"],
  "query": {"match_all": {}}
}
sdf_3 = spark.read.format('org.elasticsearch.spark.sql') \
  .options(**boilerplate_options) \
  .option('es.read.field.include', "field.a,field.b") \
  .option('es.query', json.dumps(query)) \
  .load()
sdf_3.count()

Stack trace:
No errors were raised. However, after setting logging for org.elasticsearch.hadoop.rest to TRACE, I saw the following in the logs for each method:

...
24/07/08 20:27:01 TRACE CommonsHttpTransport: Tx [POST]@[NODE_URL:PORT][sample_indx/_search]?[sort=_doc&scroll=5m&size=1000&preference=_shards_ABC&track_total_hits=true] w/ payload [{"slice":{"id":111,"max":700},"query":{"match_all":{}}}]
...
24/07/08 20:28:01 TRACE CommonsHttpTransport: Rx @[EXECUTER_IP] [200-OK] [{"_scroll_id":"ABCDEFGH","took":8248,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":305,"relation":"eq"},"max_score":null,"hits":[{"_index":"sample_indx","_id":"0001","_score":null,"_source":{"field.a":val, "field.b":val, "field.c":val ....}
...
24/07/08 20:29:01 TRACE CommonsHttpTransport: Tx [DELETE]@[NODE_URL:PORT[_search/scroll]?[null] w/ payload [{"scroll_id":["ABCDEFGH"]}]

Given these logs, and the time required to return the data, it appears that no field/_source filter is pushed down: the request payload contains only the slice and the match_all query, and the returned _source still includes fields (e.g. field.c) that 'es.read.field.include' should have excluded.
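
For comparison, this is the shape of the search body I would have expected the connector to send if the filter were pushed down (illustrative only, using Elasticsearch's standard top-level "_source" filtering; the slice values are copied from the trace above):

expected_payload = {
  "slice": {"id": 111, "max": 700},
  "_source": ["field.a", "field.b"],
  "query": {"match_all": {}}
}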

Version Info

OS: Databricks Runtime 10.4 LTS ML (runs on Ubuntu)
JVM: 1.8.0_382
Hadoop/Spark: 3.2.1
ES-Hadoop: 8.10.0
ES: 8.12
