Tpc-H feature branch #187

Merged
merged 11 commits into from
Jun 5, 2024
3 changes: 2 additions & 1 deletion README.md
@@ -4,7 +4,8 @@ A repo for Spark related benchmark sets and utilities using the
[RAPIDS Accelerator For Apache Spark](https://github.com/NVIDIA/spark-rapids).

## Benchmark sets:
-- [Nvidia Decision Support(NDS)](./nds/)
+- [Nvidia Decision Support ( NDS )](./nds/)
+- [Nvidia Decision Support-H ( NDS-H )](./nds-h/)

Please see README in each benchmark set for more details including building instructions and usage
descriptions.
238 changes: 238 additions & 0 deletions nds-h/README.md
@@ -0,0 +1,238 @@
# NDS-H v2.0 Automation

## Prerequisites

1. Python >= 3.6
2. Necessary libraries

```bash
sudo locale-gen en_US.UTF-8
sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven
sudo apt install dos2unix
```
3. Install and set up Spark.
- Download the latest distribution from [here](https://spark.apache.org/downloads.html)
- Preferably >= 3.4
- Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 )
- (For local) Follow the steps mentioned [here](https://docs.nvidia.com/spark-rapids/user-guide/latest/getting-started/on-premise.html#local-mode)
for local setup
- (For local) Update the `--files` option in the `*_gpu_*` templates with the getGpuResources.sh location as mentioned in the link above
- (For local) Set the Spark master in shared/base.template to `local[*]`
- (For local) Remove the conf "spark.task.resource.gpu.amount=0.05" from all template files

4. Update Configuration
- Update [shared/base.template](../shared/base.template) line 26 with the Spark home location.

5. For GPU run
- Download the latest RAPIDS jar from [here](https://oss.sonatype.org/content/repositories/staging/com/nvidia/rapids-4-spark_2.12/)

- Update [shared/base.template](../shared/base.template) line 36 with rapids plugin jar location

6. TPC-H Tools

- Download TPC-H Tools from the [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tools are downloaded as a zip package whose name has a random GUID string prefix.

- After unzipping it, a folder called `TPC-H V3.0.1` will be seen.

- Set environment variable `TPCH_HOME` pointing to this directory. e.g.

```bash
export TPCH_HOME='/PATH/TO/YOUR/TPC-H V3.0.1'
```
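
Because the unzipped folder name contains a space, it is easy to end up with a `TPCH_HOME` that is unset or truncated. The following is a minimal sketch of a sanity check, not part of this repo; `resolve_tpch_home` is a hypothetical helper name, and it only verifies that the variable is set and points to a directory.

```python
import os
from pathlib import Path


def resolve_tpch_home(env=None):
    """Return the directory named by TPCH_HOME, or raise if it is unusable.

    Hypothetical helper for illustration only; the repo's own checks may differ.
    """
    env = os.environ if env is None else env
    value = env.get("TPCH_HOME")
    if not value:
        raise EnvironmentError("TPCH_HOME is not set")
    home = Path(value).expanduser()
    if not home.is_dir():
        raise FileNotFoundError(f"TPCH_HOME does not point to a directory: {home}")
    return home
```

Calling `resolve_tpch_home()` with no arguments reads the current process environment.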

## Use spark-submit-template with template

To help users run NDS-H, we provide templates that define the main Spark configs for the spark-submit command.
Users can choose different templates to run NDS-H with different configurations for different environments.

The [spark-submit-template](../shared/spark-submit-template) utility accepts a template file and
submits the Spark job with the configs defined in that template.

Example command to submit via `spark-submit-template` utility:

```bash
cd shared
./spark-submit-template convert_submit_cpu.template \
../nds-h/nds_h_transcode.py <raw_data_file_location> <parquet_location> report.txt
```

We provide 2 types of template files used in different steps of NDS-H:

1. `convert_submit_*.template` for converting the data by using nds_h_transcode.py
2. `power_run_*.template` for power run by using nds_h_power.py

We predefine different template files for different environments.
For example, we provide the template files below to run nds_h_transcode.py in different environments:

* `convert_submit_cpu.template` is for Spark CPU cluster
* `convert_submit_gpu.template` is for Spark GPU cluster

You need to choose one as your template file and modify it to fit your environment.
We define a [base.template](../shared/base.template) to help you define some basic variables for your environment.
All the other templates source `base.template` to get these basic variables.
When running multiple steps of NDS-H, you only need to modify `base.template` to fit your cluster.

## Data Generation

### Build the jar for data generation

```bash
cd tpch-gen
make
```

### Generate data

To generate data on the local file system:

```bash
$ python nds_h_gen_data.py -h
usage: nds_h_gen_data.py [-h] [--range RANGE] [--overwrite_output] <scale> <parallel> <data_dir>

positional arguments:
  scale               volume of data to generate in GB.
  parallel            build data in <parallel_value> separate chunks
  data_dir            generate data in directory.

optional arguments:
  -h, --help          show this help message and exit
  --range RANGE       used for incremental data generation: which chunks to generate in one run. Format: "start,end", both inclusive, e.g. "1,100".
  --overwrite_output  overwrite existing data in the path provided

```

Example command:

```bash
python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output
```
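
Internally, `nds_h_gen_data.py` starts one `dbgen` process per chunk, passing the scale factor with `-s`, the total number of chunks with `-C`, and the chunk index with `-S`. The sketch below shows how those argument lists could be formed. `dbgen_commands` is a hypothetical helper written for illustration, and its range check is an assumption about what a valid chunk range looks like.

```python
def dbgen_commands(scale, parallel, range_start=1, range_end=None):
    """Build one dbgen argument list per chunk (illustrative helper, not in the repo)."""
    range_end = parallel if range_end is None else range_end
    # Assumed rule: the requested chunks must lie within 1..parallel.
    if not (1 <= range_start <= range_end <= parallel):
        raise ValueError("chunk range must lie within 1..parallel")
    return [
        ["./dbgen", "-s", str(scale), "-C", str(parallel), "-S", str(step),
         "-v", "Y", "-f", "Y"]
        for step in range(range_start, range_end + 1)
    ]


# Four chunks at scale factor 100, generating only chunks 2 and 3 in this run.
for cmd in dbgen_commands(100, 4, range_start=2, range_end=3):
    print(" ".join(cmd))
```

Generating a sub-range per run is what makes incremental generation possible: several runs with disjoint ranges and the same `parallel` value together cover all chunks.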

### Convert DSV to Parquet or Other data sources

To convert the data, `nds_h_transcode.py` needs to be submitted as a Spark job. Users can leverage
the [spark-submit-template](../shared/spark-submit-template) utility to simplify the submission.
The utility requires a pre-defined [template file](../shared/convert_submit_gpu.template) containing the necessary Spark configurations. Alternatively, users can submit `nds_h_transcode.py` directly to Spark with arbitrary Spark parameters.

DSV (pipe-delimited) is the default input format for data conversion; it can be overridden with `--input_format`.

```bash
cd shared
./spark-submit-template convert_submit_cpu.template ../nds-h/nds_h_transcode.py \
<input_data_location> <output_data_location> <report_file_location>
```
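
To make the DSV input format concrete, here is a minimal sketch of reading pipe-delimited rows with the Python standard library. It assumes each line ends with a trailing `|`, as `dbgen` output typically does, and drops the resulting empty last field. `parse_dsv` is a hypothetical name; the real conversion is done by Spark in `nds_h_transcode.py`.

```python
import csv
import io


def parse_dsv(text, delimiter="|"):
    """Split delimited text into rows of string fields (illustrative only)."""
    rows = []
    for fields in csv.reader(io.StringIO(text), delimiter=delimiter):
        # Assumption: a trailing delimiter leaves one empty field at the end.
        if fields and fields[-1] == "":
            fields = fields[:-1]
        rows.append(fields)
    return rows


sample = "0|AFRICA|first comment|\n1|AMERICA|second comment|\n"
for row in parse_dsv(sample):
    print(row)
```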

## Query Generation

The [template_new.patch](./tpch-gen/patches/template_new.patch) contains the modifications needed to make NDS-H queries runnable in Spark and is applied automatically in the build step. The final query templates will be in folder `$TPCH_HOME/dbgen/queries` after the build process.

### Generate Specific Query or Query Streams

```text
usage: nds_h_gen_query_stream.py [-h] (--template TEMPLATE | --streams STREAMS)
scale output_dir

positional arguments:
scale assume a database of this scale factor.
output_dir generate query in directory.
template | stream generate query stream or from a template argument

optional arguments:
-h, --help show this help message and exit
--template TEMPLATE build queries from this template. Only used to generate one query from one template.
This argument is mutually exclusive with --streams. It
is often used for test purposes.
--streams STREAMS generate how many query streams. This argument is mutually exclusive with --template.
```

Example command to generate one query from a single template, e.g. `1.sql` (there are 22 default queries and templates):

```bash
cd nds-h
python nds_h_gen_query_stream.py 3000 ./query_1 --template <query_number>
```

Example command to generate 10 query streams each one of which contains all NDS-H queries but in
different order:

```bash
cd nds-h
python nds_h_gen_query_stream.py 3000 ./query_streams --streams 10
```
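
The idea of a query stream, every query exactly once but in a stream-specific order, can be sketched as follows. This is an illustration only: it uses a seeded random shuffle, whereas `nds_h_gen_query_stream.py` relies on the TPC-H tooling, whose orderings may be defined differently. `make_streams` is a hypothetical name.

```python
import random


def make_streams(num_streams, num_queries=22, seed=0):
    """Return one ordering of query numbers 1..num_queries per stream."""
    rng = random.Random(seed)  # seeded so the orderings are reproducible
    streams = []
    for _ in range(num_streams):
        order = list(range(1, num_queries + 1))
        rng.shuffle(order)
        streams.append(order)
    return streams


for index, order in enumerate(make_streams(3)):
    print(f"stream {index}: {order}")
```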

## Benchmark Runner

### Build Dependencies

A customized Spark listener tracks the status of Spark tasks, e.g. success, failed,
or success with retry. The results are recorded in the JSON summary files when all jobs are
finished. This is often used for testing or query monitoring.

To build:

```bash
cd utils/jvm_listener
mvn package
```

`benchmark-listener-1.0-SNAPSHOT.jar` will be generated in the `jvm_listener/target` folder.

### Power Run

_After_ generating query streams, a Power Run can be executed with one of them by submitting `nds_h_power.py` to Spark.

Arguments supported by `nds_h_power.py`:

```text
usage: nds_h_power.py [-h] [--input_format {parquet,orc,avro,csv,json,iceberg,delta}]
[--output_prefix OUTPUT_PREFIX]
[--output_format OUTPUT_FORMAT]
[--property_file PROPERTY_FILE]
<input_data_location>
<query_stream_file>
<time_log_file>

positional arguments:
input_data_location input data location (e.g., "hdfs:///ds-generated-data").
query_stream_file query stream file that contains NDS-H queries in specific order
time_log_file path to execution time log, only support local path.

optional arguments:
-h, --help show this help message and exit
--input_format {parquet,orc,avro,csv,json,iceberg,delta}
type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. Certain types are not fully supported by GPU reading, please refer to https://github.com/NVIDIA/spark-rapids/blob/branch-24.08/docs/compatibility.md for more details.
--output_prefix OUTPUT_PREFIX
text to prepend to every output file (e.g., "hdfs:///ds-parquet")
--output_format OUTPUT_FORMAT
type of query output
--property_file PROPERTY_FILE
property file for Spark configuration.
```

Example command to submit nds_h_power.py by spark-submit-template utility:

```bash
cd shared
./spark-submit-template power_run_gpu.template \
../nds-h/nds_h_power.py \
<parquet_folder_location> \
<query_stream_folder>/query_0.sql \
time.csv \
--property_file ../utils/properties/aqe-on.properties
```

Users can also use `spark-submit` to submit `nds_h_power.py` directly.

To simplify performance analysis, the script creates a local CSV file recording each query (including TempView creation) and its execution time. Note: please use `client` mode (set in your `power_run_gpu.template` file) when running in a YARN distributed environment to make sure the time log is saved correctly to your local path.

Note the template file must follow the `spark-submit-template` utility as the _first_ argument.
All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by
double quotes in the template file. Please follow the format in [power_run_gpu.template](../shared/power_run_gpu.template).

Users can define a `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties are passed to the submitted Spark job along with the configurations defined in the template file. Common properties can go in the template file, and properties that usually vary between runs can go in the property file.
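
As a rough illustration of what a property file holds, the sketch below parses simple `key=value` lines into a dictionary. The file format (one `key=value` pair per line, `#` for comments) is an assumption, and `load_properties` is a hypothetical helper; how `nds_h_power.py` actually reads the file and applies the values is not shown here.

```python
def load_properties(text):
    """Parse 'key=value' lines, skipping blank lines and '#' comments (illustrative)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if not sep:
            raise ValueError(f"not a key=value line: {raw!r}")
        props[key.strip()] = value.strip()
    return props


# Example content is made up for illustration.
example = "# adaptive query execution\nspark.sql.adaptive.enabled=true\n"
print(load_properties(example))
```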

The command above uses the `collect()` action to trigger a Spark job for each query. Query output can also be saved for further verification, and the output format can be specified, e.g. csv, parquet or orc:

```bash
./spark-submit-template power_run_gpu.template \
nds_h_power.py \
parquet_sf3k \
./nds_query_streams/query_0.sql \
time.csv \
--output_prefix <query_output_location> \
--output_format parquet
```
148 changes: 148 additions & 0 deletions nds-h/nds_h_gen_data.py
@@ -0,0 +1,148 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-H version 3.0.1
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-H Benchmark and as such any results
# obtained using this file are not comparable to published TPC-H Benchmark results, as the results
# obtained from using this file do not comply with the TPC-H Benchmark.
#

import argparse
import os
import sys
import subprocess

# Add the shared utils directory to the import path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
utils_dir = os.path.join(parent_dir, 'utils')
sys.path.insert(0, utils_dir)

from check import check_build_nds_h, check_version, get_abs_path, get_dir_size, parallel_value_type, valid_range

check_version()

# Source tables contained in the schema for TPC-H. For more information, check -
# https://www.tpc.org/TPC_Documents_Current_Versions/pdf/TPC-H_v3.0.1.pdf

source_table_names = [
    'customer',
    'lineitem',
    'nation',
    'orders',
    'part',
    'partsupp',
    'region',
    'supplier'
]

def generate_data_local(args, range_start, range_end, tool_path):
    """Generate data to local file system. The TPC-H dbgen tool writes all table data into its
    working directory without creating sub-folders for each table, so we create a sub-folder
    for each table and move the data there.

    Args:
        args (Namespace): Namespace from argparser
        range_start (int): first chunk to generate (inclusive)
        range_end (int): last chunk to generate (inclusive)
        tool_path (Path): path to the dbgen tool

    Raises:
        Exception: if data already exists and overwrite_output is not honored
        Exception: dbgen failed
    """
    data_dir = get_abs_path(args.data_dir)
    if not os.path.isdir(data_dir):
        os.makedirs(data_dir)
    else:
        # Verify if there's already data in this path
        if get_dir_size(data_dir) > 0 and not args.overwrite_output:
            raise Exception(
                "Data already exists in directory {}.".format(data_dir) +
                " Use '--overwrite_output' to overwrite.")

    # working directory for dbgen
    work_dir = tool_path.parent
    print(work_dir)
    procs = []
    # launch one dbgen process per chunk
    for i in range(range_start, range_end + 1):
        dbgen = ["-s", args.scale,
                 "-C", args.parallel,
                 "-S", str(i),
                 "-v", "Y",
                 "-f", "Y"]
        procs.append(subprocess.Popen(
            ["./dbgen"] + dbgen, cwd=str(work_dir)))
    # wait for data generation to complete
    for p in procs:
        p.wait()
        if p.returncode != 0:
            print("dbgen failed with return code {}".format(p.returncode))
            raise Exception("dbgen failed")
    # move multi-partition files into table folders
    table_names = source_table_names
    for table in table_names:
        print('mkdir -p {}/{}'.format(data_dir, table))
        subprocess.run(['mkdir', '-p', data_dir + '/' + table])
        if (table != 'region' and table != 'nation'):
            for i in range(range_start, range_end + 1):
                subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}',
                                f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
        else:
            # region and nation files have no chunk number suffix, move them separately
            subprocess.run(['mv', f'{work_dir}/{table}.tbl',
                            f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
    # show summary
    subprocess.run(['du', '-h', '-d1', data_dir])

def generate_data(args):
    tool_path = check_build_nds_h()
    range_start = 1
    range_end = int(args.parallel)
    if args.range:
        range_start, range_end = valid_range(args.range, args.parallel)
    generate_data_local(args, range_start, range_end, tool_path)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("scale",
                        help="volume of data to generate in GB. "
                             "Accepted SF - 1, 10, 100, 300, 1000, 3000, 10000, 30000."
                        )
    parser.add_argument("parallel",
                        type=parallel_value_type,
                        help="build data in <parallel_value> separate chunks"
                        )
    parser.add_argument("data_dir",
                        help="generate data in directory.")
    parser.add_argument('--range',
                        help='Used for incremental data generation, meaning which part of child ' +
                        'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
                        'e.g. "1,100". Note: the child range must be within "parallel", ' +
                        'e.g. parallel 100 with "--range 100,200" is illegal.')
    parser.add_argument("--overwrite_output",
                        action="store_true",
                        help="overwrite existing data in the path provided.")

    args = parser.parse_args()
    generate_data(args)