From c41b702d9b0e9ff4abcbb9d3dd0ae06f7a8b75bd Mon Sep 17 00:00:00 2001 From: Sayed Bilal Bari <34418972+bilalbari@users.noreply.github.com> Date: Wed, 3 Jul 2024 10:57:52 -0500 Subject: [PATCH] Adding HDFS support for data generation (#188) * Changes for adding hdfs submitter class Signed-off-by: Sayed Bilal Bari * Working changes to GenTable for hdfs run Signed-off-by: Sayed Bilal Bari * Changes for mapReduce Signed-off-by: Sayed Bilal Bari * Changes to makefile Signed-off-by: Sayed Bilal Bari * Adding comments to the Java file Signed-off-by: Sayed Bilal Bari * Removed redundant files Signed-off-by: Sayed Bilal Bari * Correcting typo in query stream Signed-off-by: Sayed Bilal Bari * Review changes Signed-off-by: Sayed Bilal Bari * Fixing typo in example command Signed-off-by: Sayed Bilal Bari * Changes for supporting json_summary+sub_queries -s Signed-off-by: Sayed Bilal Bari * Correcting typo and README Signed-off-by: Sayed Bilal Bari --------- Signed-off-by: Sayed Bilal Bari Co-authored-by: Sayed Bilal Bari --- nds-h/README.md | 10 +- nds-h/nds_h_gen_data.py | 115 +++++- nds-h/nds_h_gen_query_stream.py | 2 +- nds-h/nds_h_power.py | 124 ++++-- nds-h/tpch-gen/Makefile | 13 +- nds-h/tpch-gen/pom.xml | 105 +++++ .../main/java/org/nvidia/nds_h/GenTable.java | 372 ++++++++++++++++++ utils/check.py | 4 +- 8 files changed, 688 insertions(+), 57 deletions(-) create mode 100644 nds-h/tpch-gen/pom.xml create mode 100755 nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java diff --git a/nds-h/README.md b/nds-h/README.md index 2b378e0..3bbc0b8 100644 --- a/nds-h/README.md +++ b/nds-h/README.md @@ -103,8 +103,9 @@ To generate data for local - ```bash $ python nds_h_gen_data.py -h -usage: nds_h_gen_data.py [-h] [--overwrite_output] +usage: nds_h_gen_data.py [-h] {local,hdfs} [--overwrite_output] positional arguments: + {local,hdfs} file system to save the generated data. scale volume of data to generate in GB. parallel build data in separate chunks data_dir generate data in directory. @@ -118,7 +119,7 @@ optional arguments: Example command: ```bash -python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output +python nds_h_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output ``` ### Convert DSV to Parquet or Other data sources @@ -219,6 +220,11 @@ optional arguments: type of query output --property_file PROPERTY_FILE property file for Spark configuration. + --json_summary_folder JSON_SUMMARY_FOLDER + Empty folder/path (will create if not exist) to save JSON summary file for each query. + --sub_queries SUB_QUERIES + comma separated list of queries to run. If not specified, all queries in the stream file will be run. + e.g. "query1,query2,query3". Note, use "_part1","_part2" and "part_3" suffix for the following query names: query15 ``` Example command to submit nds_h_power.py by spark-submit-template utility: diff --git a/nds-h/nds_h_gen_data.py b/nds-h/nds_h_gen_data.py index 84401f3..8293812 100644 --- a/nds-h/nds_h_gen_data.py +++ b/nds-h/nds_h_gen_data.py @@ -34,6 +34,7 @@ import os import sys import subprocess +import shutil #For adding utils to path parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -58,6 +59,7 @@ 'supplier' ] + def generate_data_local(args, range_start, range_end, tool_path): """Generate data to local file system. TPC-DS tool will generate all table data under target folder without creating sub-folders for each table. 
So we add extra code to create sub folder
@@ -87,10 +89,10 @@ def generate_data_local(args, range_start, range_end, tool_path):
     procs = []
     for i in range(range_start, range_end + 1):
         dbgen = ["-s", args.scale,
-                 "-C", args.parallel,
-                 "-S", str(i),
-                 "-v", "Y",
-                 "-f","Y"]
+                 "-C", args.parallel,
+                 "-S", str(i),
+                 "-v", "Y",
+                 "-f", "Y"]
         procs.append(subprocess.Popen(
             ["./dbgen"] + dbgen, cwd=str(work_dir)))
     # wait for data generation to complete
@@ -104,45 +106,130 @@ def generate_data_local(args, range_start, range_end, tool_path):
     for table in table_names:
         print('mkdir -p {}/{}'.format(data_dir, table))
         subprocess.run(['mkdir', '-p', data_dir + '/' + table])
-        if (table != 'region' and table !='nation'):
+        if (table != 'region' and table != 'nation'):
            for i in range(range_start, range_end + 1):
                subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}',
                                f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
        else:
            subprocess.run(['mv', f'{work_dir}/{table}.tbl',
-                           f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
+                           f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
     # delete date file has no parallel number suffix in the file name, move separately
     # show summary
     subprocess.run(['du', '-h', '-d1', data_dir])
+
+def clean_temp_data(temp_data_path):
+    cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path]
+    print(" ".join(cmd))
+    subprocess.run(cmd)
+
+
+def merge_temp_tables(temp_data_path, parent_data_path):
+    """Helper function for incremental data generation. Moves data from the temporary child range
+    path to the parent directory.
+
+    Args:
+        temp_data_path (str): temporary child range data path
+        parent_data_path (str): parent data path
+    """
+    table_names = source_table_names
+    for table_name in table_names:
+        # manually create table sub-folders
+        # redundant step if it's not the first range part.
+        cmd = ['hadoop', 'fs', '-mkdir', parent_data_path + '/' + table_name]
+        print(" ".join(cmd))
+        subprocess.run(cmd)
+        # move temp content to upper folder
+        # note that not all tables are generated in every child range step
+        # please ignore messages like "mv: `.../region/*': No such file or directory"
+        temp_table_data_path = temp_data_path + '/' + table_name + '/*'
+        cmd = ['hadoop', 'fs', '-mv', temp_table_data_path,
+               parent_data_path + '/' + table_name + '/']
+        print(" ".join(cmd))
+        subprocess.run(cmd)
+    clean_temp_data(temp_data_path)
+
+
+def generate_data_hdfs(args, jar_path):
+    """Generate data to HDFS using the TPC-H dbgen tool. Supports incremental generation: due to
+    HDFS limitations, data for each range is generated under a temporary folder and then moved to
+    the target folder.
+
+    Args:
+        args (Namespace): Namespace from argparser
+        jar_path (str): path to the target jar
+
+    Raises:
+        Exception: if Hadoop binary is not installed.
+    """
+    # Check if hadoop is installed.
+    if shutil.which('hadoop') is None:
+        raise Exception('No Hadoop binary found in current environment, ' +
+                        'please install Hadoop for data generation in cluster.')
+    # Submit hadoop MR job to generate data
+    cmd = ['hadoop', 'jar', str(jar_path)]
+    cmd += ['-p', args.parallel, '-s', args.scale]
+    # get the tpch-gen project path from the jar path, assume user won't change file structure
+    tpcds_gen_path = jar_path.parent.parent.absolute()
+    if args.overwrite_output:
+        cmd += ['-o']
+    if args.range:
+        # use a temp folder to save the specific range data.
+        # will move the content to parent folder afterwards.
+        # it's a workaround for "Output directory ...
already exists" in incremental generation + temp_data_path = args.data_dir + '/_temp_' + # before generation, we remove "_temp_" folders in case they contain garbage generated by + # previous user runs. + clean_temp_data(temp_data_path) + cmd.extend(["-r", args.range]) + cmd.extend(["-d", temp_data_path]) + try: + subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path)) + # only move delete table for data maintenance + merge_temp_tables(temp_data_path, args.data_dir) + finally: + clean_temp_data(temp_data_path) + else: + cmd.extend(["-d", args.data_dir]) + subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path)) + # only move delete table for data maintenance + + def generate_data(args): - tool_path = check_build_nds_h() + jar_path, tool_path = check_build_nds_h() range_start = 1 range_end = int(args.parallel) if args.range: range_start, range_end = valid_range(args.range, args.parallel) - generate_data_local(args, range_start, range_end, tool_path) + if args.type == 'hdfs': + generate_data_hdfs(args, jar_path) + else: + generate_data_local(args, range_start, range_end, tool_path) + if __name__ == "__main__": parser = parser = argparse.ArgumentParser() + parser.add_argument("type", + choices=["local", "hdfs"], + help="file system to save the generated data.") parser.add_argument("scale", help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \ ,3000, 10000, 30000," - ) + ) parser.add_argument("parallel", type=parallel_value_type, help="build data in separate chunks" - ) + ) parser.add_argument("data_dir", help="generate data in directory.") parser.add_argument('--range', help='Used for incremental data generation, meaning which part of child' + - 'chunks are generated in one run. Format: "start,end", both are inclusive. ' + - 'e.g. "1,100". Note: the child range must be within the "parallel", ' + - '"--parallel 100 --range 100,200" is illegal.') + 'chunks are generated in one run. Format: "start,end", both are inclusive. ' + + 'e.g. "1,100". 
Note: the child range must be within the "parallel", ' + + '"--parallel 100 --range 100,200" is illegal.') parser.add_argument("--overwrite_output", action="store_true", help="overwrite if there has already existing data in the path provided.") - + args = parser.parse_args() generate_data(args) diff --git a/nds-h/nds_h_gen_query_stream.py b/nds-h/nds_h_gen_query_stream.py index 2e58dab..fad9380 100644 --- a/nds-h/nds_h_gen_query_stream.py +++ b/nds-h/nds_h_gen_query_stream.py @@ -81,7 +81,7 @@ def generate_query_streams(args, tool_path): subprocess.run(base_cmd, check=True, cwd=str(work_dir),stdout=f) if __name__ == "__main__": - tool_path = check_build_nds_h() + jar_path, tool_path = check_build_nds_h() parser = parser = argparse.ArgumentParser() parser.add_argument("scale", help="Assume a database of this scale factor.") diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py index be2dc98..dc1496b 100644 --- a/nds-h/nds_h_power.py +++ b/nds-h/nds_h_power.py @@ -37,6 +37,8 @@ from pyspark.sql import SparkSession import os import sys +import re + parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) # Construct the path to the utils directory @@ -47,9 +49,8 @@ from python_benchmark_reporter.PysparkBenchReport import PysparkBenchReport from pyspark.sql import DataFrame -from check import check_version +from check import check_version, check_json_summary_folder, check_query_subset_exists from nds_h_schema import get_schemas -import re check_version() @@ -71,7 +72,7 @@ def gen_sql_from_stream(query_stream_file_path): # Find all matches in the content matches = pattern.findall(stream) -# Populate the dictionary with template file numbers as keys and queries as values + # Populate the dictionary with template file numbers as keys and queries as values for match in matches: template_number = match[0] if int(template_number) == 15: @@ -85,6 +86,7 @@ def gen_sql_from_stream(query_stream_file_path): return extended_queries + def setup_tables(spark_session, input_prefix, input_format, execution_time_list): """set up data tables in Spark before running the Power Run queries. @@ -92,7 +94,6 @@ def setup_tables(spark_session, input_prefix, input_format, execution_time_list) spark_session (SparkSession): a SparkSession instance to run queries. input_prefix (str): path of input data. input_format (str): type of input data source, e.g. parquet, orc, csv, json. - use_decimal (bool): use decimal type for certain columns when loading data of text type. execution_time_list ([(str, str, int)]): a list to record query and its execution time. 
Returns: @@ -103,9 +104,11 @@ def setup_tables(spark_session, input_prefix, input_format, execution_time_list) for table_name in get_schemas().keys(): start = int(time.time() * 1000) table_path = input_prefix + '/' + table_name - reader = spark_session.read.format(input_format) + reader = spark_session.read.format(input_format) if input_format in ['csv', 'json']: reader = reader.schema(get_schemas()[table_name]) + print("Loading table ", table_path) + print("table name ", table_name) reader.load(table_path).createOrReplaceTempView(table_name) end = int(time.time() * 1000) print("====== Creating TempView for table {} ======".format(table_name)) @@ -114,6 +117,7 @@ def setup_tables(spark_session, input_prefix, input_format, execution_time_list) (spark_app_id, "CreateTempView {}".format(table_name), end - start)) return execution_time_list + def ensure_valid_column_names(df: DataFrame): def is_column_start(char): return char.isalpha() or char == '_' @@ -143,16 +147,17 @@ def deduplicate(column_names): # In some queries like q35, it's possible to get columns with the same name. Append a number # suffix to resolve this problem. dedup_col_names = [] - for i,v in enumerate(column_names): + for i, v in enumerate(column_names): count = column_names.count(v) index = column_names[:i].count(v) - dedup_col_names.append(v+str(index) if count > 1 else v) + dedup_col_names.append(v + str(index) if count > 1 else v) return dedup_col_names valid_col_names = [c if is_valid(c) else make_valid(c) for c in df.columns] dedup_col_names = deduplicate(valid_col_names) return df.toDF(*dedup_col_names) + def run_one_query(spark_session, query, query_name, @@ -163,42 +168,64 @@ def run_one_query(spark_session, df.collect() else: ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save( - output_path + '/' + query_name) + output_path + '/' + query_name) + + +def get_query_subset(query_dict, subset): + """Get a subset of queries from query_dict. + The subset is specified by a list of query names. + """ + check_query_subset_exists(query_dict, subset) + return dict((k, query_dict[k]) for k in subset) + def run_query_stream(input_prefix, property_file, query_dict, time_log_output_path, - input_format="parquet", + sub_queries, + input_format, output_path=None, - output_format="parquet"): + output_format="parquet", + json_summary_folder=None): """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file - for easy accesibility. TempView Creation time is also recorded. + for easy accessibility. TempView Creation time is also recorded. Args: - input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True. - query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark + input_prefix (str): path of input data. + property_file (str): path of property file for Spark configuration. + query_dict (OrderedDict): ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark time_log_output_path (str): path of the log that contains query execution time, both local and HDFS path are supported. + sub_queries (list): a list of query names to run, if not specified, all queries in the stream input_format (str, optional): type of input data source. - use_deciaml(bool, optional): use decimal type for certain columns when loading data of text type. - output_path (str, optional): path of query output, optinal. 
If not specified, collect() + output_path (str, optional): path of query output, optional. If not specified, collect() action will be applied to each query. Defaults to None. output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults + json_summary_folder (str, optional): path to save JSON summary files for each query. to "parquet". + :param time_log_output_path: + :param sub_queries: + :param json_summary_folder: + :param output_format: + :param output_path: + :param input_format: + :param input_prefix: + :param query_dict: + :param property_file: """ queries_reports = [] execution_time_list = [] total_time_start = time.time() # check if it's running specific query or Power Run - app_name = "NDS - Power Run" + app_name = "NDS-H - Power Run" # Execute Power Run or Specific query in Spark # build Spark Session session_builder = SparkSession.builder if property_file: spark_properties = load_properties(property_file) - for k,v in spark_properties.items(): - session_builder = session_builder.config(k,v) + for k, v in spark_properties.items(): + session_builder = session_builder.config(k, v) spark_session = session_builder.appName( app_name).getOrCreate() spark_app_id = spark_session.sparkContext.applicationId @@ -206,26 +233,38 @@ def run_query_stream(input_prefix, execution_time_list = setup_tables(spark_session, input_prefix, input_format, execution_time_list) + check_json_summary_folder(json_summary_folder) + if sub_queries: + query_dict = get_query_subset(query_dict, sub_queries) + power_start = int(time.time()) for query_name, q_content in query_dict.items(): # show query name in Spark web UI spark_session.sparkContext.setJobGroup(query_name, query_name) print("====== Run {} ======".format(query_name)) q_report = PysparkBenchReport(spark_session, query_name) - summary = q_report.report_on(run_one_query,spark_session, - q_content, - query_name, - output_path, - output_format) + summary = q_report.report_on(run_one_query, spark_session, + q_content, + query_name, + output_path, + output_format) print(f"Time taken: {summary['queryTimes']} millis for {query_name}") query_times = summary['queryTimes'] execution_time_list.append((spark_app_id, query_name, query_times[0])) queries_reports.append(q_report) + if json_summary_folder: + if property_file: + summary_prefix = os.path.join( + json_summary_folder, os.path.basename(property_file) + ) + else: + summary_prefix = os.path.join(json_summary_folder, '') + q_report.write_summary(prefix=summary_prefix) power_end = int(time.time()) - power_elapse = int((power_end - power_start)*1000) + power_elapse = int((power_end - power_start) * 1000) spark_session.sparkContext.stop() total_time_end = time.time() - total_elapse = int((total_time_end - total_time_start)*1000) + total_elapse = int((total_time_end - total_time_start) * 1000) print("====== Power Test Time: {} milliseconds ======".format(power_elapse)) print("====== Total Time: {} milliseconds ======".format(total_elapse)) execution_time_list.append( @@ -259,6 +298,9 @@ def run_query_stream(input_prefix, if exit_code: print("Above queries failed or completed with failed tasks. 
Please check the logs for the detailed reason.") + sys.exit(exit_code) + + def load_properties(filename): myvars = {} with open(filename) as myfile: @@ -267,15 +309,16 @@ def load_properties(filename): myvars[name.strip()] = var.strip() return myvars + if __name__ == "__main__": - parser = parser = argparse.ArgumentParser() + parser = argparse.ArgumentParser() parser.add_argument('input_prefix', help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' + - 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + - '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + - 'session name "spark_catalog" is supported now, customized catalog is not ' + - 'yet supported. Note if this points to a Delta Lake table, the path must be ' + - 'absolute. Issue: https://github.com/delta-io/delta/issues/555') + 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + + '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + + 'session name "spark_catalog" is supported now, customized catalog is not ' + + 'yet supported. Note if this points to a Delta Lake table, the path must be ' + + 'absolute. Issue: https://github.com/delta-io/delta/issues/555') parser.add_argument('query_stream_file', help='query stream file that contains NDS queries in specific order') parser.add_argument('time_log', @@ -283,13 +326,21 @@ def load_properties(filename): default="") parser.add_argument('--input_format', help='type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. ' + - 'Certain types are not fully supported by GPU reading, please refer to ' + - 'https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/docs/compatibility.md ' + - 'for more details.', + 'Certain types are not fully supported by GPU reading, please refer to ' + + 'https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/docs/compatibility.md ' + + 'for more details.', choices=['parquet', 'orc', 'avro', 'csv', 'json', 'iceberg', 'delta'], default='parquet') parser.add_argument('--output_prefix', help='text to prepend to every output file (e.g., "hdfs:///ds-parquet")') + parser.add_argument('--json_summary_folder', + help='Empty folder/path (will create if not exist) to save JSON summary file for each query.') + parser.add_argument('--sub_queries', + type=lambda s: [x.strip() for x in s.split(',')], + help='comma separated list of queries to run. If not specified, all queries ' + + 'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' + + '"_part1" and "_part2" suffix for the following query names: ' + + 'query14, query23, query24, query39. e.g. query14_part1, query39_part2') parser.add_argument('--output_format', help='type of query output', default='parquet') @@ -301,6 +352,9 @@ def load_properties(filename): args.property_file, query_dict, args.time_log, + args.sub_queries, args.input_format, args.output_prefix, - args.output_format) + args.output_format, + args.json_summary_folder + ) diff --git a/nds-h/tpch-gen/Makefile b/nds-h/tpch-gen/Makefile index 797aa7b..7b4520c 100644 --- a/nds-h/tpch-gen/Makefile +++ b/nds-h/tpch-gen/Makefile @@ -15,7 +15,7 @@ # limitations under the License. 
# -all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen +all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen make-jar build-package check-tpch-env: ifndef TPCH_HOME @@ -33,8 +33,6 @@ copy-dbgen: # file and the sql files cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.sql cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.patch - # Unapply any patch if already done - -cd "$(TPCH_HOME)/dbgen/queries"; cat *.patch | patch -R -p1 -N # apply patches to both source code and templates cd "$(TPCH_HOME)/dbgen/queries" && cat *.patch | patch -p1 cp -r "$(TPCH_HOME)/dbgen" target/ @@ -62,4 +60,11 @@ modify-tpc-h: build-dbgen: # Build it - cd target/dbgen ; make clean; make 2>/dev/null \ No newline at end of file + cd target/dbgen ; make clean; make 2>/dev/null + +make-jar: + cd target; (jar cvf dbgen.jar dbgen/ || gjar cvf dbgen.jar dbgen/ ) + +build-package: + mvn package + \ No newline at end of file diff --git a/nds-h/tpch-gen/pom.xml b/nds-h/tpch-gen/pom.xml new file mode 100644 index 0000000..d8a8db1 --- /dev/null +++ b/nds-h/tpch-gen/pom.xml @@ -0,0 +1,105 @@ + + + + + 4.0.0 + + org.nvidia.nds_h + tpch-gen + 1.0-SNAPSHOT + jar + + tpch-gen + http://maven.apache.org + + + 1.8 + + + + + org.apache.hadoop + hadoop-client + 3.2.1 + compile + + + commons-cli + commons-cli + 1.1 + compile + + + org.mockito + mockito-core + 1.8.5 + test + + + junit + junit + 4.13.1 + test + + + + + + + maven-compiler-plugin + + ${tpcds-gen.jdk.version} + ${tpcds-gen.jdk.version} + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + lib/ + org.nvidia.nds_h.GenTable + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + + + diff --git a/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java b/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java new file mode 100755 index 0000000..529c271 --- /dev/null +++ b/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java @@ -0,0 +1,372 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.nvidia.nds_h; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.util.*; + +import org.apache.hadoop.filecache.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; + +import org.apache.commons.cli.*; + +import java.io.*; +import java.net.*; +import java.math.*; +import java.security.*; +import java.util.Objects; + + +public class GenTable extends Configured implements Tool { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + int res = ToolRunner.run(conf, new GenTable(), args); + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + + CommandLineParser parser = new BasicParser(); + getConf().setInt("io.sort.mb", 4); + Options options = getOptions(); + CommandLine line = parser.parse(options, remainingArgs); + + if(!line.hasOption("scale")) { + HelpFormatter f = new HelpFormatter(); + f.printHelp("GenTable", options); + return 1; + } + Path out = new Path(line.getOptionValue("dir")); + + int scale = Integer.parseInt(line.getOptionValue("scale")); + + String table = "all"; + if(line.hasOption("table")) { + table = line.getOptionValue("table"); + } + + int parallel = scale; + + if(line.hasOption("parallel")) { + parallel = Integer.parseInt(line.getOptionValue("parallel")); + } + + int rangeStart = 1; + int rangeEnd = parallel; + + if(line.hasOption("range")) { + String[] range = line.getOptionValue("range").split(","); + if (range.length == 1) { + System.err.println("Please provide range with comma for both range start and range end."); + return 1; + } + rangeStart = Integer.parseInt(range[0]); + rangeEnd = Integer.parseInt(range[1]); + if (rangeStart < 1 || rangeStart > rangeEnd || rangeEnd > parallel) { + System.err.println("Please provide correct child range: 1 <= rangeStart <= rangeEnd <= parallel"); + return 1; + } + } + + if(parallel == 1 || scale == 1) { + System.err.println("The MR task does not work for scale=1 or parallel=1"); + return 1; + } + + Path in = genInput(table, scale, parallel, rangeStart, rangeEnd); + + Path dbgen = copyJar(new File("target/dbgen.jar")); + + // Extracting the dbgen jar location and adding as a symlink as part of + // Mapred Cache hence enabling access by all mappers running + URI dsuri = dbgen.toUri(); + URI link = new URI(dsuri.getScheme(), + dsuri.getUserInfo(), dsuri.getHost(), + dsuri.getPort(),dsuri.getPath(), + dsuri.getQuery(),"dbgen"); + + Configuration conf = getConf(); + conf.setInt("mapred.task.timeout",0); + conf.setInt("mapreduce.task.timeout",0); + conf.setBoolean("mapreduce.map.output.compress", true); + conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.GzipCodec"); + + DistributedCache.addCacheArchive(link, conf); + DistributedCache.createSymlink(conf); + Job job = new Job(conf, "GenTable+"+table+"_"+scale); + job.setJarByClass(getClass()); + + // No reducers since no reduction task involved post data gen + // Updating mapper class + // Output will be a text file ( key(file_name) -> output ) + job.setNumReduceTasks(0); + job.setMapperClass(Dbgen.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // Using NLineInputFormat mapper for parsing 
each line of input + // file as separate task + job.setInputFormatClass(NLineInputFormat.class); + NLineInputFormat.setNumLinesPerSplit(job, 1); + + FileInputFormat.addInputPath(job, in); + FileOutputFormat.setOutputPath(job, out); + + + FileSystem fs = FileSystem.get(getConf()); + // delete existing files if "overwrite" is set + if(line.hasOption("overwrite")) { + if (fs.exists(out)) { + fs.delete(out, true); + } + } + + // use multiple output to only write the named files + LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); + MultipleOutputs.addNamedOutput(job, "text", + TextOutputFormat.class, LongWritable.class, Text.class); + + job.waitForCompletion(true); + + // cleanup + fs.delete(in, false); + fs.delete(dbgen, false); + + return 0; + } + + private static Options getOptions() { + Options options = new Options(); + /* + * These are the various options being passed to the class + * -s scale + * -d output directory + * -t specific table data + * -p number of parallel files to be generated + * -o overwrite output directory if exists + */ + options.addOption("s","scale", true, "scale"); + options.addOption("d","dir", true, "dir"); + options.addOption("t","table", true, "table"); + options.addOption("p", "parallel", true, "parallel"); + options.addOption("o", "overwrite", false, "overwrite existing data"); + options.addOption("r", "range", true, "child range in one data generation run"); + return options; + } + + /* + * This function just copies the jar from the local to hdfs temp + * location for access by the mappers + */ + public Path copyJar(File jar) throws Exception { + MessageDigest md = MessageDigest.getInstance("MD5"); + InputStream is = new FileInputStream(jar); + try { + is = new DigestInputStream(is, md); + // read stream to EOF as normal... + } + finally { + is.close(); + } + BigInteger md5 = new BigInteger(md.digest()); + String md5hex = md5.toString(16); + Path dst = new Path(String.format("/tmp/%s.jar",md5hex)); + Path src = new Path(jar.toURI()); + FileSystem fs = FileSystem.get(getConf()); + fs.copyFromLocalFile(false, /*overwrite*/true, src, dst); + return dst; + } + + /* + * This function generates the various commands to be run + * parallely as part of the mapper for the job. 
+ * Each command runs the data generation for a specific part + * for a table + */ + public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd) throws Exception { + // Assigning epoch based name to the temporary files + // Will be cleaned later + long epoch = System.currentTimeMillis()/1000; + Path in = new Path("/tmp/"+table+"_"+scale+"-"+epoch); + FileSystem fs = FileSystem.get(getConf()); + FSDataOutputStream out = fs.create(in); + + // This is for passing the various params to the command + // for individual tables + String[ ] tables = {"c","O","L","P","S","s"}; + + for(int i = rangeStart; i <= rangeEnd; i++) { + String baseCmd = String.format("./dbgen -s %d -C %d -S %d ",scale,parallel,i); + // In case of no specific table, data is generated for all + // Separate commands for each table is generated for more parallelism + // running multiple mappers + if(table.equals("all")) { + for(String t: tables){ + String cmd = baseCmd + String.format("-T %s",t); + out.writeBytes(cmd+"\n"); + } + } + else{ + // TODO - update using map based approach for a cleaner implementation + if(table.equalsIgnoreCase("customer")){ + String cmd = baseCmd + "-T c"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("nation")){ + String cmd = baseCmd + "-T n"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("region")){ + String cmd = baseCmd + "-T r"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("lineItem")){ + String cmd = baseCmd + "-T L"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("orders")){ + String cmd = baseCmd + "-T O"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("parts")){ + String cmd = baseCmd + "-T P"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("partsupp")){ + String cmd = baseCmd + "-T S"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("supplier")){ + String cmd = baseCmd + "-T s"; + out.writeBytes(cmd + "\n"); + } + } + } + // nation and region tables are static tables hence adding + // a single command for both + if(table.equals("all")){ + String cmdL = String.format("./dbgen -s %d -T l",scale); + out.writeBytes(cmdL + "\n"); + } + // Writing the command file in temporary folder for being read by the mapper + out.close(); + return in; + } + + static String readToString(InputStream in) throws IOException { + InputStreamReader is = new InputStreamReader(in); + StringBuilder sb=new StringBuilder(); + BufferedReader br = new BufferedReader(is); + String read = br.readLine(); + + while(read != null) { + //System.out.println(read); + sb.append(read); + read =br.readLine(); + } + return sb.toString(); + } + + static final class Dbgen extends Mapper { + private MultipleOutputs mos; + protected void setup(Context context) throws IOException { + mos = new MultipleOutputs(context); + } + protected void cleanup(Context context) throws IOException, InterruptedException { + mos.close(); + } + protected void map(LongWritable offset, Text command, Mapper.Context context) + throws IOException, InterruptedException { + String parallel="1"; + String child="1"; + String table=""; + String suffix = ""; + String[] cmd = command.toString().split(" "); + + for(int i=0; i name.endsWith(suffixNew); + + for(File f: Objects.requireNonNull(cwd.listFiles(tables))) { + if(f != null) + { + System.out.println("Processing file: "+f.getName()); + } + final String baseOutputPath = f.getName().replace(suffix.substring(suffix.indexOf('.')), 
String.format("/data_%s_%s", child, parallel));
+                BufferedReader br = new BufferedReader(new FileReader(f));
+                String line;
+                while ((line = br.readLine()) != null) {
+                    // forward each generated row to the named output for this table chunk
+                    mos.write("text", line, null, baseOutputPath);
+                }
+                br.close();
+                f.deleteOnExit();
+            }
+            System.out.println("Processing complete");
+        }
+    }
+}
diff --git a/utils/check.py b/utils/check.py
index db01e68..e429108 100644
--- a/utils/check.py
+++ b/utils/check.py
@@ -57,12 +57,14 @@ def check_build_nds_h():
     # we assume user won't move this script.
     src_dir = Path(__file__).parent.parent.absolute()
     tool_path = list(Path(src_dir / 'nds-h/tpch-gen/target/dbgen').rglob("dbgen"))
+    jar_path = list(
+        Path(src_dir / 'nds-h/tpch-gen/target').rglob("tpch-gen-*.jar"))
     print(tool_path)
     if tool_path == []:
         raise Exception('dbgen executable is ' +
                         'not found in `target` folder.' +
                         'Please refer to README document and build this project first.')
-    return tool_path[0]
+    return jar_path[0], tool_path[0]
+
 def check_build_nds():
     """check jar and tpcds executable
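
For orientation, the input file that `GenTable.genInput()` writes to HDFS, and that `NLineInputFormat` then hands to the mappers one line per split, is simply a list of dbgen invocations. The sketch below is reconstructed from the format strings in `genInput()` rather than captured from a real run, using illustrative values `-s 100 -p 100 -r 1,2` with no specific table selected:

```bash
# one line per (chunk, table) pair; each line becomes one map task
./dbgen -s 100 -C 100 -S 1 -T c
./dbgen -s 100 -C 100 -S 1 -T O
./dbgen -s 100 -C 100 -S 1 -T L
./dbgen -s 100 -C 100 -S 1 -T P
./dbgen -s 100 -C 100 -S 1 -T S
./dbgen -s 100 -C 100 -S 1 -T s
./dbgen -s 100 -C 100 -S 2 -T c
# ...remaining tables for chunk 2...
# nation and region are static, so a single extra command covers both
./dbgen -s 100 -T l
```

Each mapper runs its line against the `dbgen.jar` archive unpacked from the distributed cache, then streams the resulting `<table>.tbl.<chunk>` file into, roughly, a `<table>/data_<chunk>_<parallel>` file under the job output directory, which is why the Python side can treat each table as a ready-made sub-folder on HDFS.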
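
Putting the pieces of this patch together, an end-to-end flow might look like the sketch below. The scale factor, parallelism, paths and the query stream file name are placeholders rather than values taken from the repository, and the power run is normally wrapped by the spark-submit-template utility and a property file as the README describes; the bare invocation here is only meant to show the new flags.

```bash
# Build dbgen, bundle it into dbgen.jar and package the MapReduce submitter
# (requires TPCH_HOME to point at the TPC-H toolkit, plus Maven and a JDK)
cd nds-h/tpch-gen && make

# Generate SF100 data directly into HDFS in 100 dbgen chunks, overwriting any previous output
python nds_h_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output

# Convert the raw DSV output to Parquet first (see the transcode section of the README),
# then run a subset of queries and write one JSON summary file per query
python nds_h_power.py /data/parquet_sf100 ./query_stream.sql time.csv \
    --sub_queries query1,query15_part1,query15_part2 \
    --json_summary_folder ./json_summary
```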