Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tpc-H feature branch #187

Merged
merged 11 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ A repo for Spark related benchmark sets and utilities using the

## Benchmark sets:
- [Nvidia Decision Support(NDS)](./nds/)
- [Nvidia Decision Support-H(NDS-H)](./nds-h/)
bilalbari marked this conversation as resolved.
Show resolved Hide resolved

Please see README in each benchmark set for more details including building instructions and usage
descriptions.
217 changes: 217 additions & 0 deletions nds-h/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
# NDS-H v2.0 Automation

## Prerequisites

1. Python >= 3.6
2. Necessary libraries

```bash
sudo locale-gen en_US.UTF-8
sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven spark
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
sudo apt install dos2unix
```
3. Update Configuration

- Update shared/base.template line 26 with the Spark home location.

4. For GPU run
- Download the latest RAPIDS jar from [here](https://oss.sonatype.org/content/repositories/staging/com/nvidia/rapids-4-spark_2.12/)

- Update shared/base.template line 36 with rapids plugin jar location

5. TPC-H Tools

- Download TPC-H Tools from the [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix.

- After unzipping it, a folder called `TPC-H V3.0.1` will be seen.

- Set environment variable `TPCH_HOME` pointing to this directory. e.g.

```bash
export TPCH_HOME='/PATH/TO/YOUR/TPC-H V3.0.1'
```

## Use spark-submit-template with template

To help user run NDS-H, we provide a template to define the main Spark configs for spark-submit command.
User can use different templates to run NDS with different configurations for different environment.

We create [spark-submit-template](../shared/spark-submit-template), which accepts a template file and
submit the Spark job with the configs defined in the template.

Example command to submit via `spark-submit-template` utility:

```bash
shared/spark-submit-template convert_submit_cpu.template \
../nds-h/ndsH_transcode.py <raw_data_file_location> <parquet_location> report.txt
```

We provide 2 types of template files used in different steps of NDS:

1. *convert_submit_*.template for converting the data by using ndsH_transcode.py
2. *power_run_*.template for power run by using nds_power.py
bilalbari marked this conversation as resolved.
Show resolved Hide resolved

We predefine different template files for different environment.
For example, we provide below template files to run ndsH_transcode.py for different environment:

* `convert_submit_cpu.template` is for Spark CPU cluster
* `convert_submit_gpu.template` is for Spark GPU cluster

You need to choose one as your template file and modify it to fit your environment.
We define a [base.template](../shared/base.template) to help you define some basic variables for your envionment.
And all the other templates will source `base.template` to get the basic variables.
When running multiple steps of NDS-H, you only need to modify `base.template` to fit for your cluster.

## Data Generation

### Build the jar for data generation

```bash
cd tpch-gen
make
```

### Generate data

To generate data for local -

```bash
$ python ndsH_gen_data.py -h
usage: ndsH_gen_data.py [-h] [--overwrite_output] scale parallel data_dir
positional arguments:
scale volume of data to generate in GB.
parallel build data in <parallel_value> separate chunks
data_dir generate data in directory.

optional arguments:
-h, --help show this help message and exit
--overwrite_output overwrite if there has already existing data in the path provided

```

Example command:

```bash
python ndsH_gen_data.py 100 100 /data/raw_sf100 --overwrite_output
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
```

### Convert DSV to Parquet or Other data sources

To do the data conversion, the `ndsH_transcode.py` need to be submitted as a Spark job. User can leverage
the [spark-submit-template](../shared/spark-submit-template) utility to simplify the submission.
The utility requires a pre-defined [template file](../shared/convert_submit_gpu.template) where user needs to put necessary Spark configurations. Alternatively user can submit the `ndsH_transcode.py` directly to spark with arbitrary Spark parameters.

DSV ( pipe ) is the default input format for data conversion, it can be overridden by `--input_format`.

```bash
shared/spark-submit-template convert_submit_cpu.template ../nds-h/ndsH_transcode.py <input_data_location>
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
<output_data_location> <report_file_location>
```

## Query Generation

The [templates.patch](./tpch-gen/patches/template_new.patch) that contains necessary modifications to make NDS-H queries runnable in Spark will be applied automatically in the build step. The final query templates will be in folder `$TPCH_HOME/dbgen/queries` after the build process.

### Generate Specific Query or Query Streams

```text
usage: ndsH_gen_query_stream.py [-h] (--streams STREAMS)
scale output_dir

positional arguments:
scale assume a database of this scale factor.
output_dir generate query in directory.

optional arguments:
-h, --help show this help message and exit
This argument is mutually exclusive with --streams. It
is often used for test purpose.
--streams STREAMS generate how many query streams. This argument is mutually exclusive with --template.
```

Example command to generate one query using query1.tpl:

```bash
python ndsH_gen_query_stream.py 3000 ./query_1
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
```

Example command to generate 10 query streams each one of which contains all NDS queries but in
different order:

```bash
python ndsH_gen_query_stream.py 3000 ./query_streams --streams 10
```

## Benchmark Runner

### Build Dependencies

There's a customized Spark listener used to track the Spark task status e.g. success or failed
or success with retry. The results will be recorded at the json summary files when all jobs are
finished. This is often used for test or query monitoring purpose.

To build:

```bash
cd shared/jvm_listener
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
mvn package
```

`nds-benchmark-listener-1.0-SNAPSHOT.jar` will be generated in `jvm_listener/target` folder.

### Power Run

_After_ user generates query streams, Power Run can be executed using one of them by submitting `ndsH_power.py` to Spark.

Arguments supported by `ndsH_power.py`:

```text
usage: nds_power.py [-h] [--input_format {parquet,orc,avro,csv,json,iceberg,delta}] [--output_prefix OUTPUT_PREFIX] [--output_format OUTPUT_FORMAT] [--property_file PROPERTY_FILE] time_log

positional arguments:
input_prefix text to prepend to every input file path (e.g., "hdfs:///ds-generated-data").
query_stream_file query stream file that contains NDS queries in specific order
time_log path to execution time log, only support local path.

optional arguments:
-h, --help show this help message and exit
--input_format {parquet,orc,avro,csv,json,iceberg,delta}
type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. Certain types are not fully supported by GPU reading, please refer to https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md for more details.
wjxiz1992 marked this conversation as resolved.
Show resolved Hide resolved
--output_prefix OUTPUT_PREFIX
text to prepend to every output file (e.g., "hdfs:///ds-parquet")
--output_format OUTPUT_FORMAT
type of query output
--property_file PROPERTY_FILE
property file for Spark configuration.
```

Example command to submit nds_power.py by spark-submit-template utility:

```bash
shared/spark-submit-template power_run_gpu.template \
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
nds_power.py \
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
<parquet_folder_location> \
<query_stream>/query_0.sql \
time.csv \
--property_file properties/aqe-on.properties
```

User can also use `spark-submit` to submit `ndsH_power.py` directly.

To simplify the performance analysis process, the script will create a local CSV file to save query(including TempView creation) and corresponding execution time. Note: please use `client` mode(set in your `power_run_gpu.template` file) when running in Yarn distributed environment to make sure the time log is saved correctly in your local path.

Note the template file must follow the `spark-submit-template` utility as the _first_ argument.
All Spark configuration words (such as `--conf` and corresponding `k=v` values) are quoted by
double quotes in the template file. Please follow the format in [power_run_gpu.template](../shared/power_run_gpu.template).

User can define the `properties` file like [aqe-on.properties](../utils/properties/aqe-on.properties). The properties will be passed to the submitted Spark job along with the configurations defined in the template file. User can define some common properties in the template file and put some other properties that usually varies in the property file.

The command above will use `collect()` action to trigger Spark job for each query. It is also supported to save query output to some place for further verification. User can also specify output format e.g. csv, parquet or orc:

```bash
./spark-submit-template power_run_gpu.template \
nds_power.py \
bilalbari marked this conversation as resolved.
Show resolved Hide resolved
parquet_sf3k \
./nds_query_streams/query_0.sql \
time.csv
```
151 changes: 151 additions & 0 deletions nds-h/check.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
#!/usr/bin/env python3
#
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# -----
#
# Certain portions of the contents of this file are derived from TPC-DS version 3.2.0
# (retrieved from www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
# Such portions are subject to copyrights held by Transaction Processing Performance Council (“TPC”)
# and licensed under the TPC EULA (a copy of which accompanies this file as “TPC EULA” and is also
# available at http://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp) (the “TPC EULA”).
#
# You may not use this file except in compliance with the TPC EULA.
# DISCLAIMER: Portions of this file is derived from the TPC-DS Benchmark and as such any results
# obtained using this file are not comparable to published TPC-DS Benchmark results, as the results
# obtained from using this file do not comply with the TPC-DS Benchmark.
#

import argparse
import os
import sys
from pathlib import Path


def check_version():
req_ver = (3, 6)
cur_ver = sys.version_info
if cur_ver < req_ver:
raise Exception('Minimum required Python version is 3.6, but current python version is {}.'
.format(str(cur_ver.major) + '.' + str(cur_ver.minor)) +
' Please use proper Python version')


def check_build():
"""check jar and tpcds executable

Raises:
Exception: the build is not done or broken

Returns:
PosixPath, PosixPath: path of jar and dsdgen executable
"""
# Check if necessary executable or jars are built.
# we assume user won't move this script.
src_dir = Path(__file__).parent.absolute()
tool_path = list(Path(src_dir / 'tpch-gen/target/dbgen').rglob("dbgen"))
print(tool_path)
if tool_path == []:
raise Exception('dbgen executable is ' +
'not found in `target` folder.' +
'Please refer to README document and build this project first.')
return tool_path[0]


def get_abs_path(input_path):
"""receive a user input path and return absolute path of it.

Args:
input_path (str): user's input path

Returns:
str: if the input is absolute, return it; if it's relative path, return the absolute path of
it.
"""
if Path(input_path).is_absolute():
# it's absolute path
output_path = input_path
else:
# it's relative path where this script is executed
output_path = os.getcwd() + '/' + input_path
return output_path


def valid_range(range, parallel):
"""check the range validation

Args:
range (str): a range specified for a range data generation, e.g. "1,10"
parallel (str): string type number for parallelism in TPC-DS data generation, e.g. "20"

Raises:
Exception: error message for invalid range input.
"""
if len(range.split(',')) != 2:
msg = 'Invalid range: please specify a range with a comma between start and end. e.g., "1,10".'
raise Exception(msg)
range_start = int(range.split(',')[0])
range_end = int(range.split(',')[1])
if range_start < 1 or range_start > range_end or range_end > int(parallel):
msg = 'Please provide correct child range: 1 <= range_start <= range_end <= parallel'
raise Exception(msg)
return range_start, range_end


def parallel_value_type(p):
"""helper function to check parallel valuie

Args:
p (str): parallel value

Raises:
argparse.ArgumentTypeError: ArgumentTypeError exception

Returns:
str: parallel in string
"""
if int(p) < 2:
raise argparse.ArgumentTypeError("PARALLEL must be >= 2")
return p


def get_dir_size(start_path):
total_size = 0
for dirpath, dirnames, filenames in os.walk(start_path):
for f in filenames:
fp = os.path.join(dirpath, f)
# skip if it is symbolic link
if not os.path.islink(fp):
total_size += os.path.getsize(fp)
return total_size

def check_json_summary_folder(json_summary_folder):
if json_summary_folder:
# prepare a folder to save json summaries of query results
if not os.path.exists(json_summary_folder):
os.makedirs(json_summary_folder)
else:
if os.listdir(json_summary_folder):
raise Exception(f"json_summary_folder {json_summary_folder} is not empty. " +
"There may be already some json files there. Please clean the folder " +
"or specify another one.")

def check_query_subset_exists(query_dict, subset_list):
"""check if the query subset exists in the query dictionary"""
for q in subset_list:
if q not in query_dict.keys():
raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.")
return True
Loading
Loading