Call globStatus directly via PY4J in hdfs_glob to avoid calling hadoop command #10599

Merged · 5 commits · Mar 19, 2024
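In short, the change swaps the external `hadoop fs -ls -C` subprocess in `hdfs_glob` for a direct `FileSystem.globStatus` call made over Py4J from the Spark driver, which avoids spawning a hadoop CLI process per lookup. A minimal, self-contained sketch of that technique is below; it assumes a running `SparkSession` obtained the ordinary way (the test itself uses its own `spark_init_internal` helper), and the function name `glob_via_py4j` is illustrative, not part of the PR.

```python
# Sketch only: list files matching a glob on any Hadoop-compatible filesystem
# through Py4J, without shelling out to the `hadoop` CLI.
from pyspark.sql import SparkSession

def glob_via_py4j(pattern):
    sc = SparkSession.builder.getOrCreate().sparkContext
    hadoop_conf = sc._jsc.hadoopConfiguration()
    # Wrap the glob pattern in an org.apache.hadoop.fs.Path via the JVM gateway.
    jpath = sc._jvm.org.apache.hadoop.fs.Path(pattern)
    # Resolve the FileSystem that owns this path (hdfs://, s3a://, file://, ...).
    fs = jpath.getFileSystem(hadoop_conf)
    # globStatus returns FileStatus objects; it can be None when nothing matches.
    statuses = fs.globStatus(jpath) or []
    return [status.getPath().toString() for status in statuses]

# e.g. glob_via_py4j("hdfs:///data/parquet-testing/data/*.parquet")
```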
integration_tests/src/main/python/parquet_testing_test.py (64 changes: 30 additions, 34 deletions)
@@ -15,12 +15,12 @@
 # Tests based on the Parquet dataset available at
 # https://github.com/apache/parquet-testing

+import os
 from asserts import assert_gpu_and_cpu_are_equal_collect, assert_gpu_and_cpu_error
 from conftest import get_std_input_path, is_parquet_testing_tests_forced, is_precommit_run, is_not_utc
 from data_gen import copy_and_update, non_utc_allow
 from marks import allow_non_gpu
 from pathlib import Path
-import subprocess
 import pytest
 from spark_session import is_before_spark_330, is_spark_350_or_later
 import warnings
@@ -73,42 +73,38 @@
 _error_files["lz4_raw_compressed.parquet"] = "Exception"
 _error_files["lz4_raw_compressed_larger.parquet"] = "Exception"

-def hdfs_glob(path, pattern):
+def hdfs_glob(path_str, pattern):
     """
     Finds hdfs files by checking the input path with glob pattern

-    :param path: hdfs path to check
-    :type path: pathlib.Path
+    :param path_str: hdfs path to check
+    :type path_str: str
     :return: generator of matched files
     """
-    path_str = path.as_posix()
-    full_pattern = path_str + '/' + pattern
-    cmd = ['hadoop', 'fs', '-ls', '-C', full_pattern]
-
-    process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
-    stdout, stderr = process.communicate()
-    if process.returncode != 0:
-        raise AssertionError(f'Failed to list files from {path_str}. Error: {stderr}')
-
-    paths = stdout.strip().split('\n')
-
-    for p in paths:
-        yield Path(p)
-
-def glob(path, pattern):
+    from spark_init_internal import get_spark_i_know_what_i_am_doing
+    full_pattern = os.path.join(path_str, pattern)
+    sc = get_spark_i_know_what_i_am_doing().sparkContext
+    config = sc._jsc.hadoopConfiguration()
+    fs_path = sc._jvm.org.apache.hadoop.fs.Path(full_pattern)
+    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(fs_path.toUri(), config)
Review comment (collaborator):
nit, no need to fix: a handy idiom, especially if you need to access the fs object only once, with the added benefit of avoiding accessing sc._jvm.

Suggested change:
-    fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(fs_path.toUri(), config)
+    fs = fs_path.getFileSystem(config)

Reply (collaborator, author):
This is helpful. Thanks!
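As an aside, a minimal sketch of why the suggested idiom is interchangeable with the original call; it assumes an active SparkContext `sc` and its Hadoop `config`, and the path string is illustrative:

```python
# Both calls resolve a FileSystem for the same URI; the second asks the Path
# object directly and avoids a second lookup through sc._jvm.
fs_path = sc._jvm.org.apache.hadoop.fs.Path("hdfs:///tmp/parquet-testing/*.parquet")
fs_a = sc._jvm.org.apache.hadoop.fs.FileSystem.get(fs_path.toUri(), config)
fs_b = fs_path.getFileSystem(config)
assert fs_a.getUri().toString() == fs_b.getUri().toString()
```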

+    statuses = fs.globStatus(fs_path)
+    for status in statuses:
+        yield status.getPath().toString()
+
+def glob(path_str, pattern):
     """
     Finds files by checking the input path with glob pattern.
     Support local file system and hdfs

-    :param path: input path to check
-    :type path: pathlib.Path
+    :param path_str: input path to check
+    :type path_str: str
     :return: generator of matched files
     """
-    path_str = path.as_posix()
     if not path_str.startswith('hdfs:'):
Review comment (collaborator):
not for this PR, but this should be agnostic of a particular non-local FileSystem path.
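As a rough sketch of the reviewer's point (explicitly not part of this PR), the local/remote decision could key off the path's URI scheme instead of a hard-coded 'hdfs:' prefix; the helper name below is hypothetical:

```python
from urllib.parse import urlparse

def is_non_local_path(path_str):
    # Hypothetical helper: any scheme other than none or file:// (hdfs://, s3a://,
    # abfs://, gs://, ...) is treated as a non-local Hadoop-compatible filesystem.
    return urlparse(path_str).scheme not in ("", "file")
```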

-        return path.glob(pattern)
+        path_list = Path(path_str).glob(pattern)
+        return [path.as_posix() for path in path_list]

-    return hdfs_glob(path, pattern)
+    return hdfs_glob(path_str, pattern)

 def locate_parquet_testing_files():
     """
@@ -121,15 +117,15 @@ def locate_parquet_testing_files():
     glob_patterns = ("parquet-testing/data/*.parquet", "parquet-testing/bad_data/*.parquet")
     places = []
     std_path = get_std_input_path()
-    if std_path: places.append(Path(std_path))
-    places.append(Path(__file__).parent.joinpath("../../../../thirdparty").resolve())
+    if std_path: places.append(std_path)
+    places.append(Path(__file__).parent.joinpath("../../../../thirdparty").resolve().as_posix())
     for p in places:
         files = []
         for pattern in glob_patterns:
             files += glob(p, pattern)
         if files:
             return files
-    locations = ", ".join([ p.joinpath(g).as_posix() for p in places for g in glob_patterns])
+    locations = ", ".join([os.path.join(p, g) for p in places for g in glob_patterns])
     # TODO: Also fail for nightly tests when nightly scripts have been updated to initialize
     # the git submodules when pulling spark-rapids changes.
     # https://github.com/NVIDIA/spark-rapids/issues/8677
@@ -141,22 +137,22 @@
 def gen_testing_params_for_errors():
     result = []
     for f in locate_parquet_testing_files():
-        error_obj = _error_files.get(f.name, None)
+        error_obj = _error_files.get(os.path.basename(f), None)
         if error_obj is not None:
-            result.append((f.as_posix(), error_obj))
+            result.append((f, error_obj))
     return result

 def gen_testing_params_for_valid_files():
     files = []
     for f in locate_parquet_testing_files():
-        if f.name in _error_files:
+        basename = os.path.basename(f)
+        if basename in _error_files:
             continue
-        path = f.as_posix()
-        xfail_reason = _xfail_files.get(f.name, None)
+        xfail_reason = _xfail_files.get(basename, None)
         if xfail_reason:
-            files.append(pytest.param(path, marks=pytest.mark.xfail(reason=xfail_reason)))
+            files.append(pytest.param(f, marks=pytest.mark.xfail(reason=xfail_reason)))
         else:
-            files.append(path)
+            files.append(f)
     return files

 @pytest.mark.parametrize("path", gen_testing_params_for_valid_files())
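For reference, after this change both helpers take and return plain path strings rather than `pathlib.Path` objects. A hedged usage sketch follows; the directories shown are placeholders, not paths from the PR:

```python
# Local paths go through pathlib's glob; hdfs: paths are routed to hdfs_glob.
local_files = glob("/opt/data/parquet-testing/data", "*.parquet")
remote_files = glob("hdfs:///user/tests/parquet-testing/data", "*.parquet")
for f in list(local_files) + list(remote_files):
    print(f)
```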