# Using `run_udf` on a vector data cube

The openEO process `run_udf` is typically used on raster data cubes in the "callback" of a process like `reduce_dimension`, `apply_dimension`, ..., where the UDF operates on a slice of raster data, provided in some kind of multidimensional array format (like numpy, Xarray, pandas, ...).

The VITO/Terrascope openEO back-end also adds experimental support to use `run_udf` directly on a vector cube, e.g. to filter, transform, enrich or post-process the vector data. In the original implementation (which is still the default), the back-end calls the UDF with the whole vector data set as input. This was fine as a proof of concept, but did not scale well for large vector cubes, as there was no way to leverage parallelization.

## Parallelized `run_udf` on vector cube data

Under Open-EO/openeo-geopyspark-driver#251, the experimental `run_udf` support on vector cubes was further expanded to allow parallelized execution of the UDF logic.

The user-provided UDF is expected to work at the level of single geometries and must follow the UDF signatures described below.

In the examples below, we assume that the `run_udf` process is applied to the result of the `aggregate_spatial` process. For example, something like this:

```python
import openeo
import openeo.processes

...
cube = connection.load_collection(...)
aggregates = cube.aggregate_spatial(geometries, reducer="mean")

result = openeo.processes.run_udf(data=aggregates, udf=udf_code, runtime="Python")
```

The resulting dataframe structure (to be downloaded in JSON format) currently follows the "split" orientation of `pandas.DataFrame.to_dict`. For example, when using synchronous execution/download:

```pycon
>>> data = result.execute()
>>> data
{
    "columns": ["feature_index", "mean(band_0)", "mean(band_1)", "mean(band_2)"],
    "data": [
        [0, 0.4, 0.8, 1.4],
        [1, 0.3, 0.6, 1.9],
        ...
    ],
    ...
}
```
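
Because this payload follows the "split" orientation of `pandas.DataFrame.to_dict`, it can be loaded back into a pandas `DataFrame` on the client side. A minimal sketch, using the `data` result from the example above:

```python
import pandas as pd

# Rebuild a DataFrame from the "split"-style payload
# ("columns" gives the column labels, "data" the row values).
df = pd.DataFrame(data["data"], columns=data["columns"]).set_index("feature_index")
```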

Simple "Pandas DataFrame" mode with udf_apply_feature_dataframe

This mode is enabled by defining your UDF entry point function as `udf_apply_feature_dataframe`, which will be given a pandas `DataFrame` containing the data of a single geometry/feature in your vector cube:

```python
import pandas as pd

def udf_apply_feature_dataframe(df: pd.DataFrame):
    # df contains the data for a single geometry/feature,
    # with the time dimension as index and the band dimension as columns
    ...
```

Depending on your use case, you can return different values:

- return a scalar (reduce all data of a feature to a single value), for example (extremely simplified):

  ```python
  def udf_apply_feature_dataframe(df: pd.DataFrame) -> float:
      return 123.456
  ```

  The resulting output data structure will list the returned scalar value for each geometry/feature, e.g.:

  ```
  {
      "columns": ["feature_index", "0"],
      "data": [
          [0, 123.456],
          [1, 123.456],
          ...
      ],
      ...
  }
  ```
- return a pandas `Series` to:

  - reduce the time dimension:

    ```python
    def udf_apply_feature_dataframe(df: pd.DataFrame) -> pd.Series:
        # Sum along the index (time dimension)
        return df.sum(axis=0)
    ```

  - reduce the band dimension (make sure to convert the time index labels to strings):

    ```python
    def udf_apply_feature_dataframe(df: pd.DataFrame) -> pd.Series:
        # Sum along the columns (band dimension), keeping the time index
        series = df.sum(axis=1)
        # Make sure the index labels are strings
        series.index = series.index.strftime("%Y-%m-%d")
        return series
    ```

  The resulting output data structure will list the calculated values per geometry, e.g. for the time-reduction case:

  ```
  {
      "columns": ["feature_index", "mean(band_0)", "mean(band_1)", "mean(band_2)"],
      "data": [
          [0, 0.4, 0.8, 1.4],
          [1, 1.3, 0.3, 2.3],
          ...
      ],
      ...
  }
  ```
- return a full pandas `DataFrame`, for example (very simplified):

  ```python
  def udf_apply_feature_dataframe(df: pd.DataFrame) -> pd.DataFrame:
      return df + 1000
  ```

  The resulting output data structure will encode the preserved time and band dimensions as follows:

  ```
  {
      "columns": ["feature_index", "date", "mean(band_0)", "mean(band_1)", "mean(band_2)"],
      "data": [
          [0, "2021-01-05T00:00:00.000Z", 1000.4, 1000.8, 1001.4],
          [0, "2021-01-12T00:00:00.000Z", 1000.8, 1002.8, 1000.9],
          [1, "2021-01-05T00:00:00.000Z", 1001.8, 1000.3, 1002.7],
          [1, "2021-01-12T00:00:00.000Z", 1000.3, 1000.9, 1001.6],
          ...
      ],
      ...
  }
  ```
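
Putting this together: a minimal end-to-end sketch of the DataFrame mode, reusing the `aggregates` cube from the `aggregate_spatial` example above (the UDF body is purely illustrative):

```python
import openeo.processes

udf_code = """
import pandas as pd

def udf_apply_feature_dataframe(df: pd.DataFrame) -> pd.Series:
    # Reduce the time dimension: mean value per band for this feature
    return df.mean(axis=0)
"""

result = openeo.processes.run_udf(data=aggregates, udf=udf_code, runtime="Python")
data = result.execute()
```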

## Classic `UdfData` mode with `udf_apply_udf_data`

This mode is more cumbersome to work with, because it requires more unpacking and packing boilerplate, as sketched below.
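
The entry point for this mode is a function `udf_apply_udf_data` that receives an `openeo.udf.UdfData` object. A minimal sketch, modeled on the unit tests below (per feature, the structured data has layout `{datetime: [[float for each band] for each polygon]}`):

```python
from openeo.udf import UdfData, StructuredData

def udf_apply_udf_data(udf_data: UdfData) -> UdfData:
    # Unpack the aggregated values for this single feature:
    # {datetime: [[float for each band] for each polygon]}
    data = udf_data.get_structured_data_list()[0].data
    # Example transformation: sum the band values per timestamp.
    data = {date: [sum(bands) for bands in polygons] for date, polygons in data.items()}
    # Pack the result again as structured data.
    return UdfData(structured_data_list=[StructuredData(data)])
```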

See the unit tests for this mode for more information:

```python
def test_udf_apply_udf_data_scalar(self, api100):
    # TODO: influence of tight spatial_extent that excludes some geometries? e.g.:
    cube = self._load_cube()
    geometries = get_test_data_file("geometries/FeatureCollection03.json")
    aggregates = cube.aggregate_spatial(geometries, "min")
    udf = textwrap.dedent(
        """
        from openeo.udf import UdfData

        def udf_apply_udf_data(udf_data: UdfData) -> float:
            data = udf_data.get_structured_data_list()[0].data
            # Data's structure: {datetime: [[float for each band] for each polygon]}
            assert isinstance(data, dict)
            # Convert to single scalar value
            ((_, lon, lat),) = data["2021-01-05T00:00:00Z"]
            return 1000 * lon + lat
        """
    )
    processed = openeo.processes.run_udf(aggregates, udf=udf, runtime="Python")
    result = api100.check_result(processed).json
    result = drop_empty_from_aggregate_polygon_result(result)
    assert result == DictSubSet(
        columns=["feature_index", "0"],
        data=IgnoreOrder(
            [
                [0, 1001.0],
                [1, 4002.0],
                [2, 2004.0],
                [3, 5000.0],
            ]
        ),
    )

def test_udf_apply_udf_data_reduce_bands(self, api100):
    cube = self._load_cube(temporal_extent=["2021-01-01", "2021-01-20"])
    geometries = get_test_data_file("geometries/FeatureCollection03.json")
    aggregates = cube.aggregate_spatial(geometries, "min")
    udf = textwrap.dedent(
        """
        from openeo.udf import UdfData, StructuredData

        def udf_apply_udf_data(udf_data: UdfData) -> UdfData:
            data = udf_data.get_structured_data_list()[0].data
            # Data's structure: {datetime: [[float for each band] for each polygon]}
            # Convert to {datetime: [float for each polygon]}
            data = {date: [sum(bands) for bands in geometry_data] for date, geometry_data in data.items()}
            return UdfData(structured_data_list=[StructuredData(data)])
        """
    )
    processed = openeo.processes.run_udf(aggregates, udf=udf, runtime="Python")
    result = api100.check_result(processed).json
    result = drop_empty_from_aggregate_polygon_result(result)
    assert result == DictSubSet(
        columns=["feature_index", "level_1", "0"],
        data=IgnoreOrder(
            [
                [0, "2021-01-05T00:00:00Z", 5 + 1 + 1],
                [0, "2021-01-15T00:00:00Z", 15 + 1 + 1],
                [1, "2021-01-05T00:00:00Z", 5 + 4 + 2],
                [1, "2021-01-15T00:00:00Z", 15 + 4 + 2],
                [2, "2021-01-05T00:00:00Z", 5 + 2 + 4],
                [2, "2021-01-15T00:00:00Z", 15 + 2 + 4],
                [3, "2021-01-05T00:00:00Z", 5 + 5 + 0],
                [3, "2021-01-15T00:00:00Z", 15 + 5 + 0],
            ]
        ),
    )

def test_udf_apply_udf_data_reduce_time(self, api100):
    cube = self._load_cube()
    geometries = get_test_data_file("geometries/FeatureCollection03.json")
    aggregates = cube.aggregate_spatial(geometries, "min")
    udf = textwrap.dedent(
        """
        import functools
        from openeo.udf import UdfData, StructuredData

        def udf_apply_udf_data(udf_data: UdfData) -> UdfData:
            data = udf_data.get_structured_data_list()[0].data
            # Data's structure: {datetime: [[float for each band] for each polygon]}
            # Convert to [[float for each band] for each polygon]
            data = functools.reduce(
                lambda d1, d2: [[b1 + b2 for (b1, b2) in zip(d1[0], d2[0])]],
                data.values()
            )
            return UdfData(structured_data_list=[StructuredData(data)])
        """
    )
    processed = openeo.processes.run_udf(aggregates, udf=udf, runtime="Python")
    result = api100.check_result(processed).json
    result = drop_empty_from_aggregate_polygon_result(result)
    assert result == DictSubSet(
        columns=["feature_index", "level_1", "0", "1", "2"],
        data=IgnoreOrder(
            [
                [0, 0, 5 + 15 + 25, 3 * 1, 3 * 1],
                [1, 0, 5 + 15 + 25, 3 * 4, 3 * 2],
                [2, 0, 5 + 15 + 25, 3 * 2, 3 * 4],
                [3, 0, 5 + 15 + 25, 3 * 5, 3 * 0],
            ]
        ),
    )

def test_udf_apply_udf_data_return_series(self, api100):
    cube = self._load_cube()
    geometries = get_test_data_file("geometries/FeatureCollection03.json")
    aggregates = cube.aggregate_spatial(geometries, "min")
    udf = textwrap.dedent(
        """
        import pandas
        from openeo.udf import UdfData

        def udf_apply_udf_data(udf_data: UdfData) -> pandas.Series:
            data = udf_data.get_structured_data_list()[0].data
            # Data's structure: {datetime: [[float for each band] for each polygon]}
            # Convert to series {"start": sum(bands), "end": sum(bands)}
            series = pandas.Series({
                "start": sum(min(data.items())[1][0]),
                "end": sum(max(data.items())[1][0]),
            })
            return series
        """
    )
    processed = openeo.processes.run_udf(aggregates, udf=udf, runtime="Python")
    result = api100.check_result(processed).json
    result = drop_empty_from_aggregate_polygon_result(result)
    assert result == DictSubSet(
        columns=["feature_index", "start", "end"],
        data=IgnoreOrder(
            [
                [0, 5 + 1 + 1, 25 + 1 + 1],
                [1, 5 + 4 + 2, 25 + 4 + 2],
                [2, 5 + 2 + 4, 25 + 2 + 4],
                [3, 5 + 5 + 0, 25 + 5 + 0],
            ]
        ),
    )

def test_udf_apply_udf_data_return_dataframe(self, api100):
    cube = self._load_cube()
    geometries = get_test_data_file("geometries/FeatureCollection03.json")
    aggregates = cube.aggregate_spatial(geometries, "min")
    udf = textwrap.dedent(
        """
        import pandas
        from openeo.udf import UdfData

        def udf_apply_udf_data(udf_data: UdfData) -> pandas.DataFrame:
            data = udf_data.get_structured_data_list()[0].data
            # Data's structure: {datetime: [[float for each band] for each polygon]}
            # Convert to dataframe {"start": [floats], "end": [floats]}
            start_values = min(data.items())[1][0]
            end_values = max(data.items())[1][0]
            df = pandas.DataFrame(
                {
                    "start": [min(start_values), max(start_values)],
                    "end": [min(end_values), max(end_values)],
                },
                index=pandas.Index(["min", "max"], name="band_range"),
            )
            return df
        """
    )
    processed = openeo.processes.run_udf(aggregates, udf=udf, runtime="Python")
    result = api100.check_result(processed).json
    result = drop_empty_from_aggregate_polygon_result(result)
    assert result == DictSubSet(
        columns=["feature_index", "band_range", "start", "end"],
        data=IgnoreOrder(
            [
                [0, "max", 5, 25],
                [0, "min", 1, 1],
                [1, "max", 5, 25],
                [1, "min", 2, 2],
                [2, "max", 5, 25],
                [2, "min", 2, 2],
                [3, "max", 5, 25],
                [3, "min", 0, 0],
            ]
        ),
    )
```