Adding HDFS support for data generation #188

Merged
merged 11 commits on Jul 3, 2024
5 changes: 3 additions & 2 deletions nds-h/README.md
@@ -103,8 +103,9 @@ To generate data for local -

```bash
$ python nds_h_gen_data.py -h
usage: nds_h_gen_data.py [-h] <scale> <parallel> <data_dir> [--overwrite_output]
usage: nds_h_gen_data.py [-h] {local,hdfs} <scale> <parallel> <data_dir> [--overwrite_output]
positional arguments:
{local,hdfs} file system to save the generated data.
scale volume of data to generate in GB.
parallel build data in <parallel_value> separate chunks
data_dir generate data in directory.
@@ -118,7 +119,7 @@ optional arguments:
Example command:

```bash
python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output
python nds_h_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
```

### Convert DSV to Parquet or Other data sources
115 changes: 101 additions & 14 deletions nds-h/nds_h_gen_data.py
@@ -34,6 +34,7 @@
import os
import sys
import subprocess
import shutil

#For adding utils to path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
@@ -58,6 +59,7 @@
'supplier'
]


def generate_data_local(args, range_start, range_end, tool_path):
"""Generate data to local file system. TPC-DS tool will generate all table data under target
folder without creating sub-folders for each table. So we add extra code to create sub folder
@@ -87,10 +89,10 @@ def generate_data_local(args, range_start, range_end, tool_path):
procs = []
for i in range(range_start, range_end + 1):
dbgen = ["-s", args.scale,
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f","Y"]
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f", "Y"]
procs.append(subprocess.Popen(
["./dbgen"] + dbgen, cwd=str(work_dir)))
# wait for data generation to complete
@@ -104,45 +106,130 @@ def generate_data_local(args, range_start, range_end, tool_path):
for table in table_names:
print('mkdir -p {}/{}'.format(data_dir, table))
subprocess.run(['mkdir', '-p', data_dir + '/' + table])
if (table != 'region' and table !='nation'):
if (table != 'region' and table != 'nation'):
for i in range(range_start, range_end + 1):
subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
else:
subprocess.run(['mv', f'{work_dir}/{table}.tbl',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
# region and nation files have no parallel number suffix in the file name, so they are moved separately
# show summary
subprocess.run(['du', '-h', '-d1', data_dir])


def clean_temp_data(temp_data_path):
cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path]
Collaborator:
Note that beyond subpar user-perceived delays from shelling out to launch heavy JVMs, we have hit limitations in the past where the hadoop CLI is not available. If we document that this script can be launched via spark-submit, then we can use Py4J (NVIDIA/spark-rapids#10599).

On the other hand, why do we need to wrap a Java program in a Python CLI to begin with?
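A minimal sketch of the Py4J route mentioned above, assuming the script is launched through spark-submit. It reaches HDFS through Spark's internal JVM gateway rather than a supported public API, and the path shown is illustrative, so treat it as a sketch rather than part of this PR:

```python
# Hypothetical sketch: reach HDFS through Spark's JVM gateway (Py4J) instead of
# shelling out to the `hadoop` CLI. Requires running under spark-submit.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nds-h-gen").getOrCreate()
jvm = spark.sparkContext._jvm                      # Py4J gateway into the driver JVM
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)

# rough equivalents of `hadoop fs -mkdir` and `hadoop fs -rm -r -skipTrash`
temp_path = jvm.org.apache.hadoop.fs.Path("/data/raw_sf100/_temp_")  # illustrative path
fs.mkdirs(temp_path)
fs.delete(temp_path, True)                         # recursive delete, bypasses trash
```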

Collaborator Author:
The Java program is a mapper triggered only when generating data for HDFS; for local data generation, the Python wrapper does not trigger a MapReduce job.
For a missing hadoop CLI, there is an up-front check in the Python program that prints an "install the hadoop CLI" message, just for verbosity.
Here the hadoop job only creates a limited set of directories (8 in total, one per NDS-H table) and moves the generated NDS-H data into the required folders.
Currently this is not triggered via spark-submit.

Collaborator:
This is more of a tech debt. When this project was initialized, the decision was to use Python and avoid the approach that DB uses. I argued at the time that we could also use Scala, but that did not happen.

More details as I recall them: to avoid chain-calling "python-hdfs", the best option was to leverage the

_FILTER = [Y|N]    -- output data to stdout

argument to pipe text output to stdout and then pipe it directly into a Spark DataFrame (this is also what DB does). That way we would not need any hadoop job to help generate the distributed dataset.

Unluckily, the latest TPC-DS v3.2.0 disabled this argument, and the decision was to use the latest version and try our best not to modify it.

Thus it became what it is now.
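A hypothetical sketch of that stdout-piping idea, assuming a generator build that can still stream rows to stdout (the '-o' and '-T L' flags below are illustrative, since current releases dropped the stdout path); each Spark task would generate one child chunk and land it directly as Parquet, with no hadoop MR job involved:

```python
# Illustrative only: stream generator output straight into a Spark DataFrame.
import subprocess
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("nds-h-stdout-gen").getOrCreate()
scale, parallel = "100", 100

def gen_chunk(index, _rows):
    # each task generates one child chunk and yields '|'-delimited records
    chunk = str(index + 1)
    proc = subprocess.Popen(
        ["./dbgen", "-s", scale, "-C", str(parallel), "-S", chunk,
         "-T", "L", "-o"],               # '-T L' (single table) and '-o' (stdout) are hypothetical
        cwd="/opt/tpch/dbgen", stdout=subprocess.PIPE, text=True)
    for line in proc.stdout:
        yield line.rstrip("\n").split("|")
    proc.wait()

rows = spark.sparkContext.parallelize(range(parallel), parallel) \
            .mapPartitionsWithIndex(gen_chunk)
rows.toDF().write.parquet("hdfs:///data/raw_sf100/lineitem")  # illustrative output path
```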

Collaborator Author:
As per Allen's comment, I can pick this up as a separate issue later to figure out whether there is an alternative solution that avoids chaining hadoop commands from Python.

print(" ".join(cmd))
subprocess.run(cmd)


def merge_temp_tables(temp_data_path, parent_data_path):
"""Helper functions for incremental data generation. Move data in temporary child range path to
parent directory.

Args:
temp_data_path (str): temorary child range data path
parent_data_path (str): parent data path
"""
table_names = source_table_names
for table_name in table_names:
# manually create table sub-folders
# redundant step if it's not the first range part.
cmd = ['hadoop', 'fs', '-mkdir', parent_data_path + '/' + table_name]
print(" ".join(cmd))
subprocess.run(cmd)
# move temp content to upper folder
# note not all tables are generated in each child range step
# please ignore messages like "mv: `.../region/*': No such file or directory"
temp_table_data_path = temp_data_path + '/' + table_name + '/*'
cmd = ['hadoop', 'fs', '-mv', temp_table_data_path,
parent_data_path + '/' + table_name + '/']
print(" ".join(cmd))
subprocess.run(cmd)
clean_temp_data(temp_data_path)


def generate_data_hdfs(args, jar_path):
"""generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the
limit of hdfs, each range data will be generated under a temporary folder then move to target
folder.

Args:
args (Namespace): Namespace from argparser
jar_path (str): path to the target jar

Raises:
Exception: if Hadoop binary is not installed.
"""
# Check if hadoop is installed.
if shutil.which('hadoop') is None:
raise Exception('No Hadoop binary found in the current environment; ' +
'please install Hadoop to generate data on a cluster.')
# Submit hadoop MR job to generate data
cmd = ['hadoop', 'jar', str(jar_path)]
cmd += ['-p', args.parallel, '-s', args.scale]
# get the tpch-gen tool directory from the jar path, assuming the user won't change the file structure
tpcds_gen_path = jar_path.parent.parent.absolute()
if args.overwrite_output:
cmd += ['-o']
if args.range:
# use a temp folder to save the specific range data.
# will move the content to parent folder afterwards.
# it's a workaround for "Output directory ... already exists" in incremental generation
temp_data_path = args.data_dir + '/_temp_'
# before generation, we remove "_temp_" folders in case they contain garbage generated by
# previous user runs.
clean_temp_data(temp_data_path)
cmd.extend(["-r", args.range])
cmd.extend(["-d", temp_data_path])
try:
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))
# merge the temporary range output into the parent data directory
merge_temp_tables(temp_data_path, args.data_dir)
finally:
clean_temp_data(temp_data_path)
else:
cmd.extend(["-d", args.data_dir])
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))


def generate_data(args):
tool_path = check_build_nds_h()
jar_path, tool_path = check_build_nds_h()
range_start = 1
range_end = int(args.parallel)
if args.range:
range_start, range_end = valid_range(args.range, args.parallel)
generate_data_local(args, range_start, range_end, tool_path)
if args.type == 'hdfs':
generate_data_hdfs(args, jar_path)
else:
generate_data_local(args, range_start, range_end, tool_path)


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("type",
choices=["local", "hdfs"],
help="file system to save the generated data.")
parser.add_argument("scale",
help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \
,3000, 10000, 30000,"
)
)
parser.add_argument("parallel",
type=parallel_value_type,
help="build data in <parallel_value> separate chunks"
)
)
parser.add_argument("data_dir",
help="generate data in directory.")
parser.add_argument('--range',
help='Used for incremental data generation, meaning which part of the child ' +
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
parser.add_argument("--overwrite_output",
action="store_true",
help="overwrite if there has already existing data in the path provided.")

args = parser.parse_args()
generate_data(args)
2 changes: 1 addition & 1 deletion nds-h/nds_h_gen_query_stream.py
@@ -81,7 +81,7 @@ def generate_query_streams(args, tool_path):
subprocess.run(base_cmd, check=True, cwd=str(work_dir),stdout=f)

if __name__ == "__main__":
tool_path = check_build_nds_h()
jar_path, tool_path = check_build_nds_h()
parser = argparse.ArgumentParser()
parser.add_argument("scale",
help="Assume a database of this scale factor.")
13 changes: 9 additions & 4 deletions nds-h/tpch-gen/Makefile
@@ -15,7 +15,7 @@
# limitations under the License.
#

all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen
all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen make-jar build-package

check-tpch-env:
ifndef TPCH_HOME
@@ -33,8 +33,6 @@ copy-dbgen:
# file and the sql files
cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.sql
cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.patch
# Unapply any patch if already done
-cd "$(TPCH_HOME)/dbgen/queries"; cat *.patch | patch -R -p1 -N
# apply patches to both source code and templates
cd "$(TPCH_HOME)/dbgen/queries" && cat *.patch | patch -p1
cp -r "$(TPCH_HOME)/dbgen" target/
@@ -62,4 +60,11 @@ modify-tpc-h:

build-dbgen:
# Build it
cd target/dbgen ; make clean; make 2>/dev/null
cd target/dbgen ; make clean; make 2>/dev/null

make-jar:
cd target; (jar cvf dbgen.jar dbgen/ || gjar cvf dbgen.jar dbgen/ )

build-package:
mvn package

105 changes: 105 additions & 0 deletions nds-h/tpch-gen/pom.xml
@@ -0,0 +1,105 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
SPDX-License-Identifier: Apache-2.0

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd">

<modelVersion>4.0.0</modelVersion>

<groupId>org.nvidia.nds_h</groupId>
<artifactId>tpch-gen</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>

<name>tpch-gen</name>
<url>http://maven.apache.org</url>

<properties>
<tpcds-gen.jdk.version>1.8</tpcds-gen.jdk.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.2.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<version>1.1</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>1.8.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>${tpcds-gen.jdk.version}</source>
<target>${tpcds-gen.jdk.version}</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>org.nvidia.nds_h.GenTable</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
</goals>
<configuration>
<outputDirectory>${project.build.directory}/lib</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>