diff --git a/README.md b/README.md index c674aca..716c0cd 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,8 @@ A repo for Spark related benchmark sets and utilities using the [RAPIDS Accelerator For Apache Spark](https://github.com/NVIDIA/spark-rapids). ## Benchmark sets: -- [Nvidia Decision Support(NDS)](./nds/) +- [Nvidia Decision Support ( NDS )](./nds/) +- [Nvidia Decision Support-H ( NDS-H )](./nds-h/) Please see README in each benchmark set for more details including building instructions and usage descriptions. \ No newline at end of file diff --git a/nds-h/README.md b/nds-h/README.md new file mode 100644 index 0000000..2b378e0 --- /dev/null +++ b/nds-h/README.md @@ -0,0 +1,254 @@ +# NDS-H v2.0 Automation + +## Disclaimer + +NDS-H is derived from the TPC-H Benchmarks and as such any results obtained using NDS-H are not +comparable to published TPC-H Benchmark results, as the results obtained from using NDS-H do not +comply with the TPC-H Benchmarks. + +## License + +NDS-H is licensed under Apache License, Version 2.0. + +Additionally, certain files in NDS-H are licensed subject to the accompanying [TPC EULA](../TPC%20EULA.txt) (also +available at [tpc.org](http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). Files subject to the TPC +EULA are identified as such within the files. + +You may not use NDS-H except in compliance with the Apache License, Version 2.0 and the TPC EULA. + +## Prerequisites + +1. Python >= 3.6 +2. Necessary libraries + + ```bash + sudo locale-gen en_US.UTF-8 + sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven + sudo apt install dos2unix + ``` +3. Install and set up SPARK. + - Download latest distro from [here](https://spark.apache.org/downloads.html) + - Preferably >= 3.4 + - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) + - (For local) Follow the steps mentioned [here](https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/on-premise.html#local-mode) + for local setup + - (For local) Update *_gpu_* --files with the getGpuResources.sh location as mentioned in the link above + - (For local) Update spark master in shared/base.template with local[*] + - (For local) Remove the conf - "spark.task.resource.gpu.amount=0.05" from all template files + +4. Update Configuration + - Update [shared/base.template](../shared/base.template) line 26 with the Spark home location. + +5. For GPU run + - Download the latest RAPIDS jar from [here](https://oss.sonatype.org/content/repositories/staging/com/nvidia/rapids-4-spark_2.12/) + + - Update [shared/base.template](../shared/base.template) line 36 with rapids plugin jar location + +6. TPC-H Tools + + - Download TPC-H Tools from the [official TPC website] (https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. + + - After unzipping it, a folder called `TPC-H V3.0.1` will be seen. + + - Set environment variable `TPCH_HOME` pointing to this directory. e.g. + + ```bash + export TPCH_HOME='/PATH/TO/YOUR/TPC-H V3.0.1' + ``` + +## Use spark-submit-template with template + +To help user run NDS-H, we provide a template to define the main Spark configs for spark-submit command. +User can use different templates to run NDS with different configurations for different environment. 
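A template in this repo is simply a shell fragment that is sourced before submission: it exports a few environment variables plus a `SPARK_CONF` array holding the spark-submit options. The sketch below only condenses the shape of `shared/base.template` combined with a `convert_submit_*` template; the real files under `shared/` are the reference, and the values shown are illustrative defaults.

```bash
# Illustrative shape of a template file (not a new file in this change).
export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark}
export SPARK_MASTER=${SPARK_MASTER:-local[*]}
export DRIVER_MEMORY=${DRIVER_MEMORY:-10G}
export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200}

# spark-submit options consumed by the spark-submit-template utility
export SPARK_CONF=("--master" "${SPARK_MASTER}"
                   "--deploy-mode" "client"
                   "--conf" "spark.driver.memory=${DRIVER_MEMORY}"
                   "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}")
```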
+ +We create [spark-submit-template](../shared/spark-submit-template), which accepts a template file and +submit the Spark job with the configs defined in the template. + +Example command to submit via `spark-submit-template` utility: + +```bash +cd shared +spark-submit-template convert_submit_cpu.template \ +../nds-h/nds_h_transcode.py report.txt +``` + +We provide 2 types of template files used in different steps of NDS-H: + +1. *convert_submit_*.template for converting the data by using nds_h_transcode.py +2. *power_run_*.template for power run by using nds_h_power.py + +We predefine different template files for different environment. +For example, we provide below template files to run nds_h_transcode.py for different environment: + +* `convert_submit_cpu.template` is for Spark CPU cluster +* `convert_submit_gpu.template` is for Spark GPU cluster + +You need to choose one as your template file and modify it to fit your environment. +We define a [base.template](../shared/base.template) to help you define some basic variables for your envionment. +And all the other templates will source `base.template` to get the basic variables. +When running multiple steps of NDS-H, you only need to modify `base.template` to fit for your cluster. + +## Data Generation + +### Build the jar for data generation + + ```bash + cd tpch-gen + make + ``` + +### Generate data + +To generate data for local - + +```bash +$ python nds_h_gen_data.py -h +usage: nds_h_gen_data.py [-h] [--overwrite_output] +positional arguments: + scale volume of data to generate in GB. + parallel build data in separate chunks + data_dir generate data in directory. + +optional arguments: + -h, --help show this help message and exit + --overwrite_output overwrite if there has already existing data in the path provided + +``` + +Example command: + +```bash +python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output +``` + +### Convert DSV to Parquet or Other data sources + +To do the data conversion, the `nds_h_transcode.py` need to be submitted as a Spark job. User can leverage +the [spark-submit-template](../shared/spark-submit-template) utility to simplify the submission. +The utility requires a pre-defined [template file](../shared/convert_submit_gpu.template) where user needs to put necessary Spark configurations. Alternatively user can submit the `nds_h_transcode.py` directly to spark with arbitrary Spark parameters. + +DSV ( pipe ) is the default input format for data conversion, it can be overridden by `--input_format`. + +```bash +cd shared +./spark-submit-template convert_submit_cpu.template ../nds-h/nds_h_transcode.py + +``` + +## Query Generation + +The [templates.patch](./tpch-gen/patches/template_new.patch) that contains necessary modifications to make NDS-H queries runnable in Spark will be applied automatically in the build step. The final query templates will be in folder `$TPCH_HOME/dbgen/queries` after the build process. + +### Generate Specific Query or Query Streams + +```text +usage: nds_h_gen_query_stream.py [-h] (--template TEMPLATE | --streams STREAMS) + scale output_dir + +positional arguments: + scale assume a database of this scale factor. + output_dir generate query in directory. + template | stream generate query stream or from a template arugment + +optional arguments: + -h, --help show this help message and exit + --template TEMPLATE build queries from this template. Only used to generate one query from one tempalte. + This argument is mutually exclusive with --streams. 
It + is often used for test purpose. + --streams STREAMS generate how many query streams. This argument is mutually exclusive with --template. +``` + +Example command to generate one query using template 1.sql ( There are 22 default queries and templates): + +```bash +cd nds-h +python nds_h_gen_query_stream.py 3000 ./query_1 --template +``` + +Example command to generate 10 query streams each one of which contains all NDS-H queries but in +different order: + +```bash +cd nds-h +python nds_h_gen_query_stream.py 3000 ./query_streams --streams 10 +``` + +## Benchmark Runner + +### Build Dependencies + +There's a customized Spark listener used to track the Spark task status e.g. success or failed +or success with retry. The results will be recorded at the json summary files when all jobs are +finished. This is often used for test or query monitoring purpose. + +To build: + +```bash +cd utils/jvm_listener +mvn package +``` + +`benchmark-listener-1.0-SNAPSHOT.jar` will be generated in `jvm_listener/target` folder. + +### Power Run + +_After_ user generates query streams, Power Run can be executed using one of them by submitting `nds_h_power.py` to Spark. + +Arguments supported by `nds_h_power.py`: + +```text +usage: nds_h_power.py [-h] [--input_format {parquet,}] + [--output_format OUTPUT_FORMAT] + [--property_file PROPERTY_FILE] + + + + +positional arguments: + input_data_location input data location (e.g., "hdfs:///ds-generated-data"). + query_stream_file query stream file that contains NDS-H queries in specific order + time_log_file path to execution time log, only support local path. + +optional arguments: + -h, --help show this help message and exit + --input_format {parquet,orc,avro,csv,json,iceberg,delta} + type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. Certain types are not fully supported by GPU reading, please refer to https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/docs/compatibility.md for more details. + --output_prefix OUTPUT_PREFIX + text to prepend to every output file (e.g., "hdfs:///ds-parquet") + --output_format OUTPUT_FORMAT + type of query output + --property_file PROPERTY_FILE + property file for Spark configuration. +``` + +Example command to submit nds_h_power.py by spark-submit-template utility: + +```bash +cd shared +./spark-submit-template power_run_gpu.template \ +../nds-h/nds_h_power.py \ + \ +/query_0.sql \ +time.csv \ +--property_file ../utils/properties/aqe-on.properties +``` + +User can also use `spark-submit` to submit `ndsH_power.py` directly. + +To simplify the performance analysis process, the script will create a local CSV file to save query(including TempView creation) and corresponding execution time. Note: please use `client` mode(set in your `power_run_gpu.template` file) when running in Yarn distributed environment to make sure the time log is saved correctly in your local path. + +Note the template file must follow the `spark-submit-template` utility as the _first_ argument. +All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by +double quotes in the template file. Please follow the format in [power_run_gpu.template](../shared/power_run_gpu.template). + +User can define the `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. 
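A property file is nothing more than plain `key=value` lines; `nds_h_power.py` reads it line by line, splits each line on the first `=`, and applies the resulting pairs as Spark configurations on top of the template. The snippet below is only a hypothetical example of the shape (the actual contents of `aqe-on.properties` are not reproduced here); any valid Spark configuration keys can be used.

```properties
# Hypothetical property file -- keys/values shown for illustration only.
spark.sql.adaptive.enabled=true
spark.sql.adaptive.coalescePartitions.enabled=true
```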
User can define some common properties in the template file and put some other properties that usually varies in the property file. + +The command above will use `collect()` action to trigger Spark job for each query. It is also supported to save query output to some place for further verification. User can also specify output format e.g. csv, parquet or orc: + +```bash +./spark-submit-template power_run_gpu.template \ +nds_h_power.py \ +parquet_sf3k \ +./nds_query_streams/query_0.sql \ +time.csv +``` \ No newline at end of file diff --git a/nds-h/nds_h_gen_data.py b/nds-h/nds_h_gen_data.py new file mode 100644 index 0000000..84401f3 --- /dev/null +++ b/nds-h/nds_h_gen_data.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.0.1 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. +# + +import argparse +import os +import sys +import subprocess + +#For adding utils to path +parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +utils_dir = os.path.join(parent_dir, 'utils') +sys.path.insert(0, utils_dir) + +from check import check_build_nds_h, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range + +check_version() + +# Source tables contained in the schema for TPC-H. For more information, check - +# https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf + +source_table_names = [ + 'customer', + 'lineitem', + 'nation', + 'orders', + 'part', + 'partsupp', + 'region', + 'supplier' +] + +def generate_data_local(args, range_start, range_end, tool_path): + """Generate data to local file system. TPC-DS tool will generate all table data under target + folder without creating sub-folders for each table. So we add extra code to create sub folder + for each table and move data there respectively. 
+ + Args: + args (Namepace): Namespace from argparser + tool_path (str): path to the dsdgen tool + + Raises: + Exception: if data already exists and overwrite_output is not honored + Exception: dsdgen failed + """ + data_dir = get_abs_path(args.data_dir) + if not os.path.isdir(data_dir): + os.makedirs(data_dir) + else: + # Verify if there's already data in this path + if get_dir_size(data_dir) > 0 and not args.overwrite_output: + raise Exception( + "There's already been data exists in directory {}.".format(data_dir) + + " Use '--overwrite_output' to overwrite.") + + # working directory for dsdgen + work_dir = tool_path.parent + print(work_dir) + procs = [] + for i in range(range_start, range_end + 1): + dbgen = ["-s", args.scale, + "-C", args.parallel, + "-S", str(i), + "-v", "Y", + "-f","Y"] + procs.append(subprocess.Popen( + ["./dbgen"] + dbgen, cwd=str(work_dir))) + # wait for data generation to complete + for p in procs: + p.wait() + if p.returncode != 0: + print("dbgen failed with return code {}".format(p.returncode)) + raise Exception("dbgen failed") + # move multi-partition files into table folders + table_names = source_table_names + for table in table_names: + print('mkdir -p {}/{}'.format(data_dir, table)) + subprocess.run(['mkdir', '-p', data_dir + '/' + table]) + if (table != 'region' and table !='nation'): + for i in range(range_start, range_end + 1): + subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}', + f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) + else: + subprocess.run(['mv', f'{work_dir}/{table}.tbl', + f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) + # delete date file has no parallel number suffix in the file name, move separately + # show summary + subprocess.run(['du', '-h', '-d1', data_dir]) + +def generate_data(args): + tool_path = check_build_nds_h() + range_start = 1 + range_end = int(args.parallel) + if args.range: + range_start, range_end = valid_range(args.range, args.parallel) + generate_data_local(args, range_start, range_end, tool_path) + +if __name__ == "__main__": + parser = parser = argparse.ArgumentParser() + parser.add_argument("scale", + help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \ + ,3000, 10000, 30000," + ) + parser.add_argument("parallel", + type=parallel_value_type, + help="build data in separate chunks" + ) + parser.add_argument("data_dir", + help="generate data in directory.") + parser.add_argument('--range', + help='Used for incremental data generation, meaning which part of child' + + 'chunks are generated in one run. Format: "start,end", both are inclusive. ' + + 'e.g. "1,100". Note: the child range must be within the "parallel", ' + + '"--parallel 100 --range 100,200" is illegal.') + parser.add_argument("--overwrite_output", + action="store_true", + help="overwrite if there has already existing data in the path provided.") + + args = parser.parse_args() + generate_data(args) diff --git a/nds-h/nds_h_gen_query_stream.py b/nds-h/nds_h_gen_query_stream.py new file mode 100644 index 0000000..2e58dab --- /dev/null +++ b/nds-h/nds_h_gen_query_stream.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.0.1 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. +# +import argparse +import os +import subprocess +import sys + +parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) +utils_dir = os.path.join(parent_dir, 'utils') +sys.path.insert(0, utils_dir) + +from check import check_build_nds_h, check_version, get_abs_path + +check_version() + +def generate_query_streams(args, tool_path): + """call TPC-H qgen tool to generate a specific query or query stream(s) that contains all + TPC-DS queries. + + Args: + args (Namespace): Namespace from argparser + tool_path (str): path to the tool + """ + # move to the tools directory + work_dir = tool_path.parent + output_dir = get_abs_path(args.output_dir) + + if not os.path.isdir(args.output_dir): + os.makedirs(args.output_dir) + + os.environ["DSS_QUERY"] = str(work_dir / "queries") + + base_cmd = ['./qgen', + '-s', args.scale] + + if args.streams: + procs = [] + for i in range(1,int(args.streams)+1): + new_cmd = base_cmd + ['-p',str(i)] + output_file = os.path.join(output_dir, f"stream_{i}.sql") + with open(output_file,'w') as f: + procs.append(subprocess.Popen(new_cmd, cwd=str(work_dir), stdout=f)) + for p in procs: + p.wait() + if p.returncode != 0: + print("QGEN failed with return code {}".format(p.returncode)) + raise Exception("dbgen failed") + else: + output_file = os.path.join(output_dir, f"query_{args.template}.sql") + base_cmd = base_cmd + ['-d',args.template] + with open(output_file,"w") as f: + subprocess.run(base_cmd, check=True, cwd=str(work_dir),stdout=f) + +if __name__ == "__main__": + tool_path = check_build_nds_h() + parser = parser = argparse.ArgumentParser() + parser.add_argument("scale", + help="Assume a database of this scale factor.") + parser.add_argument("output_dir", + help="Generate query stream in directory.") + group = parser.add_mutually_exclusive_group(required=True) + group.add_argument("--template", + help="build queries from this template. Only used to generate one query " + + "from one tempalte. This argument is mutually exclusive with --streams. " + + "It is often used for test purpose.") + group.add_argument('--streams', + help='generate how many query streams. 
' + + 'This argument is mutually exclusive with --template.') + args = parser.parse_args() + + generate_query_streams(args, tool_path) diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py new file mode 100644 index 0000000..be2dc98 --- /dev/null +++ b/nds-h/nds_h_power.py @@ -0,0 +1,306 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.0.1 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. 
+# + +import argparse +import csv +import time +from collections import OrderedDict +from pyspark.sql import SparkSession +import os +import sys +parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) + +# Construct the path to the utils directory +utils_dir = os.path.join(parent_dir, 'utils') +# Add the utils directory to sys.path +sys.path.insert(0, utils_dir) + +from python_benchmark_reporter.PysparkBenchReport import PysparkBenchReport +from pyspark.sql import DataFrame + +from check import check_version +from nds_h_schema import get_schemas +import re + +check_version() + + +def gen_sql_from_stream(query_stream_file_path): + """Read Spark compatible query stream and split them one by one + + Args: + query_stream_file_path (str): path of query stream generated by TPC-H tool + + Returns: + ordered dict: an ordered dict of {query_name: query content} query pairs + """ + extended_queries = OrderedDict() + with open(query_stream_file_path, 'r') as f: + stream = f.read() + pattern = re.compile(r'-- Template file: (\d+)\n\n(.*?)(?=(?:-- Template file: \d+)|\Z)', re.DOTALL) + + # Find all matches in the content + matches = pattern.findall(stream) + +# Populate the dictionary with template file numbers as keys and queries as values + for match in matches: + template_number = match[0] + if int(template_number) == 15: + new_queries = match[1].split(";") + extended_queries[f'query{template_number}_part1'] = new_queries[0].strip() + extended_queries[f'query{template_number}_part2'] = new_queries[1].strip() + extended_queries[f'query{template_number}_part3'] = new_queries[2].strip() + else: + sql_query = match[1].strip() + extended_queries[f'query{template_number}'] = sql_query + + return extended_queries + +def setup_tables(spark_session, input_prefix, input_format, execution_time_list): + """set up data tables in Spark before running the Power Run queries. + + Args: + spark_session (SparkSession): a SparkSession instance to run queries. + input_prefix (str): path of input data. + input_format (str): type of input data source, e.g. parquet, orc, csv, json. + use_decimal (bool): use decimal type for certain columns when loading data of text type. + execution_time_list ([(str, str, int)]): a list to record query and its execution time. + + Returns: + execution_time_list: a list recording que15ry execution time. 
+ """ + spark_app_id = spark_session.sparkContext.applicationId + # Create TempView for tables + for table_name in get_schemas().keys(): + start = int(time.time() * 1000) + table_path = input_prefix + '/' + table_name + reader = spark_session.read.format(input_format) + if input_format in ['csv', 'json']: + reader = reader.schema(get_schemas()[table_name]) + reader.load(table_path).createOrReplaceTempView(table_name) + end = int(time.time() * 1000) + print("====== Creating TempView for table {} ======".format(table_name)) + print("Time taken: {} millis for table {}".format(end - start, table_name)) + execution_time_list.append( + (spark_app_id, "CreateTempView {}".format(table_name), end - start)) + return execution_time_list + +def ensure_valid_column_names(df: DataFrame): + def is_column_start(char): + return char.isalpha() or char == '_' + + def is_column_part(char): + return char.isalpha() or char.isdigit() or char == '_' + + def is_valid(column_name): + return len(column_name) > 0 and is_column_start(column_name[0]) and all( + [is_column_part(char) for char in column_name[1:]]) + + def make_valid(column_name): + # To simplify: replace all invalid char with '_' + valid_name = '' + if is_column_start(column_name[0]): + valid_name += column_name[0] + else: + valid_name += '_' + for char in column_name[1:]: + if not is_column_part(char): + valid_name += '_' + else: + valid_name += char + return valid_name + + def deduplicate(column_names): + # In some queries like q35, it's possible to get columns with the same name. Append a number + # suffix to resolve this problem. + dedup_col_names = [] + for i,v in enumerate(column_names): + count = column_names.count(v) + index = column_names[:i].count(v) + dedup_col_names.append(v+str(index) if count > 1 else v) + return dedup_col_names + + valid_col_names = [c if is_valid(c) else make_valid(c) for c in df.columns] + dedup_col_names = deduplicate(valid_col_names) + return df.toDF(*dedup_col_names) + +def run_one_query(spark_session, + query, + query_name, + output_path, + output_format): + df = spark_session.sql(query) + if not output_path: + df.collect() + else: + ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save( + output_path + '/' + query_name) + +def run_query_stream(input_prefix, + property_file, + query_dict, + time_log_output_path, + input_format="parquet", + output_path=None, + output_format="parquet"): + """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file + for easy accesibility. TempView Creation time is also recorded. + + Args: + input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True. + query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark + time_log_output_path (str): path of the log that contains query execution time, both local + and HDFS path are supported. + input_format (str, optional): type of input data source. + use_deciaml(bool, optional): use decimal type for certain columns when loading data of text type. + output_path (str, optional): path of query output, optinal. If not specified, collect() + action will be applied to each query. Defaults to None. + output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults + to "parquet". 
+ """ + queries_reports = [] + execution_time_list = [] + total_time_start = time.time() + # check if it's running specific query or Power Run + app_name = "NDS - Power Run" + # Execute Power Run or Specific query in Spark + # build Spark Session + session_builder = SparkSession.builder + if property_file: + spark_properties = load_properties(property_file) + for k,v in spark_properties.items(): + session_builder = session_builder.config(k,v) + spark_session = session_builder.appName( + app_name).getOrCreate() + spark_app_id = spark_session.sparkContext.applicationId + if input_format != 'iceberg' and input_format != 'delta': + execution_time_list = setup_tables(spark_session, input_prefix, input_format, + execution_time_list) + + power_start = int(time.time()) + for query_name, q_content in query_dict.items(): + # show query name in Spark web UI + spark_session.sparkContext.setJobGroup(query_name, query_name) + print("====== Run {} ======".format(query_name)) + q_report = PysparkBenchReport(spark_session, query_name) + summary = q_report.report_on(run_one_query,spark_session, + q_content, + query_name, + output_path, + output_format) + print(f"Time taken: {summary['queryTimes']} millis for {query_name}") + query_times = summary['queryTimes'] + execution_time_list.append((spark_app_id, query_name, query_times[0])) + queries_reports.append(q_report) + power_end = int(time.time()) + power_elapse = int((power_end - power_start)*1000) + spark_session.sparkContext.stop() + total_time_end = time.time() + total_elapse = int((total_time_end - total_time_start)*1000) + print("====== Power Test Time: {} milliseconds ======".format(power_elapse)) + print("====== Total Time: {} milliseconds ======".format(total_elapse)) + execution_time_list.append( + (spark_app_id, "Power Start Time", power_start)) + execution_time_list.append( + (spark_app_id, "Power End Time", power_end)) + execution_time_list.append( + (spark_app_id, "Power Test Time", power_elapse)) + execution_time_list.append( + (spark_app_id, "Total Time", total_elapse)) + + header = ["application_id", "query", "time/milliseconds"] + # print to driver stdout for quick view + print(header) + for row in execution_time_list: + print(row) + # write to local file at driver node + with open(time_log_output_path, 'w', encoding='UTF8') as f: + writer = csv.writer(f) + writer.writerow(header) + writer.writerows(execution_time_list) + # write to csv in cloud environment + # check queries_reports, if there's any task or query failed, exit a non-zero to represent the script failure + exit_code = 0 + for q in queries_reports: + if not q.is_success(): + if exit_code == 0: + print("====== Queries with failure ======") + print("{} status: {}".format(q.summary['query'], q.summary['queryStatus'])) + exit_code = 1 + if exit_code: + print("Above queries failed or completed with failed tasks. Please check the logs for the detailed reason.") + +def load_properties(filename): + myvars = {} + with open(filename) as myfile: + for line in myfile: + name, var = line.partition("=")[::2] + myvars[name.strip()] = var.strip() + return myvars + +if __name__ == "__main__": + parser = parser = argparse.ArgumentParser() + parser.add_argument('input_prefix', + help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' + + 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + + '"spark.sql.catalog.spark_catalog.warehouse". 
Only default Spark catalog ' + + 'session name "spark_catalog" is supported now, customized catalog is not ' + + 'yet supported. Note if this points to a Delta Lake table, the path must be ' + + 'absolute. Issue: https://github.com/delta-io/delta/issues/555') + parser.add_argument('query_stream_file', + help='query stream file that contains NDS queries in specific order') + parser.add_argument('time_log', + help='path to execution time log, only support local path.', + default="") + parser.add_argument('--input_format', + help='type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. ' + + 'Certain types are not fully supported by GPU reading, please refer to ' + + 'https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/docs/compatibility.md ' + + 'for more details.', + choices=['parquet', 'orc', 'avro', 'csv', 'json', 'iceberg', 'delta'], + default='parquet') + parser.add_argument('--output_prefix', + help='text to prepend to every output file (e.g., "hdfs:///ds-parquet")') + parser.add_argument('--output_format', + help='type of query output', + default='parquet') + parser.add_argument('--property_file', + help='property file for Spark configuration.') + args = parser.parse_args() + query_dict = gen_sql_from_stream(args.query_stream_file) + run_query_stream(args.input_prefix, + args.property_file, + query_dict, + args.time_log, + args.input_format, + args.output_prefix, + args.output_format) diff --git a/nds-h/nds_h_schema.py b/nds-h/nds_h_schema.py new file mode 100644 index 0000000..16cdb28 --- /dev/null +++ b/nds-h/nds_h_schema.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.0.1 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. +# + +from pyspark.sql.types import * + + +def get_schemas(): + """get the schemas of all tables. + + Returns: + dict: {table_name: schema} + """ + SCHEMAS = {} + + # The specification states that "Identifier means that the column shall be able to hold any + # key value generated for that column". 
Some tables have more rows than others so we can + # choose to use different types per table. + identifier_int = IntegerType() + identifier_long = LongType() + + SCHEMAS["part"] = StructType([ + StructField("p_partkey", LongType(), False), + StructField("p_name", StringType(), False), + StructField("p_mfgr", StringType(), False), + StructField("p_brand", StringType(), False), + StructField("p_type", StringType(), False), + StructField("p_size", IntegerType(), False), + StructField("p_container", StringType(), False), + StructField("p_retailprice", DecimalType(11, 2), False), + StructField("p_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['supplier'] = StructType([ + StructField("s_suppkey", LongType(), False), + StructField("s_name", StringType(), False), + StructField("s_address", StringType(), False), + StructField("s_nationkey", LongType(), False), + StructField("s_phone", StringType(), False), + StructField("s_acctbal", DecimalType(11, 2), False), + StructField("s_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['partsupp'] = StructType([ + StructField("ps_partkey", LongType(), False), + StructField("ps_suppkey", LongType(), False), + StructField("ps_availqty", IntegerType(), False), + StructField("ps_supplycost", DecimalType(11, 2), False), + StructField("ps_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['customer'] = StructType([ + StructField("c_custkey", LongType(), False), + StructField("c_name", StringType(), False), + StructField("c_address", StringType(), False), + StructField("c_nationkey", LongType(), False), + StructField("c_phone", StringType(), False), + StructField("c_acctbal", DecimalType(11, 2), False), + StructField("c_mktsegment", StringType(), False), + StructField("c_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['orders'] = StructType([ + StructField("o_orderkey", LongType(), False), + StructField("o_custkey", LongType(), False), + StructField("o_orderstatus", StringType(), False), + StructField("o_totalprice", DecimalType(11, 2), False), + StructField("o_orderdate", DateType(), False), + StructField("o_orderpriority", StringType(), False), + StructField("o_clerk", StringType(), False), + StructField("o_shippriority", IntegerType(), False), + StructField("o_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['lineitem'] = StructType([ + StructField("l_orderkey", LongType(), False), + StructField("l_partkey", LongType(), False), + StructField("l_suppkey", LongType(), False), + StructField("l_linenumber", IntegerType(), False), + StructField("l_quantity", DecimalType(11, 2), False), + StructField("l_extendedprice", DecimalType(11, 2), False), + StructField("l_discount", DecimalType(11, 2), False), + StructField("l_tax", DecimalType(11, 2), False), + StructField("l_returnflag", StringType(), False), + StructField("l_linestatus", StringType(), False), + StructField("l_shipdate", DateType(), False), + StructField("l_commitdate", DateType(), False), + StructField("l_receiptdate", DateType(), False), + StructField("l_shipinstruct", StringType(), False), + StructField("l_shipmode", StringType(), False), + StructField("l_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['nation'] = StructType([ + StructField("n_nationkey", LongType(), False), + StructField("n_name", StringType(), False), + 
StructField("n_regionkey", LongType(), False), + StructField("n_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + SCHEMAS['region'] = StructType([ + StructField("r_regionkey", LongType(), False), + StructField("r_name", StringType(), False), + StructField("r_comment", StringType(), False), + StructField("ignore", StringType(), True) + ]) + + return SCHEMAS + +if __name__ == "__main__": + # Test code + print(get_schemas()) \ No newline at end of file diff --git a/nds-h/nds_h_transcode.py b/nds-h/nds_h_transcode.py new file mode 100644 index 0000000..cfd7dc0 --- /dev/null +++ b/nds-h/nds_h_transcode.py @@ -0,0 +1,203 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.0.1 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. +# + +import argparse +import timeit +import pyspark + +from datetime import datetime + +from pyspark.sql.types import * +from pyspark.sql.functions import col +from nds_h_schema import * + +# Note the specific partitioning is applied when save the parquet data files. 
+TABLE_PARTITIONING = { + 'part': 'p_partkey', + 'supplier': 's_suppkey', + 'partsupp': 'ps_partkey', + 'customer': 'c_custkey', + 'orders': 'o_orderkey', + 'nation': 'n_nationkey', + 'region':'r_regionkey' +} + + +def load(session, filename, schema, input_format, delimiter="|", header="false", prefix=""): + data_path = prefix + '/' + filename + if input_format == 'csv': + print("Schema is {}",schema) + df = session.read.option("delimiter", delimiter).option("header", header)\ + .option("encoding", "ISO-8859-1").csv(data_path, schema=schema) + print("Head is {}",df.head()) + return df + elif input_format in ['parquet', 'orc', 'avro', 'json']: + return session.read.format(input_format).load(data_path) + # TODO: all of the output formats should be also supported as input format possibilities + # remains 'iceberg', 'delta' + else: + raise ValueError("Unsupported input format: {}".format(input_format)) + +def store(session, + df, + filename, + output_format, + output_mode, + prefix=""): + """Create Iceberg tables by CTAS + + Args: + session (SparkSession): a working SparkSession instance + df (DataFrame): DataFrame to be serialized into Iceberg table + filename (str): name of the table(file) + output_format (str): parquet, orc or avro + output_mode (str): save modes as defined by "https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes. + iceberg_write_format (bool): write data into Iceberg tables with specified format + compression (str): compression codec for converted data when saving to disk + prefix (str): output data path when not using Iceberg. + """ + data_path = prefix + '/' + filename + df = df.repartition(200) + writer = df.write + writer = writer.format(output_format).mode(output_mode) + writer.saveAsTable(filename, path=data_path) + +def transcode(args): + """ + Default function that is triggered post argument parsing + + Parameters: + args ( argparse.Namespace ): returns the parsed arguments in the namespace + + Returns: + Nothing + + """ + session_builder = pyspark.sql.SparkSession.builder + session = session_builder.appName(f"NDS-H - transcode - {args.output_format}").getOrCreate() + session.sparkContext.setLogLevel(args.log_level) + results = {} + + schemas = get_schemas() + + trans_tables = schemas + + if args.tables: + for t in args.tables: + if t not in trans_tables.keys() : + raise Exception(f"invalid table name: {t}. 
Valid tables are: {schemas.keys()}") + trans_tables = {t: trans_tables[t] for t in args.tables if t in trans_tables} + + + start_time = datetime.now() + print(f"Load Test Start Time: {start_time}") + for fn, schema in trans_tables.items(): + results[fn] = timeit.timeit( + lambda: store(session, + load(session, + f"{fn}", + schema, + input_format=args.input_format, + prefix=args.input_prefix), + f"{fn}", + args.output_format, + args.output_mode, + args.output_prefix), + number=1) + + end_time = datetime.now() + delta = (end_time - start_time).total_seconds() + print(f"Load Test Finished at: {end_time}") + print(f"Load Test Time: {delta} seconds") + # format required at TPC-DS Spec 4.3.1 + end_time_formatted = end_time.strftime("%m%d%H%M%S%f")[:-5] + print(f"RNGSEED used :{end_time_formatted}") + + report_text = "" + report_text += f"Load Test Time: {delta} seconds\n" + report_text += f"Load Test Finished at: {end_time}\n" + report_text += f"RNGSEED used: {end_time_formatted}\n" + + for table, duration in results.items(): + report_text += "Time to convert '%s' was %.04fs\n" % (table, duration) + + report_text += "\n\n\nSpark configuration follows:\n\n" + + with open(args.report_file, "w") as report: + report.write(report_text) + print(report_text) + + for conf in session.sparkContext.getConf().getAll(): + report.write(str(conf) + "\n") + print(conf) + + +if __name__ == "__main__": + parser = parser = argparse.ArgumentParser() + parser.add_argument( + 'input_prefix', + help='input folder') + parser.add_argument( + 'output_prefix', + help='output folder') + parser.add_argument( + 'report_file', + help='location to store a performance report(local)') + parser.add_argument( + '--output_mode', + choices=['overwrite', 'append', 'ignore', 'error', 'errorifexists'], + help="save modes as defined by " + + "https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes." + + "default value is errorifexists, which is the Spark default behavior.", + default="errorifexists") + parser.add_argument( + '--input_format', + choices=['csv', 'parquet', 'orc', 'avro', 'json'], + default='csv', + help='input data format to be converted. default value is csv.' + ) + parser.add_argument( + '--output_format', + choices=['parquet', 'orc', 'avro', 'json', 'iceberg', 'delta'], + default='parquet', + help="output data format when converting CSV data sources." + ) + parser.add_argument( + '--tables', + type=lambda s: s.split(','), + help="specify table names by a comma separated string. e.g. 'catalog_page,catalog_sales'.") + parser.add_argument( + '--log_level', + help='set log level for Spark driver log. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN(default: INFO)', + default="INFO") + args = parser.parse_args() + transcode(args) diff --git a/nds-h/tpch-gen/Makefile b/nds-h/tpch-gen/Makefile new file mode 100644 index 0000000..797aa7b --- /dev/null +++ b/nds-h/tpch-gen/Makefile @@ -0,0 +1,65 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen + +check-tpch-env: +ifndef TPCH_HOME + $(error "TPCH_HOME not defined, please set TPCH_HOME environment variable to your TPCH Tool directory") +endif + +prepare-target: + rm -Rf target + mkdir -p target/ + +copy-dbgen: + #Copying all patches to the current query folder + cp patches/template.patch "$(TPCH_HOME)/dbgen/queries" + # This is required to ensure similar line ending semantics bw patch + # file and the sql files + cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.sql + cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.patch + # Unapply any patch if already done + -cd "$(TPCH_HOME)/dbgen/queries"; cat *.patch | patch -R -p1 -N + # apply patches to both source code and templates + cd "$(TPCH_HOME)/dbgen/queries" && cat *.patch | patch -p1 + cp -r "$(TPCH_HOME)/dbgen" target/ + +modify-makefile: + # Create makefile from the template suit + cp target/dbgen/makefile.suite target/dbgen/Makefile + sed -i '103s/$$/ gcc/' target/dbgen/Makefile + sed -i '109s/$$/ SPARK/' target/dbgen/Makefile + sed -i '110s/$$/ LINUX/' target/dbgen/Makefile + sed -i '111s/$$/ TPCH/' target/dbgen/Makefile + sed -i '172i fprintf(ofp, "\\n-- Template file: %s\\n", qtag);' target/dbgen/qgen.c + +modify-tpc-h: + # Enter information for the SPARK replacement variables + sed -i '115a\ + #ifdef SPARK\ + #define GEN_QUERY_PLAN ""\ + #define START_TRAN ""\ + #define END_TRAN ""\ + #define SET_OUTPUT ""\ + #define SET_ROWCOUNT "LIMIT %d"\ + #define SET_DBASE ""\ + #endif' target/dbgen/tpcd.h + +build-dbgen: + # Build it + cd target/dbgen ; make clean; make 2>/dev/null \ No newline at end of file diff --git a/nds-h/tpch-gen/patches/template.patch b/nds-h/tpch-gen/patches/template.patch new file mode 100644 index 0000000..deaed2e --- /dev/null +++ b/nds-h/tpch-gen/patches/template.patch @@ -0,0 +1,284 @@ +diff --git a/1.sql b/1.sql +index 407417e..12815a0 100644 +--- a/1.sql ++++ b/1.sql +@@ -18,11 +18,11 @@ select + from + lineitem + where +- l_shipdate <= date '1998-12-01' - interval ':1' day (3) ++ l_shipdate <= date '1998-12-01' - interval ':1' day + group by + l_returnflag, + l_linestatus + order by + l_returnflag, +- l_linestatus; +-:n -1 ++ l_linestatus ++; +diff --git a/10.sql b/10.sql +index 2c8810c..55d13eb 100644 +--- a/10.sql ++++ b/10.sql +@@ -34,5 +34,6 @@ group by + c_address, + c_comment + order by +- revenue desc; ++ revenue desc + :n 20 ++; +diff --git a/11.sql b/11.sql +index 885185c..c0c6372 100644 +--- a/11.sql ++++ b/11.sql +@@ -30,5 +30,5 @@ group by + and n_name = ':1' + ) + order by +- value desc; +-:n -1 ++ value desc ++; +diff --git a/12.sql b/12.sql +index 0eb4aec..7b41659 100644 +--- a/12.sql ++++ b/12.sql +@@ -31,5 +31,5 @@ where + group by + l_shipmode + order by +- l_shipmode; +-:n -1 ++ l_shipmode ++; +diff --git a/13.sql b/13.sql +index 90d0750..2d85977 100644 +--- a/13.sql ++++ b/13.sql +@@ -23,5 +23,5 @@ group by + c_count + order by + custdist desc, +- c_count desc; +-:n -1 ++ c_count desc ++; +diff --git a/14.sql b/14.sql +index b5e45e3..eb4815a 100644 +--- a/14.sql ++++ b/14.sql +@@ -16,5 +16,5 
@@ from + where + l_partkey = p_partkey + and l_shipdate >= date ':1' +- and l_shipdate < date ':1' + interval '1' month; +-:n -1 ++ and l_shipdate < date ':1' + interval '1' month ++; +diff --git a/15.sql b/15.sql +index 8e7e974..a966331 100644 +--- a/15.sql ++++ b/15.sql +@@ -3,7 +3,7 @@ + -- Functional Query Definition + -- Approved February 1998 + :x +-create view revenue:s (supplier_no, total_revenue) as ++create temp view revenue:s (supplier_no, total_revenue) as + select + l_suppkey, + sum(l_extendedprice * (1 - l_discount)) +@@ -36,5 +36,5 @@ where + order by + s_suppkey; + +-drop view revenue:s; +-:n -1 ++drop view revenue:s ++; +diff --git a/16.sql b/16.sql +index 0dabfb5..bc347b0 100644 +--- a/16.sql ++++ b/16.sql +@@ -33,5 +33,5 @@ order by + supplier_cnt desc, + p_brand, + p_type, +- p_size; +-:n -1 ++ p_size ++; +diff --git a/17.sql b/17.sql +index 3968f54..c4f9373 100644 +--- a/17.sql ++++ b/17.sql +@@ -20,5 +20,5 @@ where + lineitem + where + l_partkey = p_partkey +- ); +-:n -1 ++ ) ++; +diff --git a/18.sql b/18.sql +index cce174f..6b38325 100644 +--- a/18.sql ++++ b/18.sql +@@ -35,5 +35,6 @@ group by + o_totalprice + order by + o_totalprice desc, +- o_orderdate; ++ o_orderdate + :n 100 ++; +diff --git a/19.sql b/19.sql +index 8b7b915..5c32dcf 100644 +--- a/19.sql ++++ b/19.sql +@@ -38,5 +38,5 @@ where + and p_size between 1 and 15 + and l_shipmode in ('AIR', 'AIR REG') + and l_shipinstruct = 'DELIVER IN PERSON' +- ); +-:n -1 ++ ) ++; +diff --git a/2.sql b/2.sql +index 2308318..572a927 100644 +--- a/2.sql ++++ b/2.sql +@@ -46,5 +46,6 @@ order by + s_acctbal desc, + n_name, + s_name, +- p_partkey; ++ p_partkey + :n 100 ++; +\ No newline at end of file +diff --git a/20.sql b/20.sql +index 1d358bf..1323de7 100644 +--- a/20.sql ++++ b/20.sql +@@ -40,5 +40,5 @@ where + and s_nationkey = n_nationkey + and n_name = ':3' + order by +- s_name; +-:n -1 ++ s_name ++; +diff --git a/21.sql b/21.sql +index 38187dc..671435d 100644 +--- a/21.sql ++++ b/21.sql +@@ -42,5 +42,6 @@ group by + s_name + order by + numwait desc, +- s_name; ++ s_name + :n 100 ++; +diff --git a/22.sql b/22.sql +index 9a2aeb7..88a377e 100644 +--- a/22.sql ++++ b/22.sql +@@ -40,5 +40,5 @@ from + group by + cntrycode + order by +- cntrycode; +-:n -1 ++ cntrycode ++; +diff --git a/3.sql b/3.sql +index f054d31..46f33c6 100644 +--- a/3.sql ++++ b/3.sql +@@ -25,5 +25,6 @@ group by + o_shippriority + order by + revenue desc, +- o_orderdate; ++ o_orderdate + :n 10 ++; +diff --git a/4.sql b/4.sql +index f068f36..ed3ebca 100644 +--- a/4.sql ++++ b/4.sql +@@ -24,5 +24,5 @@ where + group by + o_orderpriority + order by +- o_orderpriority; +-:n -1 ++ o_orderpriority ++; +diff --git a/5.sql b/5.sql +index 998913d..ec16737 100644 +--- a/5.sql ++++ b/5.sql +@@ -27,5 +27,5 @@ where + group by + n_name + order by +- revenue desc; +-:n -1 ++ revenue desc ++; +diff --git a/6.sql b/6.sql +index 59a6883..3bf726d 100644 +--- a/6.sql ++++ b/6.sql +@@ -12,5 +12,5 @@ where + l_shipdate >= date ':1' + and l_shipdate < date ':1' + interval '1' year + and l_discount between :2 - 0.01 and :2 + 0.01 +- and l_quantity < :3; +-:n -1 ++ and l_quantity < :3 ++; +diff --git a/7.sql b/7.sql +index 26eafad..81ba730 100644 +--- a/7.sql ++++ b/7.sql +@@ -42,5 +42,5 @@ group by + order by + supp_nation, + cust_nation, +- l_year; +-:n -1 ++ l_year ++; +diff --git a/8.sql b/8.sql +index 977d24e..9b67466 100644 +--- a/8.sql ++++ b/8.sql +@@ -40,5 +40,5 @@ from + group by + o_year + order by +- o_year; +-:n -1 ++ o_year ++; +diff --git a/9.sql b/9.sql +index 
b262db4..1e7aa9e 100644 +--- a/9.sql ++++ b/9.sql +@@ -35,5 +35,5 @@ group by + o_year + order by + nation, +- o_year desc; +-:n -1 ++ o_year desc ++; +-- +2.34.1 + diff --git a/nds/README.md b/nds/README.md index c24e3cf..94db74b 100644 --- a/nds/README.md +++ b/nds/README.md @@ -25,8 +25,13 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 an sudo locale-gen en_US.UTF-8 sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven ``` +3. Install and set up SPARK. + - Download latest distro from [here](https://spark.apache.org/downloads.html) + - Preferably >= 3.4 + - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) -3. TPC-DS Tools + +4. TPC-DS Tools User must download TPC-DS Tools from [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. After unzipping it, a folder called `DSGen-software-code-3.2.0rc1` will be seen. diff --git a/shared/base.template b/shared/base.template new file mode 100644 index 0000000..b758925 --- /dev/null +++ b/shared/base.template @@ -0,0 +1,37 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# This is the base template file for the common information about test environment +# including the information about Spark, cluster configuration and the Jar files, +# which are required in the other templates. +# We'll source this base file in all the other templates so that we just need to update +# here once instead of updating in all the templates. +# If you have any different configuration in a specific template, you can override +# the variables in that template. + +export SPARK_HOME=${SPARK_HOME:-/usr/lib/spark} +export SPARK_MASTER=${SPARK_MASTER:-local[*]} +export DRIVER_MEMORY=${DRIVER_MEMORY:-10G} +export EXECUTOR_CORES=${EXECUTOR_CORES:-12} +export NUM_EXECUTORS=${NUM_EXECUTORS:-8} +export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G} + +# The NDS listener jar which is built in jvm_listener directory. +export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-../utils/jvm_listener/target/benchmark-listener-1.0-SNAPSHOT.jar} +# The spark-rapids jar which is required when running on GPU +export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-24.04.0-cuda11.jar} +export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` diff --git a/shared/convert_submit_cpu.template b/shared/convert_submit_cpu.template new file mode 100644 index 0000000..3d06e9a --- /dev/null +++ b/shared/convert_submit_cpu.template @@ -0,0 +1,28 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source base.template +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.sql.legacy.charVarcharAsString=true") diff --git a/shared/convert_submit_gpu.template b/shared/convert_submit_gpu.template new file mode 100644 index 0000000..bb56065 --- /dev/null +++ b/shared/convert_submit_gpu.template @@ -0,0 +1,42 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source base.template +export CONCURRENT_GPU_TASKS=${CONCURRENT_GPU_TASKS:-2} +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.executor.resource.gpu.amount=1" + "--conf" "spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh" + "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin" + "--conf" "spark.rapids.memory.pinnedPool.size=8g" + "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}" + "--conf" "spark.rapids.sql.explain=NOT_ON_GPU" + "--conf" "spark.rapids.sql.incompatibleOps.enabled=true" + "--conf" "spark.rapids.sql.variableFloatAgg.enabled=true" + "--conf" "spark.sql.files.maxPartitionBytes=2g" + "--conf" "spark.sql.legacy.parquet.datetimeRebaseModeInWrite=CORRECTED" + "--conf" "spark.task.resource.gpu.amount=0.05" + "--conf" "spark.sql.legacy.charVarcharAsString=true" + "--files" "/opt/spark/getGpusResources.sh" + "--jars" "$SPARK_RAPIDS_PLUGIN_JAR") diff --git a/shared/power_run_cpu.template b/shared/power_run_cpu.template new file mode 100644 index 0000000..c513f9f --- /dev/null +++ b/shared/power_run_cpu.template @@ -0,0 +1,32 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source base.template +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.scheduler.minRegisteredResourcesRatio=1.0" + "--conf" "spark.sql.adaptive.enabled=true" + "--conf" "spark.sql.broadcastTimeout=1200" + "--conf" "spark.dynamicAllocation.enabled=false" + "--jars" "$NDS_LISTENER_JAR") diff --git a/shared/power_run_gpu.template b/shared/power_run_gpu.template new file mode 100644 index 0000000..0fa405d --- /dev/null +++ b/shared/power_run_gpu.template @@ -0,0 +1,40 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +source base.template +export CONCURRENT_GPU_TASKS=${CONCURRENT_GPU_TASKS:-2} +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} + +export SPARK_CONF=("--master" "${SPARK_MASTER}" + "--deploy-mode" "client" + "--conf" "spark.driver.maxResultSize=2GB" + "--conf" "spark.driver.memory=${DRIVER_MEMORY}" + "--conf" "spark.executor.cores=${EXECUTOR_CORES}" + "--conf" "spark.executor.instances=${NUM_EXECUTORS}" + "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" + "--conf" "spark.sql.files.maxPartitionBytes=2gb" + "--conf" "spark.sql.adaptive.enabled=true" + "--conf" "spark.executor.resource.gpu.amount=1" + "--conf" "spark.executor.resource.gpu.discoveryScript=./getGpusResources.sh" + "--conf" "spark.task.resource.gpu.amount=0.0625" + "--conf" "spark.plugins=com.nvidia.spark.SQLPlugin" + "--conf" "spark.rapids.memory.host.spillStorageSize=32G" + "--conf" "spark.rapids.memory.pinnedPool.size=8g" + "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}" + "--files" "$SPARK_HOME/examples/src/main/scripts/getGpusResources.sh" + "--jars" "$SPARK_RAPIDS_PLUGIN_JAR,$NDS_LISTENER_JAR") diff --git a/shared/spark-submit-template b/shared/spark-submit-template new file mode 100755 index 0000000..21120ea --- /dev/null +++ b/shared/spark-submit-template @@ -0,0 +1,33 @@ +#!/bin/bash +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +set -ex +# e.g. +# ./spark-submit-template power_run_gpu.template nds_power.py \ +# local_data_parquet/ +# ./nds_query_streams/query_0.sql \ +# time.csv + +# the first argument must be the template file +source "$1" +# build spark-submit command +MORE_ARGS=("${@:2}") +CMD=("$SPARK_HOME/bin/spark-submit") +CMD+=("${SPARK_CONF[@]}") +CMD+=("${MORE_ARGS[@]}") +# submit +"${CMD[@]}" diff --git a/utils/check.py b/utils/check.py new file mode 100644 index 0000000..db01e68 --- /dev/null +++ b/utils/check.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results +# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results +# obtained from using this file do not comply with the TPC-DS Benchmark. +# + +import argparse +import os +import sys +from pathlib import Path + + +def check_version(): + req_ver = (3, 6) + cur_ver = sys.version_info + if cur_ver < req_ver: + raise Exception('Minimum required Python version is 3.6, but current python version is {}.' + .format(str(cur_ver.major) + '.' + str(cur_ver.minor)) + + ' Please use proper Python version') + + +def check_build_nds_h(): + """check jar and tpcds executable + + Raises: + Exception: the build is not done or broken + + Returns: + PosixPath, PosixPath: path of jar and dsdgen executable + """ + # Check if necessary executable or jars are built. + # we assume user won't move this script. 
+ src_dir = Path(__file__).parent.parent.absolute() + tool_path = list(Path(src_dir / 'nds-h/tpch-gen/target/dbgen').rglob("dbgen")) + print(tool_path) + if tool_path == []: + raise Exception('dbgen executable is ' + + 'not found in `target` folder. ' + + 'Please refer to README document and build this project first.') + return tool_path[0] + +def check_build_nds(): + """check jar and dsdgen executable + + Raises: + Exception: the build is not done or broken + + Returns: + PosixPath, PosixPath: path of jar and dsdgen executable + """ + # Check if necessary executable or jars are built. + # we assume user won't move this script. + src_dir = Path(__file__).parent.parent.absolute() + jar_path = list( + Path(src_dir / 'nds/tpcds-gen/target').rglob("tpcds-gen-*.jar")) + tool_path = list(Path(src_dir / 'nds/tpcds-gen/target/tools').rglob("dsdgen")) + if jar_path == [] or tool_path == []: + raise Exception('Target jar file is not found in `target` folder or dsdgen executable is ' + + 'not found in `target/tools` folder. ' + + 'Please refer to README document and build this project first.') + return jar_path[0], tool_path[0] + + + +def get_abs_path(input_path): + """receive a user input path and return the absolute path of it. + + Args: + input_path (str): user's input path + + Returns: + str: if the input is absolute, return it; if it's a relative path, return the absolute path of + it. + """ + if Path(input_path).is_absolute(): + # it's an absolute path + output_path = input_path + else: + # it's a relative path to where this script is executed + output_path = os.getcwd() + '/' + input_path + return output_path + + +def valid_range(range, parallel): + """check that the range input is valid + + Args: + range (str): a range specified for a range data generation, e.g. "1,10" + parallel (str): string type number for parallelism in data generation, e.g. "20" + + Raises: + Exception: error message for invalid range input. + """ + if len(range.split(',')) != 2: + msg = 'Invalid range: please specify a range with a comma between start and end. e.g., "1,10".' + raise Exception(msg) + range_start = int(range.split(',')[0]) + range_end = int(range.split(',')[1]) + if range_start < 1 or range_start > range_end or range_end > int(parallel): + msg = 'Please provide correct child range: 1 <= range_start <= range_end <= parallel' + raise Exception(msg) + return range_start, range_end + + +def parallel_value_type(p): + """helper function to check the parallel value + + Args: + p (str): parallel value + + Raises: + argparse.ArgumentTypeError: ArgumentTypeError exception + + Returns: + str: parallel in string + """ + if int(p) < 2: + raise argparse.ArgumentTypeError("PARALLEL must be >= 2") + return p + + +def get_dir_size(start_path): + total_size = 0 + for dirpath, dirnames, filenames in os.walk(start_path): + for f in filenames: + fp = os.path.join(dirpath, f) + # skip if it is a symbolic link + if not os.path.islink(fp): + total_size += os.path.getsize(fp) + return total_size + +def check_json_summary_folder(json_summary_folder): + if json_summary_folder: + # prepare a folder to save json summaries of query results + if not os.path.exists(json_summary_folder): + os.makedirs(json_summary_folder) + else: + if os.listdir(json_summary_folder): + raise Exception(f"json_summary_folder {json_summary_folder} is not empty. " + + "There may already be some json files there. 
Please clean the folder " + + "or specify another one.") + +def check_query_subset_exists(query_dict, subset_list): + """check if the query subset exists in the query dictionary""" + for q in subset_list: + if q not in query_dict.keys(): + raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.") + return True diff --git a/utils/jvm_listener/pom.xml b/utils/jvm_listener/pom.xml new file mode 100644 index 0000000..41843fd --- /dev/null +++ b/utils/jvm_listener/pom.xml @@ -0,0 +1,74 @@ + + + + 4.0.0 + + com.nvidia + benchmark-listener + jar + 1.0-SNAPSHOT + + + 8 + 8 + + + + + org.apache.spark + spark-core_2.12 + 3.1.2 + + + + org.apache.spark + spark-sql_2.12 + 3.1.2 + provided + + + + + src/main/scala/ + + + org.apache.maven.plugins + maven-compiler-plugin + 3.8.1 + + 1.8 + 1.8 + + + + org.scala-tools + maven-scala-plugin + 2.15.2 + + + + compile + testCompile + + + + + + + + diff --git a/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala new file mode 100644 index 0000000..113f2db --- /dev/null +++ b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Listener.scala @@ -0,0 +1,24 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.listener + +trait Listener { + /* Listener interface to be implemented at Python side + */ + def notify(x: Any): Any +} diff --git a/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala new file mode 100644 index 0000000..13a13e6 --- /dev/null +++ b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/Manager.scala @@ -0,0 +1,66 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.listener + +import org.apache.spark.SparkContext + +object Manager { + /* Manager class to manage all extra customized listeners. 
+ */ + private var listeners: Map[String, Listener] = Map() + private val spark_listener = new TaskFailureListener() + private var isRegistered = false + + def register(listener: Listener): String = { + /* Note this register method has nothing to do with SparkContext.addSparkListener method. + * This method is only to provide an interface to developers to have a better control over + * all customized listeners. + */ + this.synchronized { + // We register to the spark listener when the first listener is registered. + registerSparkListener() + val uuid = java.util.UUID.randomUUID().toString + listeners = listeners + (uuid -> listener) + uuid + } + } + + def unregister(uuid: String) = { + this.synchronized { + listeners = listeners - uuid + } + } + + def notifyAll(message: String): Unit = { + for { (_, listener) <- listeners } listener.notify(message) + } + + def registerSparkListener() : Unit = { + if (!isRegistered) { + SparkContext.getOrCreate().addSparkListener(spark_listener) + isRegistered = true + } + } + + def unregisterSparkListener() : Unit = { + if (isRegistered) { + SparkContext.getOrCreate().removeSparkListener(spark_listener) + isRegistered = false + } + } +} diff --git a/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala new file mode 100644 index 0000000..791be72 --- /dev/null +++ b/utils/jvm_listener/src/main/scala/com/nvidia/spark/rapids/listener/TaskFailureListener.scala @@ -0,0 +1,37 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.listener + +import org.apache.spark.{Success, TaskEndReason} +import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd} +import scala.collection.mutable.ListBuffer + + +/* A simple listener which captures SparkListenerTaskEnd, + * extracts "reason" of the task. If the reason is not "Success", + * send this reason to python side. + */ +class TaskFailureListener extends SparkListener { + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + taskEnd.reason match { + case Success => + case reason => Manager.notifyAll(reason.toString) + } + super.onTaskEnd(taskEnd) + } +} diff --git a/utils/properties/aqe-on.properties b/utils/properties/aqe-on.properties new file mode 100644 index 0000000..42c8dd1 --- /dev/null +++ b/utils/properties/aqe-on.properties @@ -0,0 +1,17 @@ +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +spark.sql.adaptive.enabled=true \ No newline at end of file diff --git a/utils/python_benchmark_reporter/PysparkBenchReport.py b/utils/python_benchmark_reporter/PysparkBenchReport.py new file mode 100644 index 0000000..c12a468 --- /dev/null +++ b/utils/python_benchmark_reporter/PysparkBenchReport.py @@ -0,0 +1,126 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.2.0 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. +# + +import json +import os +import time +import traceback +from typing import Callable +from pyspark.sql import SparkSession +from python_benchmark_reporter.PythonListener import PythonListener + +class PysparkBenchReport: + """Class to generate json summary report for a benchmark + """ + def __init__(self, spark_session: SparkSession, query_name) -> None: + self.spark_session = spark_session + self.summary = { + 'env': { + 'envVars': {}, + 'sparkConf': {}, + 'sparkVersion': None + }, + 'queryStatus': [], + 'exceptions': [], + 'startTime': None, + 'queryTimes': [], + 'query': query_name, + } + + def report_on(self, fn: Callable, *args): + """Record a function for its running environment, running status etc. and exclude sentive + information like tokens, secret and password Generate summary in dict format for it. 
+ + Args: + fn (Callable): a function to be recorded + + Returns: + dict: summary of the fn + """ + spark_conf = dict(self.spark_session.sparkContext._conf.getAll()) + env_vars = dict(os.environ) + redacted = ["TOKEN", "SECRET", "PASSWORD"] + filtered_env_vars = dict((k, env_vars[k]) for k in env_vars.keys() if not (k in redacted)) + self.summary['env']['envVars'] = filtered_env_vars + self.summary['env']['sparkConf'] = spark_conf + self.summary['env']['sparkVersion'] = self.spark_session.version + listener = None + try: + listener = PythonListener() + listener.register() + except TypeError as e: + print("Not found com.nvidia.spark.rapids.listener.Manager", str(e)) + listener = None + if listener is not None: + print("TaskFailureListener is registered.") + try: + start_time = int(time.time() * 1000) + fn(*args) + end_time = int(time.time() * 1000) + if listener and len(listener.failures) != 0: + self.summary['queryStatus'].append("CompletedWithTaskFailures") + else: + self.summary['queryStatus'].append("Completed") + except Exception as e: + # print the exception to ease debugging + print('ERROR BEGIN') + print(e) + traceback.print_tb(e.__traceback__) + print('ERROR END') + end_time = int(time.time() * 1000) + self.summary['queryStatus'].append("Failed") + self.summary['exceptions'].append(str(e)) + finally: + self.summary['startTime'] = start_time + self.summary['queryTimes'].append(end_time - start_time) + if listener is not None: + listener.unregister() + return self.summary + + def write_summary(self, prefix=""): + """_summary_ + + Args: + query_name (str): name of the query + prefix (str, optional): prefix for the output json summary file. Defaults to "". + """ + # Power BI side is retrieving some information from the summary file name, so keep this file + # name format for pipeline compatibility + filename = prefix + '-' + self.summary['query'] + '-' +str(self.summary['startTime']) + '.json' + self.summary['filename'] = filename + with open(filename, "w") as f: + json.dump(self.summary, f, indent=2) + + def is_success(self): + """Check if the query succeeded, queryStatus == Completed + """ + return self.summary['queryStatus'][0] == 'Completed' diff --git a/utils/python_benchmark_reporter/PythonListener.py b/utils/python_benchmark_reporter/PythonListener.py new file mode 100644 index 0000000..16210fb --- /dev/null +++ b/utils/python_benchmark_reporter/PythonListener.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +from pyspark import SparkContext +from pyspark.java_gateway import ensure_callback_server_started + +class PythonListener(object): + package = "com.nvidia.spark.rapids.listener" + + @staticmethod + def get_manager(): + jvm = SparkContext.getOrCreate()._jvm + manager = getattr(jvm, "{}.{}".format(PythonListener.package, "Manager")) + return manager + + def __init__(self): + self.uuid = None + self.failures = [] + + def notify(self, obj): + """This method is required by Scala Listener interface + we defined above. + """ + self.failures.append(obj) + + def register(self): + ensure_callback_server_started(gw = SparkContext.getOrCreate()._gateway) + manager = PythonListener.get_manager() + self.uuid = manager.register(self) + return self.uuid + + def unregister(self): + manager = PythonListener.get_manager() + manager.unregister(self.uuid) + self.uuid = None + + # should call after register + def register_spark_listener(self): + manager = PythonListener.get_manager() + manager.registerSparkListener() + + def unregister_spark_listener(self): + manager = PythonListener.get_manager() + manager.unregisterSparkListener() + + class Java: + implements = ["com.nvidia.spark.rapids.listener.Listener"] diff --git a/utils/python_benchmark_reporter/__init__.py b/utils/python_benchmark_reporter/__init__.py new file mode 100644 index 0000000..a195f9e --- /dev/null +++ b/utils/python_benchmark_reporter/__init__.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ----- +# +# Certain portions of the contents of this file are derived from TPC-H version 3.2.0 +# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). +# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”) +# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also +# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”). +# +# You may not use this file except in compliance with the TPC EULA. +# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results +# obtained using this file are not comparable to published TPC-H Benchmark results, as the results +# obtained from using this file do not comply with the TPC-H Benchmark. \ No newline at end of file
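
For reviewers who want to exercise the new reporter classes by hand, here is a minimal sketch of how `PysparkBenchReport` is meant to be driven. It assumes `utils/` is on `PYTHONPATH`; the query name, query file, and `run_query` helper are illustrative and not part of this patch, and only `report_on`, `write_summary`, and `is_success` come from the code added above.

```python
# Minimal sketch: wrap one query execution in PysparkBenchReport (hypothetical paths/names).
from pyspark.sql import SparkSession
from python_benchmark_reporter.PysparkBenchReport import PysparkBenchReport

spark = SparkSession.builder.appName("bench-report-sketch").getOrCreate()

def run_query(session, query_text):
    # collect() only forces execution; a real power run would write or count the results
    session.sql(query_text).collect()

query_text = open("query_1.sql").read()      # hypothetical query file
report = PysparkBenchReport(spark, "query_1")
summary = report.report_on(run_query, spark, query_text)
report.write_summary(prefix="sketch")        # writes sketch-query_1-<startTime>.json
print("completed:", report.is_success(), "elapsed ms:", summary["queryTimes"][0])
```

If the benchmark-listener jar is not on the classpath, `report_on` logs that the `Manager` class was not found and still records timing and query status.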
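
The helpers in `utils/check.py` are shared by the NDS and NDS-H front-end scripts. The sketch below shows the intended argparse wiring for the build check and the range/parallel validation; the script shape and argument names are illustrative, and it assumes it runs with `utils/` on the import path.

```python
# Illustrative wiring of the shared check.py helpers (argument names are hypothetical).
import argparse
from check import check_build_nds_h, get_abs_path, parallel_value_type, valid_range

parser = argparse.ArgumentParser()
parser.add_argument("scale", help="volume of data to generate in GB")
parser.add_argument("parallel", type=parallel_value_type, help="build data in separate chunks")
parser.add_argument("data_dir", help="generate data in directory")
parser.add_argument("--range", help='child range within 1..parallel, e.g. "1,10"')
args = parser.parse_args()

dbgen = check_build_nds_h()             # raises if tpch-gen has not been built yet
data_dir = get_abs_path(args.data_dir)  # relative paths resolve against the current directory
if args.range:
    start, end = valid_range(args.range, args.parallel)
```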
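
For completeness, `PythonListener` can also be used directly, outside `PysparkBenchReport`, to watch for task failures during an ad-hoc run. This assumes the benchmark-listener jar is passed to Spark via `--jars` (as the power-run templates do); without it, `register()` typically fails with a `TypeError` because `com.nvidia.spark.rapids.listener.Manager` is not on the JVM classpath, which is exactly the case `report_on` tolerates.

```python
# Stand-alone sketch of PythonListener; assumes the benchmark-listener jar is on the driver classpath.
from pyspark.sql import SparkSession
from python_benchmark_reporter.PythonListener import PythonListener

spark = SparkSession.builder.getOrCreate()
listener = PythonListener()
listener.register()      # starts the Py4J callback server and registers with Manager

spark.range(0, 1000).selectExpr("sum(id)").collect()   # any workload to observe

if listener.failures:
    print("task failures observed:", listener.failures)
listener.unregister()
```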