Skip to content

Commit

Permalink
Adding HDFS support for data generation (#188)
Browse files Browse the repository at this point in the history
* Changes for adding hdfs submitter class

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Working changes to GenTable for hdfs run

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Changes for mapReduce

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Changes to makefile

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Adding comments to the Java file

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Removed redundant files

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Correcting typo in query stream

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Review changes

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Fixing typo in example command

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Changes for supporting json_summary+sub_queries -s

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Correcting typo and README

Signed-off-by: Sayed Bilal Bari <[email protected]>

---------

Signed-off-by: Sayed Bilal Bari <[email protected]>
Co-authored-by: Sayed Bilal Bari <[email protected]>
  • Loading branch information
bilalbari and bilalbari authored Jul 3, 2024
1 parent 32fa7c6 commit c41b702
Show file tree
Hide file tree
Showing 8 changed files with 688 additions and 57 deletions.
10 changes: 8 additions & 2 deletions nds-h/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,9 @@ To generate data for local -

```bash
$ python nds_h_gen_data.py -h
usage: nds_h_gen_data.py [-h] <scale> <parallel> <data_dir> [--overwrite_output]
usage: nds_h_gen_data.py [-h] {local,hdfs} <scale> <parallel> <data_dir> [--overwrite_output]
positional arguments:
{local,hdfs} file system to save the generated data.
scale volume of data to generate in GB.
parallel build data in <parallel_value> separate chunks
data_dir generate data in directory.
Expand All @@ -118,7 +119,7 @@ optional arguments:
Example command:
```bash
python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output
python nds_h_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output
```
### Convert DSV to Parquet or Other data sources
Expand Down Expand Up @@ -219,6 +220,11 @@ optional arguments:
type of query output
--property_file PROPERTY_FILE
property file for Spark configuration.
--json_summary_folder JSON_SUMMARY_FOLDER
Empty folder/path (will create if not exist) to save JSON summary file for each query.
--sub_queries SUB_QUERIES
comma separated list of queries to run. If not specified, all queries in the stream file will be run.
e.g. "query1,query2,query3". Note, use "_part1","_part2" and "part_3" suffix for the following query names: query15
```
Example command to submit nds_h_power.py by spark-submit-template utility:
Expand Down
115 changes: 101 additions & 14 deletions nds-h/nds_h_gen_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import os
import sys
import subprocess
import shutil

#For adding utils to path
parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
Expand All @@ -58,6 +59,7 @@
'supplier'
]


def generate_data_local(args, range_start, range_end, tool_path):
"""Generate data to local file system. TPC-DS tool will generate all table data under target
folder without creating sub-folders for each table. So we add extra code to create sub folder
Expand Down Expand Up @@ -87,10 +89,10 @@ def generate_data_local(args, range_start, range_end, tool_path):
procs = []
for i in range(range_start, range_end + 1):
dbgen = ["-s", args.scale,
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f","Y"]
"-C", args.parallel,
"-S", str(i),
"-v", "Y",
"-f", "Y"]
procs.append(subprocess.Popen(
["./dbgen"] + dbgen, cwd=str(work_dir)))
# wait for data generation to complete
Expand All @@ -104,45 +106,130 @@ def generate_data_local(args, range_start, range_end, tool_path):
for table in table_names:
print('mkdir -p {}/{}'.format(data_dir, table))
subprocess.run(['mkdir', '-p', data_dir + '/' + table])
if (table != 'region' and table !='nation'):
if (table != 'region' and table != 'nation'):
for i in range(range_start, range_end + 1):
subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
else:
subprocess.run(['mv', f'{work_dir}/{table}.tbl',
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL)
# delete date file has no parallel number suffix in the file name, move separately
# show summary
subprocess.run(['du', '-h', '-d1', data_dir])


def clean_temp_data(temp_data_path):
cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path]
print(" ".join(cmd))
subprocess.run(cmd)


def merge_temp_tables(temp_data_path, parent_data_path):
"""Helper functions for incremental data generation. Move data in temporary child range path to
parent directory.
Args:
temp_data_path (str): temorary child range data path
parent_data_path (str): parent data path
"""
table_names = source_table_names
for table_name in table_names:
# manually create table sub-folders
# redundant step if it's not the first range part.
cmd = ['hadoop', 'fs', '-mkdir', parent_data_path + '/' + table_name]
print(" ".join(cmd))
subprocess.run(cmd)
# move temp content to upper folder
# note not all tables are generated in different child range step
# please ignore messages like "mv: `.../reason/*': No such file or directory"
temp_table_data_path = temp_data_path + '/' + table_name + '/*'
cmd = ['hadoop', 'fs', '-mv', temp_table_data_path,
parent_data_path + '/' + table_name + '/']
print(" ".join(cmd))
subprocess.run(cmd)
clean_temp_data(temp_data_path)


def generate_data_hdfs(args, jar_path):
"""generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the
limit of hdfs, each range data will be generated under a temporary folder then move to target
folder.
Args:
args (Namespace): Namespace from argparser
jar_path (str): path to the target jar
Raises:
Exception: if Hadoop binary is not installed.
"""
# Check if hadoop is installed.
if shutil.which('hadoop') is None:
raise Exception('No Hadoop binary found in current environment, ' +
'please install Hadoop for data generation in cluster.')
# Submit hadoop MR job to generate data
cmd = ['hadoop', 'jar', str(jar_path)]
cmd += ['-p', args.parallel, '-s', args.scale]
# get dsdgen.jar path, assume user won't change file structure
tpcds_gen_path = jar_path.parent.parent.absolute()
if args.overwrite_output:
cmd += ['-o']
if args.range:
# use a temp folder to save the specific range data.
# will move the content to parent folder afterwards.
# it's a workaround for "Output directory ... already exists" in incremental generation
temp_data_path = args.data_dir + '/_temp_'
# before generation, we remove "_temp_" folders in case they contain garbage generated by
# previous user runs.
clean_temp_data(temp_data_path)
cmd.extend(["-r", args.range])
cmd.extend(["-d", temp_data_path])
try:
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))
# only move delete table for data maintenance
merge_temp_tables(temp_data_path, args.data_dir)
finally:
clean_temp_data(temp_data_path)
else:
cmd.extend(["-d", args.data_dir])
subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path))
# only move delete table for data maintenance


def generate_data(args):
tool_path = check_build_nds_h()
jar_path, tool_path = check_build_nds_h()
range_start = 1
range_end = int(args.parallel)
if args.range:
range_start, range_end = valid_range(args.range, args.parallel)
generate_data_local(args, range_start, range_end, tool_path)
if args.type == 'hdfs':
generate_data_hdfs(args, jar_path)
else:
generate_data_local(args, range_start, range_end, tool_path)


if __name__ == "__main__":
parser = parser = argparse.ArgumentParser()
parser.add_argument("type",
choices=["local", "hdfs"],
help="file system to save the generated data.")
parser.add_argument("scale",
help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \
,3000, 10000, 30000,"
)
)
parser.add_argument("parallel",
type=parallel_value_type,
help="build data in <parallel_value> separate chunks"
)
)
parser.add_argument("data_dir",
help="generate data in directory.")
parser.add_argument('--range',
help='Used for incremental data generation, meaning which part of child' +
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
'chunks are generated in one run. Format: "start,end", both are inclusive. ' +
'e.g. "1,100". Note: the child range must be within the "parallel", ' +
'"--parallel 100 --range 100,200" is illegal.')
parser.add_argument("--overwrite_output",
action="store_true",
help="overwrite if there has already existing data in the path provided.")

args = parser.parse_args()
generate_data(args)
2 changes: 1 addition & 1 deletion nds-h/nds_h_gen_query_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def generate_query_streams(args, tool_path):
subprocess.run(base_cmd, check=True, cwd=str(work_dir),stdout=f)

if __name__ == "__main__":
tool_path = check_build_nds_h()
jar_path, tool_path = check_build_nds_h()
parser = parser = argparse.ArgumentParser()
parser.add_argument("scale",
help="Assume a database of this scale factor.")
Expand Down
Loading

0 comments on commit c41b702

Please sign in to comment.