udf_apply_feature_dataframe UDF in executor? #458

Open
soxofaan opened this issue Jun 20, 2023 · 1 comment

@soxofaan
Member

(I stumbled on this issue while working on #437 / Open-EO/openeo-python-driver#197)

#251 / #262 added parallelized UDF execution on vector cubes (the udf_apply_feature_dataframe and udf_apply_udf_data entrypoints), documented at https://github.com/Open-EO/openeo-geopyspark-driver/blob/1f0ad56cc749d9f3ade315a85f39f1200f74168c/docs/vectorcube-run_udf.md . The idea was to get parallelization and executor isolation automatically by using apply from pyspark.pandas.

However, it seems that a pyspark.pandas apply callback does not run in the executors, but just in the driver.

Example snippet to illustrate:

import openeo
import openeo.processes
connection = openeo.connect("openeo.vito.be").authenticate_oidc()
cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    temporal_extent=["2023-03-01", "2023-03-20"],
    bands=["B02"],
)
geometries = {"type": "Polygon", "coordinates": [[[3.68, 51.04], [3.69, 51.04], [3.69, 51.05], [3.68, 51.05], [3.68, 51.04]]]}
aggregates = cube.aggregate_spatial(geometries=geometries, reducer="mean")
udf_code = """
import pandas as pd
import pyspark

def udf_apply_feature_dataframe(df: pd.DataFrame):
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
"""
processed = openeo.processes.run_udf(data=aggregates, udf=udf_code, runtime="Python")
connection.download(processed, outputfile="tmp.json")

This fails with Internal: Server error: ValueError('in_executor=False'), indicating that the callback did not run in an executor.
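The check used in the snippet can be factored into a small probe that reports more than a boolean. This is a minimal sketch, not part of the driver: the function name describe_execution_context and the keys of the returned dict are made up for illustration. It relies only on pyspark.TaskContext.get(), which returns None outside a running Spark task, and it assumes that "pyspark not importable" can be treated the same as "not in an executor".

```python
import os
import socket


def describe_execution_context() -> dict:
    """Report where the calling code runs (hypothetical helper, for debugging only)."""
    info = {
        "in_executor": False,
        "host": socket.gethostname(),
        "pid": os.getpid(),
        "partition_id": None,
    }
    try:
        from pyspark import TaskContext
    except ImportError:
        # Assumption: without pyspark there is no Spark task to be part of.
        return info

    # TaskContext.get() is None on the driver and set inside a running task.
    task_context = TaskContext.get()
    if task_context is not None:
        info["in_executor"] = True
        info["partition_id"] = task_context.partitionId()
    return info


if __name__ == "__main__":
    print(describe_execution_context())
```

To use it in a UDF, the helper would have to be pasted into the UDF code string itself, and its result surfaced the same way as above, for example with raise ValueError(str(describe_execution_context())). Comparing host and pid across runs may hint at whether the callback shares a process with the driver.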

@soxofaan
Member Author

For comparison, here is UDF usage with openeo apply, where the UDF does run in an executor:

s2_cube = connection.load_collection(
    "TERRASCOPE_S2_TOC_V2",
    spatial_extent={"west": 4.00, "south": 51.00, "east": 4.01, "north": 51.01},
    temporal_extent=["2022-03-01", "2022-03-31"],
    bands=["B02"]
)
udf = openeo.UDF("""
import pyspark
from openeo.udf import XarrayDataCube

def apply_datacube(cube: XarrayDataCube, context: dict) -> XarrayDataCube:
    # Executor detection based on pyspark.SparkContext._assert_on_driver
    in_executor = (pyspark.TaskContext.get() is not None)
    raise ValueError(f"{in_executor=}")
""")
rescaled = s2_cube.apply(process=udf)
rescaled.download("udf-in-executor-apply_datacube-tmp.nc")

This fails with [500] Internal: Server error: UDF Exception during Spark execution: ... ValueError: in_executor=True, indicating that the UDF ran in an executor.

@soxofaan soxofaan added the bug label Jun 20, 2023
jdries added a commit that referenced this issue Aug 23, 2024
@jdries jdries self-assigned this Aug 23, 2024
jdries added a commit that referenced this issue Aug 23, 2024