Skip to content

Commit

Permalink
[SPARK-44548][PYTHON] Add support for pandas-on-Spark DataFrame assertDataFrameEqual
Browse files Browse the repository at this point in the history

### What changes were proposed in this pull request?
This PR adds support for pandas-on-Spark DataFrame for the testing util, `assertDataFrameEqual`

### Why are the changes needed?
The change allows users to call the same PySpark API for both Spark and pandas DataFrames.

### Does this PR introduce _any_ user-facing change?
Yes, the PR affects the user-facing util `assertDataFrameEqual`

### How was this patch tested?
Added tests to `python/pyspark/sql/tests/test_utils.py` and `python/pyspark/sql/tests/connect/test_utils.py` and existing pandas util tests.

Closes #42158 from asl3/pandas-or-pyspark-df.

Authored-by: Amanda Liu <amanda.liu@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
asl3 authored and HyukjinKwon committed Jul 28, 2023
1 parent 3cf88cb commit 7c1ad5b
Show file tree
Hide file tree
Showing 8 changed files with 689 additions and 178 deletions.
1 change: 1 addition & 0 deletions dev/sparktestsupport/modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ def __hash__(self):
python_test_goals=[
# doctests
"pyspark.testing.utils",
"pyspark.testing.pandasutils",
],
)

Expand Down
1 change: 1 addition & 0 deletions python/docs/source/reference/pyspark.testing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,5 @@ Testing
:toctree: api/

assertDataFrameEqual
assertPandasOnSparkEqual
assertSchemaEqual
42 changes: 42 additions & 0 deletions python/pyspark/errors/error_classes.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,42 @@
"Remote client cannot create a SparkContext. Create SparkSession instead."
]
},
"DIFFERENT_PANDAS_DATAFRAME" : {
"message" : [
"DataFrames are not almost equal:",
"Left: <left>",
"<left_dtype>",
"Right: <right>",
"<right_dtype>"
]
},
"DIFFERENT_PANDAS_INDEX" : {
"message" : [
"Indices are not almost equal:",
"Left: <left>",
"<left_dtype>",
"Right: <right>",
"<right_dtype>"
]
},
"DIFFERENT_PANDAS_MULTIINDEX" : {
"message" : [
"MultiIndices are not almost equal:",
"Left: <left>",
"<left_dtype>",
"Right: <right>",
"<right_dtype>"
]
},
"DIFFERENT_PANDAS_SERIES" : {
"message" : [
"Series are not almost equal:",
"Left: <left>",
"<left_dtype>",
"Right: <right>",
"<right_dtype>"
]
},
"DIFFERENT_ROWS" : {
"message" : [
"<error_msg>"
Expand Down Expand Up @@ -233,6 +269,12 @@
"NumPy array input should be of <dimensions> dimensions."
]
},
"INVALID_PANDAS_ON_SPARK_COMPARISON" : {
"message" : [
"Expected two pandas-on-Spark DataFrames",
"but got actual: <actual_type> and expected: <expected_type>"
]
},
"INVALID_PANDAS_UDF" : {
"message" : [
"Invalid function: <detail>"
Expand Down
171 changes: 170 additions & 1 deletion python/pyspark/pandas/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#

import pandas as pd
from typing import Union

from pyspark.pandas.indexes.base import Index
from pyspark.pandas.utils import (
Expand All @@ -25,8 +26,14 @@
validate_index_loc,
validate_mode,
)
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.pandasutils import (
PandasOnSparkTestCase,
assertPandasOnSparkEqual,
_assert_pandas_equal,
_assert_pandas_almost_equal,
)
from pyspark.testing.sqlutils import SQLTestUtils
from pyspark.errors import PySparkAssertionError

some_global_variable = 0

Expand Down Expand Up @@ -105,6 +112,168 @@ def test_validate_index_loc(self):
with self.assertRaisesRegex(IndexError, err_msg):
validate_index_loc(psidx, -4)

def test_assert_df_assertPandasOnSparkEqual(self):
    """Identical pandas-on-Spark DataFrames compare equal with and without row-order checking."""
    import pyspark.pandas as ps

    columns = {"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]}
    left = ps.DataFrame(columns)
    right = ps.DataFrame(columns)

    # Equality must hold whether or not row order is enforced.
    for ordered in (False, True):
        assertPandasOnSparkEqual(left, right, checkRowOrder=ordered)

def test_assertPandasOnSparkEqual_ignoreOrder_default(self):
    """Row order is ignored by default (no checkRowOrder argument passed)."""
    import pyspark.pandas as ps

    ordered = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
    shuffled = ps.DataFrame({"a": [2, 1, 3], "b": [5, 4, 6], "c": [8, 7, 9]})

    # Same rows in a different order: the default comparison must still pass.
    assertPandasOnSparkEqual(ordered, shuffled)

def test_assert_series_assertPandasOnSparkEqual(self):
    """Identical pandas-on-Spark Series compare equal when exact checking is disabled."""
    import pyspark.pandas as ps

    left = ps.Series([212.32, 100.0001])
    right = ps.Series([212.32, 100.0001])

    assertPandasOnSparkEqual(left, right, checkExact=False)

def test_assert_index_assertPandasOnSparkEqual(self):
    """Nearly-equal pandas-on-Spark Indexes pass under approximate (almost=True) comparison."""
    import pyspark.pandas as ps

    left = ps.Index([212.300001, 100.000])
    right = ps.Index([212.3, 100.0001])

    # Values differ only past the approximate-comparison tolerance.
    assertPandasOnSparkEqual(left, right, almost=True)

def test_assert_error_assertPandasOnSparkEqual(self):
    """Plain Python lists are rejected with INVALID_TYPE_DF_EQUALITY_ARG."""
    import pyspark.pandas as ps

    actual = [10, 20, 30]
    expected = [10, 20, 30]

    with self.assertRaises(PySparkAssertionError) as pe:
        assertPandasOnSparkEqual(actual, expected)

    # The error names every accepted pandas-on-Spark input type.
    accepted = f"{ps.DataFrame.__name__}, {ps.Series.__name__}, {ps.Index.__name__}"
    self.check_error(
        exception=pe.exception,
        error_class="INVALID_TYPE_DF_EQUALITY_ARG",
        message_parameters={
            "expected_type": accepted,
            "arg_name": "actual",
            "actual_type": type(actual),
        },
    )

def test_assert_None_assertPandasOnSparkEqual(self):
    """Two None arguments are treated as equal (no error raised)."""
    left = None
    right = None

    assertPandasOnSparkEqual(left, right)

def test_assert_empty_assertPandasOnSparkEqual(self):
    """Two empty pandas-on-Spark DataFrames compare equal."""
    import pyspark.pandas as ps

    assertPandasOnSparkEqual(ps.DataFrame(), ps.DataFrame())

def test_dataframe_error_assert_pandas_equal(self):
    """_assert_pandas_equal raises DIFFERENT_PANDAS_DATAFRAME for unequal pandas DataFrames."""
    left = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]}, index=[0, 1, 3])
    right = pd.DataFrame({"a": [1, 3, 3], "b": [4, 5, 6]}, index=[0, 1, 3])

    with self.assertRaises(PySparkAssertionError) as pe:
        _assert_pandas_equal(left, right, True)

    # DataFrame messages carry rendered frames and stringified dtypes.
    self.check_error(
        exception=pe.exception,
        error_class="DIFFERENT_PANDAS_DATAFRAME",
        message_parameters={
            "left": left.to_string(),
            "left_dtype": str(left.dtypes),
            "right": right.to_string(),
            "right_dtype": str(right.dtypes),
        },
    )

def test_series_error_assert_pandas_equal(self):
    """_assert_pandas_equal raises DIFFERENT_PANDAS_SERIES for unequal pandas Series."""
    left = pd.Series([1, 2, 3])
    right = pd.Series([4, 5, 6])

    with self.assertRaises(PySparkAssertionError) as pe:
        _assert_pandas_equal(left, right, True)

    # NOTE: Series messages carry the raw objects/dtypes (not stringified,
    # unlike the DataFrame case) — this mirrors the implementation.
    self.check_error(
        exception=pe.exception,
        error_class="DIFFERENT_PANDAS_SERIES",
        message_parameters={
            "left": left,
            "left_dtype": left.dtype,
            "right": right,
            "right_dtype": right.dtype,
        },
    )

def test_index_error_assert_pandas_equal(self):
    """_assert_pandas_equal raises DIFFERENT_PANDAS_INDEX for unequal pandas Indexes."""
    left = pd.Index([1, 2, 3])
    right = pd.Index([4, 5, 6])

    with self.assertRaises(PySparkAssertionError) as pe:
        _assert_pandas_equal(left, right, True)

    self.check_error(
        exception=pe.exception,
        error_class="DIFFERENT_PANDAS_INDEX",
        message_parameters={
            "left": left,
            "left_dtype": left.dtype,
            "right": right,
            "right_dtype": right.dtype,
        },
    )

def test_multiindex_error_assert_pandas_almost_equal(self):
    """_assert_pandas_almost_equal raises DIFFERENT_PANDAS_MULTIINDEX for differing MultiIndexes."""
    frame_a = pd.DataFrame({"a": [1, 2], "b": [4, 10]}, index=[0, 1])
    frame_b = pd.DataFrame({"a": [1, 5, 3], "b": [1, 5, 6]}, index=[0, 1, 3])
    left = pd.MultiIndex.from_frame(frame_a)
    right = pd.MultiIndex.from_frame(frame_b)

    with self.assertRaises(PySparkAssertionError) as pe:
        _assert_pandas_almost_equal(left, right)

    self.check_error(
        exception=pe.exception,
        error_class="DIFFERENT_PANDAS_MULTIINDEX",
        message_parameters={
            "left": left,
            "left_dtype": left.dtype,
            "right": right,
            "right_dtype": right.dtype,
        },
    )

def test_dataframe_error_assert_pandas_on_spark_almost_equal(self):
    """assertPandasOnSparkEqual(almost=True) raises DIFFERENT_PANDAS_DATAFRAME for frames of different length."""
    import pyspark.pandas as ps

    left = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6], "c": [7, 8, 9]})
    right = ps.DataFrame({"a": [1, 2], "b": [4, 5], "c": [7, 8]})

    with self.assertRaises(PySparkAssertionError) as pe:
        assertPandasOnSparkEqual(left, right, almost=True)

    self.check_error(
        exception=pe.exception,
        error_class="DIFFERENT_PANDAS_DATAFRAME",
        message_parameters={
            "left": left.to_string(),
            "left_dtype": str(left.dtypes),
            "right": right.to_string(),
            "right_dtype": str(right.dtypes),
        },
    )


class TestClassForLazyProp:
def __init__(self):
Expand Down
60 changes: 43 additions & 17 deletions python/pyspark/sql/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -623,22 +623,47 @@ def test_assert_equal_nulldf(self):
assertDataFrameEqual(df1, df2, checkRowOrder=False)
assertDataFrameEqual(df1, df2, checkRowOrder=True)

def test_assert_error_pandas_df(self):
import pandas as pd
def test_assert_equal_exact_pandas_df(self):
import pyspark.pandas as ps

df1 = pd.DataFrame(data=[10, 20, 30], columns=["Numbers"])
df2 = pd.DataFrame(data=[10, 20, 30], columns=["Numbers"])
df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
df2 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])

assertDataFrameEqual(df1, df2, checkRowOrder=False)
assertDataFrameEqual(df1, df2, checkRowOrder=True)

def test_assert_equal_exact_pandas_df(self):
import pyspark.pandas as ps

df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
df2 = ps.DataFrame(data=[30, 20, 10], columns=["Numbers"])

assertDataFrameEqual(df1, df2)

def test_assert_equal_approx_pandas_df(self):
    """Approximately-equal pandas-on-Spark DataFrames pass assertDataFrameEqual with either row-order setting."""
    import pyspark.pandas as ps

    left = ps.DataFrame(data=[10.0001, 20.32, 30.1], columns=["Numbers"])
    right = ps.DataFrame(data=[10.0, 20.32, 30.1], columns=["Numbers"])

    # Values differ only within the default approximate tolerance.
    for ordered in (False, True):
        assertDataFrameEqual(left, right, checkRowOrder=ordered)

def test_assert_error_pandas_pyspark_df(self):
import pyspark.pandas as ps

df1 = ps.DataFrame(data=[10, 20, 30], columns=["Numbers"])
df2 = self.spark.createDataFrame([(10,), (11,), (13,)], ["Numbers"])

with self.assertRaises(PySparkAssertionError) as pe:
assertDataFrameEqual(df1, df2)
assertDataFrameEqual(df1, df2, checkRowOrder=False)

self.check_error(
exception=pe.exception,
error_class="INVALID_TYPE_DF_EQUALITY_ARG",
error_class="INVALID_PANDAS_ON_SPARK_COMPARISON",
message_parameters={
"expected_type": DataFrame,
"arg_name": "df",
"actual_type": pd.DataFrame,
"actual_type": type(df1),
"expected_type": type(df2),
},
)

Expand All @@ -647,15 +672,16 @@ def test_assert_error_pandas_df(self):

self.check_error(
exception=pe.exception,
error_class="INVALID_TYPE_DF_EQUALITY_ARG",
error_class="INVALID_PANDAS_ON_SPARK_COMPARISON",
message_parameters={
"expected_type": DataFrame,
"arg_name": "df",
"actual_type": pd.DataFrame,
"actual_type": type(df1),
"expected_type": type(df2),
},
)

def test_assert_error_non_pyspark_df(self):
import pyspark.pandas as ps

dict1 = {"a": 1, "b": 2}
dict2 = {"a": 1, "b": 2}

Expand All @@ -666,8 +692,8 @@ def test_assert_error_non_pyspark_df(self):
exception=pe.exception,
error_class="INVALID_TYPE_DF_EQUALITY_ARG",
message_parameters={
"expected_type": DataFrame,
"arg_name": "df",
"expected_type": f"{DataFrame.__name__}, {ps.DataFrame.__name__}",
"arg_name": "actual",
"actual_type": type(dict1),
},
)
Expand All @@ -679,8 +705,8 @@ def test_assert_error_non_pyspark_df(self):
exception=pe.exception,
error_class="INVALID_TYPE_DF_EQUALITY_ARG",
message_parameters={
"expected_type": DataFrame,
"arg_name": "df",
"expected_type": f"{DataFrame.__name__}, {ps.DataFrame.__name__}",
"arg_name": "actual",
"actual_type": type(dict1),
},
)
Expand Down
4 changes: 3 additions & 1 deletion python/pyspark/testing/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,6 @@
#
from pyspark.testing.utils import assertDataFrameEqual, assertSchemaEqual

__all__ = ["assertDataFrameEqual", "assertSchemaEqual"]
from pyspark.testing.pandasutils import assertPandasOnSparkEqual

__all__ = ["assertDataFrameEqual", "assertSchemaEqual", "assertPandasOnSparkEqual"]
Loading

0 comments on commit 7c1ad5b

Please sign in to comment.