Skip to content

Commit

Permalink
Update hadoop cmd to list all files directly based on the pattern
Browse files Browse the repository at this point in the history
Signed-off-by: Yinqing Hao <haoyinqing@gmail.com>
  • Loading branch information
yinqingh committed Feb 2, 2024
1 parent 4ccccba commit 9dc04aa
Showing 1 changed file with 6 additions and 18 deletions.
24 changes: 6 additions & 18 deletions integration_tests/src/main/python/parquet_testing_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,6 @@
from data_gen import copy_and_update, non_utc_allow
from marks import allow_non_gpu
from pathlib import Path
import re
import fnmatch
import subprocess
import pytest
from spark_session import is_before_spark_330, is_spark_350_or_later
Expand Down Expand Up @@ -84,23 +82,18 @@ def hdfs_glob(path, pattern):
:return: generator of matched files
"""
path_str = path.as_posix()
cmd = ['hadoop', 'fs', '-ls', '-R', path_str]
full_pattern = path_str + '/' + pattern
cmd = ['hadoop', 'fs', '-ls', '-C', full_pattern]

process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)
stdout, stderr = process.communicate()
if process.returncode != 0:
warnings.warn('Failed to list files from hdfs: {}. Error: {}'.format(path_str, stderr))
return iter(())
raise AssertionError('Failed to list files from hdfs: {}. Error: {}'.format(path_str, stderr))

lines = stdout.strip().split('\n')
paths = [line.split()[-1] for line in lines]
compiled_pattern = re.compile(fnmatch.translate(pattern), re.IGNORECASE)
paths = stdout.strip().split('\n')

for p in paths:
p = Path(p)
relative_path = p.relative_to(path_str).as_posix()
if compiled_pattern.match(relative_path):
yield p
yield Path(p)

def glob(path, pattern):
"""
Expand All @@ -115,12 +108,7 @@ def glob(path, pattern):
if not path_str.startswith('hdfs:'):
return path.glob(pattern)

try:
return hdfs_glob(path, pattern)
except Exception as e:
warnings.warn('Failed to run glob path from hdfs: {}. Error: {}'.format(path_str, str(e)))

return iter(())
return hdfs_glob(path, pattern)

def locate_parquet_testing_files():
"""
Expand Down

0 comments on commit 9dc04aa

Please sign in to comment.