From 9dc04aabe2eb113fccbe09baa5d2911a982a2611 Mon Sep 17 00:00:00 2001 From: Yinqing Hao Date: Fri, 2 Feb 2024 09:54:27 +0800 Subject: [PATCH] Update hadoop cmd to list all files directly based on the pattern Signed-off-by: Yinqing Hao --- .../src/main/python/parquet_testing_test.py | 24 +++++-------------- 1 file changed, 6 insertions(+), 18 deletions(-) diff --git a/integration_tests/src/main/python/parquet_testing_test.py b/integration_tests/src/main/python/parquet_testing_test.py index d4d2d4623ae..267dd43441c 100644 --- a/integration_tests/src/main/python/parquet_testing_test.py +++ b/integration_tests/src/main/python/parquet_testing_test.py @@ -20,8 +20,6 @@ from data_gen import copy_and_update, non_utc_allow from marks import allow_non_gpu from pathlib import Path -import re -import fnmatch import subprocess import pytest from spark_session import is_before_spark_330, is_spark_350_or_later @@ -84,23 +82,18 @@ def hdfs_glob(path, pattern): :return: generator of matched files """ path_str = path.as_posix() - cmd = ['hadoop', 'fs', '-ls', '-R', path_str] + full_pattern = path_str + '/' + pattern + cmd = ['hadoop', 'fs', '-ls', '-C', full_pattern] process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True) stdout, stderr = process.communicate() if process.returncode != 0: - warnings.warn('Failed to list files from hdfs: {}. Error: {}'.format(path_str, stderr)) - return iter(()) + raise AssertionError('Failed to list files from hdfs: {}. Error: {}'.format(path_str, stderr)) - lines = stdout.strip().split('\n') - paths = [line.split()[-1] for line in lines] - compiled_pattern = re.compile(fnmatch.translate(pattern), re.IGNORECASE) + paths = stdout.strip().split('\n') for p in paths: - p = Path(p) - relative_path = p.relative_to(path_str).as_posix() - if compiled_pattern.match(relative_path): - yield p + yield Path(p) def glob(path, pattern): """ @@ -115,12 +108,7 @@ def glob(path, pattern): if not path_str.startswith('hdfs:'): return path.glob(pattern) - try: - return hdfs_glob(path, pattern) - except Exception as e: - warnings.warn('Failed to run glob path from hdfs: {}. Error: {}'.format(path_str, str(e))) - - return iter(()) + return hdfs_glob(path, pattern) def locate_parquet_testing_files(): """