From cd7700ed7facca25c6c5a9b8ad921d1708bdba88 Mon Sep 17 00:00:00 2001 From: Sayed Bilal Bari Date: Thu, 11 Jul 2024 10:46:33 -0500 Subject: [PATCH 1/5] Adding keep_sc support nds-h Signed-off-by: Sayed Bilal Bari --- nds-h/nds_h_power.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py index be2dc98..39deefc 100644 --- a/nds-h/nds_h_power.py +++ b/nds-h/nds_h_power.py @@ -171,27 +171,28 @@ def run_query_stream(input_prefix, time_log_output_path, input_format="parquet", output_path=None, + keep_sc=False, output_format="parquet"): """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file for easy accesibility. TempView Creation time is also recorded. Args: - input_prefix (str): path of input data or warehouse if input_format is "iceberg" or hive_external=True. - query_dict (OrderedDict): ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark - time_log_output_path (str): path of the log that contains query execution time, both local + :param input_prefix : path of input data or warehouse if input_format is "iceberg" or hive_external=True. + :param property_file: property file for Spark configuration. + :param query_dict : ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark + :param time_log_output_path : path of the log that contains query execution time, both local and HDFS path are supported. - input_format (str, optional): type of input data source. - use_deciaml(bool, optional): use decimal type for certain columns when loading data of text type. - output_path (str, optional): path of query output, optinal. If not specified, collect() + :param input_format : type of input data source. + :param output_path : path of query output, optinal. If not specified, collect() action will be applied to each query. Defaults to None. - output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults - to "parquet". + :param output_format : query output format, choices are csv, orc, parquet. Defaults to "parquet". + :param keep_sc : Databricks specific to keep the spark context alive. Defaults to False. """ queries_reports = [] execution_time_list = [] total_time_start = time.time() # check if it's running specific query or Power Run - app_name = "NDS - Power Run" + app_name = "NDS-H - Power Run" # Execute Power Run or Specific query in Spark # build Spark Session session_builder = SparkSession.builder @@ -223,7 +224,8 @@ def run_query_stream(input_prefix, queries_reports.append(q_report) power_end = int(time.time()) power_elapse = int((power_end - power_start)*1000) - spark_session.sparkContext.stop() + if not keep_sc: + spark_session.sparkContext.stop() total_time_end = time.time() total_elapse = int((total_time_end - total_time_start)*1000) print("====== Power Test Time: {} milliseconds ======".format(power_elapse)) @@ -278,6 +280,11 @@ def load_properties(filename): 'absolute. Issue: https://github.com/delta-io/delta/issues/555') parser.add_argument('query_stream_file', help='query stream file that contains NDS queries in specific order') + parser.add_argument('--keep_sc', + action='store_true', + help='Keep SparkContext alive after running all queries. This is a ' + + 'limitation on Databricks runtime environment. 
User should always ' + 'attach this flag when running on Databricks.') parser.add_argument('time_log', help='path to execution time log, only support local path.', default="") @@ -303,4 +310,5 @@ def load_properties(filename): args.time_log, args.input_format, args.output_prefix, + args.keep_sc, args.output_format) From c97ec448d1ef8a62ac9e7b1fb5db415f618d8de8 Mon Sep 17 00:00:00 2001 From: Sayed Bilal Bari Date: Thu, 11 Jul 2024 11:16:23 -0500 Subject: [PATCH 2/5] Pulling in latest changes Signed-off-by: Sayed Bilal Bari --- nds-h/README.md | 10 +- nds-h/nds_h_gen_data.py | 115 +++++- nds-h/nds_h_gen_query_stream.py | 2 +- nds-h/tpch-gen/Makefile | 13 +- nds-h/tpch-gen/pom.xml | 105 +++++ .../main/java/org/nvidia/nds_h/GenTable.java | 372 ++++++++++++++++++ nds/README.md | 27 +- nds/base.template | 2 +- nds/convert_submit_cpu.template | 2 +- nds/convert_submit_cpu_delta.template | 6 +- nds/convert_submit_cpu_iceberg.template | 2 +- nds/maintenance_delta.template | 4 +- nds/maintenance_iceberg.template | 2 +- nds/nds_power.py | 52 +-- nds/power_run_cpu.template | 4 +- nds/power_run_cpu_delta.template | 4 +- nds/power_run_cpu_iceberg.template | 2 +- nds/power_run_gpu_delta.template | 4 +- nds/power_run_gpu_iceberg.template | 2 +- nds/spark-submit-template | 1 + nds/tpcds-gen/pom.xml | 2 +- shared/base.template | 2 +- utils/check.py | 8 +- 23 files changed, 664 insertions(+), 79 deletions(-) create mode 100644 nds-h/tpch-gen/pom.xml create mode 100755 nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java diff --git a/nds-h/README.md b/nds-h/README.md index 2b378e0..3bbc0b8 100644 --- a/nds-h/README.md +++ b/nds-h/README.md @@ -103,8 +103,9 @@ To generate data for local - ```bash $ python nds_h_gen_data.py -h -usage: nds_h_gen_data.py [-h] [--overwrite_output] +usage: nds_h_gen_data.py [-h] {local,hdfs} [--overwrite_output] positional arguments: + {local,hdfs} file system to save the generated data. scale volume of data to generate in GB. parallel build data in separate chunks data_dir generate data in directory. @@ -118,7 +119,7 @@ optional arguments: Example command: ```bash -python nds_h_gen_data.py 100 100 /data/raw_sf100 --overwrite_output +python nds_h_gen_data.py hdfs 100 100 /data/raw_sf100 --overwrite_output ``` ### Convert DSV to Parquet or Other data sources @@ -219,6 +220,11 @@ optional arguments: type of query output --property_file PROPERTY_FILE property file for Spark configuration. + --json_summary_folder JSON_SUMMARY_FOLDER + Empty folder/path (will create if not exist) to save JSON summary file for each query. + --sub_queries SUB_QUERIES + comma separated list of queries to run. If not specified, all queries in the stream file will be run. + e.g. "query1,query2,query3". Note, use "_part1","_part2" and "part_3" suffix for the following query names: query15 ``` Example command to submit nds_h_power.py by spark-submit-template utility: diff --git a/nds-h/nds_h_gen_data.py b/nds-h/nds_h_gen_data.py index 84401f3..8293812 100644 --- a/nds-h/nds_h_gen_data.py +++ b/nds-h/nds_h_gen_data.py @@ -34,6 +34,7 @@ import os import sys import subprocess +import shutil #For adding utils to path parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '..')) @@ -58,6 +59,7 @@ 'supplier' ] + def generate_data_local(args, range_start, range_end, tool_path): """Generate data to local file system. TPC-DS tool will generate all table data under target folder without creating sub-folders for each table. 
So we add extra code to create sub folder @@ -87,10 +89,10 @@ def generate_data_local(args, range_start, range_end, tool_path): procs = [] for i in range(range_start, range_end + 1): dbgen = ["-s", args.scale, - "-C", args.parallel, - "-S", str(i), - "-v", "Y", - "-f","Y"] + "-C", args.parallel, + "-S", str(i), + "-v", "Y", + "-f", "Y"] procs.append(subprocess.Popen( ["./dbgen"] + dbgen, cwd=str(work_dir))) # wait for data generation to complete @@ -104,45 +106,130 @@ def generate_data_local(args, range_start, range_end, tool_path): for table in table_names: print('mkdir -p {}/{}'.format(data_dir, table)) subprocess.run(['mkdir', '-p', data_dir + '/' + table]) - if (table != 'region' and table !='nation'): + if (table != 'region' and table != 'nation'): for i in range(range_start, range_end + 1): subprocess.run(['mv', f'{work_dir}/{table}.tbl.{i}', f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) else: subprocess.run(['mv', f'{work_dir}/{table}.tbl', - f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) + f'{data_dir}/{table}/'], stderr=subprocess.DEVNULL) # delete date file has no parallel number suffix in the file name, move separately # show summary subprocess.run(['du', '-h', '-d1', data_dir]) + +def clean_temp_data(temp_data_path): + cmd = ['hadoop', 'fs', '-rm', '-r', '-skipTrash', temp_data_path] + print(" ".join(cmd)) + subprocess.run(cmd) + + +def merge_temp_tables(temp_data_path, parent_data_path): + """Helper functions for incremental data generation. Move data in temporary child range path to + parent directory. + + Args: + temp_data_path (str): temorary child range data path + parent_data_path (str): parent data path + """ + table_names = source_table_names + for table_name in table_names: + # manually create table sub-folders + # redundant step if it's not the first range part. + cmd = ['hadoop', 'fs', '-mkdir', parent_data_path + '/' + table_name] + print(" ".join(cmd)) + subprocess.run(cmd) + # move temp content to upper folder + # note not all tables are generated in different child range step + # please ignore messages like "mv: `.../reason/*': No such file or directory" + temp_table_data_path = temp_data_path + '/' + table_name + '/*' + cmd = ['hadoop', 'fs', '-mv', temp_table_data_path, + parent_data_path + '/' + table_name + '/'] + print(" ".join(cmd)) + subprocess.run(cmd) + clean_temp_data(temp_data_path) + + +def generate_data_hdfs(args, jar_path): + """generate data to hdfs using TPC-DS dsdgen tool. Support incremental generation: due to the + limit of hdfs, each range data will be generated under a temporary folder then move to target + folder. + + Args: + args (Namespace): Namespace from argparser + jar_path (str): path to the target jar + + Raises: + Exception: if Hadoop binary is not installed. + """ + # Check if hadoop is installed. + if shutil.which('hadoop') is None: + raise Exception('No Hadoop binary found in current environment, ' + + 'please install Hadoop for data generation in cluster.') + # Submit hadoop MR job to generate data + cmd = ['hadoop', 'jar', str(jar_path)] + cmd += ['-p', args.parallel, '-s', args.scale] + # get dsdgen.jar path, assume user won't change file structure + tpcds_gen_path = jar_path.parent.parent.absolute() + if args.overwrite_output: + cmd += ['-o'] + if args.range: + # use a temp folder to save the specific range data. + # will move the content to parent folder afterwards. + # it's a workaround for "Output directory ... 
already exists" in incremental generation + temp_data_path = args.data_dir + '/_temp_' + # before generation, we remove "_temp_" folders in case they contain garbage generated by + # previous user runs. + clean_temp_data(temp_data_path) + cmd.extend(["-r", args.range]) + cmd.extend(["-d", temp_data_path]) + try: + subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path)) + # only move delete table for data maintenance + merge_temp_tables(temp_data_path, args.data_dir) + finally: + clean_temp_data(temp_data_path) + else: + cmd.extend(["-d", args.data_dir]) + subprocess.run(cmd, check=True, cwd=str(tpcds_gen_path)) + # only move delete table for data maintenance + + def generate_data(args): - tool_path = check_build_nds_h() + jar_path, tool_path = check_build_nds_h() range_start = 1 range_end = int(args.parallel) if args.range: range_start, range_end = valid_range(args.range, args.parallel) - generate_data_local(args, range_start, range_end, tool_path) + if args.type == 'hdfs': + generate_data_hdfs(args, jar_path) + else: + generate_data_local(args, range_start, range_end, tool_path) + if __name__ == "__main__": parser = parser = argparse.ArgumentParser() + parser.add_argument("type", + choices=["local", "hdfs"], + help="file system to save the generated data.") parser.add_argument("scale", help="volume of data to generate in GB. Accepted SF - 1,10, 100, 300, 1000 \ ,3000, 10000, 30000," - ) + ) parser.add_argument("parallel", type=parallel_value_type, help="build data in separate chunks" - ) + ) parser.add_argument("data_dir", help="generate data in directory.") parser.add_argument('--range', help='Used for incremental data generation, meaning which part of child' + - 'chunks are generated in one run. Format: "start,end", both are inclusive. ' + - 'e.g. "1,100". Note: the child range must be within the "parallel", ' + - '"--parallel 100 --range 100,200" is illegal.') + 'chunks are generated in one run. Format: "start,end", both are inclusive. ' + + 'e.g. "1,100". Note: the child range must be within the "parallel", ' + + '"--parallel 100 --range 100,200" is illegal.') parser.add_argument("--overwrite_output", action="store_true", help="overwrite if there has already existing data in the path provided.") - + args = parser.parse_args() generate_data(args) diff --git a/nds-h/nds_h_gen_query_stream.py b/nds-h/nds_h_gen_query_stream.py index 2e58dab..fad9380 100644 --- a/nds-h/nds_h_gen_query_stream.py +++ b/nds-h/nds_h_gen_query_stream.py @@ -81,7 +81,7 @@ def generate_query_streams(args, tool_path): subprocess.run(base_cmd, check=True, cwd=str(work_dir),stdout=f) if __name__ == "__main__": - tool_path = check_build_nds_h() + jar_path, tool_path = check_build_nds_h() parser = parser = argparse.ArgumentParser() parser.add_argument("scale", help="Assume a database of this scale factor.") diff --git a/nds-h/tpch-gen/Makefile b/nds-h/tpch-gen/Makefile index 797aa7b..7b4520c 100644 --- a/nds-h/tpch-gen/Makefile +++ b/nds-h/tpch-gen/Makefile @@ -15,7 +15,7 @@ # limitations under the License. 
# -all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen +all: check-tpch-env prepare-target copy-dbgen modify-makefile modify-tpc-h build-dbgen make-jar build-package check-tpch-env: ifndef TPCH_HOME @@ -33,8 +33,6 @@ copy-dbgen: # file and the sql files cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.sql cd "$(TPCH_HOME)/dbgen/queries"; dos2unix *.patch - # Unapply any patch if already done - -cd "$(TPCH_HOME)/dbgen/queries"; cat *.patch | patch -R -p1 -N # apply patches to both source code and templates cd "$(TPCH_HOME)/dbgen/queries" && cat *.patch | patch -p1 cp -r "$(TPCH_HOME)/dbgen" target/ @@ -62,4 +60,11 @@ modify-tpc-h: build-dbgen: # Build it - cd target/dbgen ; make clean; make 2>/dev/null \ No newline at end of file + cd target/dbgen ; make clean; make 2>/dev/null + +make-jar: + cd target; (jar cvf dbgen.jar dbgen/ || gjar cvf dbgen.jar dbgen/ ) + +build-package: + mvn package + \ No newline at end of file diff --git a/nds-h/tpch-gen/pom.xml b/nds-h/tpch-gen/pom.xml new file mode 100644 index 0000000..d8a8db1 --- /dev/null +++ b/nds-h/tpch-gen/pom.xml @@ -0,0 +1,105 @@ + + + + + 4.0.0 + + org.nvidia.nds_h + tpch-gen + 1.0-SNAPSHOT + jar + + tpch-gen + http://maven.apache.org + + + 1.8 + + + + + org.apache.hadoop + hadoop-client + 3.2.1 + compile + + + commons-cli + commons-cli + 1.1 + compile + + + org.mockito + mockito-core + 1.8.5 + test + + + junit + junit + 4.13.1 + test + + + + + + + maven-compiler-plugin + + ${tpcds-gen.jdk.version} + ${tpcds-gen.jdk.version} + + + + org.apache.maven.plugins + maven-jar-plugin + + + + true + lib/ + org.nvidia.nds_h.GenTable + + + + + + org.apache.maven.plugins + maven-dependency-plugin + + + copy-dependencies + package + + copy-dependencies + + + ${project.build.directory}/lib + + + + + + + + diff --git a/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java b/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java new file mode 100755 index 0000000..529c271 --- /dev/null +++ b/nds-h/tpch-gen/src/main/java/org/nvidia/nds_h/GenTable.java @@ -0,0 +1,372 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.nvidia.nds_h; + +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.*; +import org.apache.hadoop.util.*; + +import org.apache.hadoop.filecache.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.hadoop.mapreduce.lib.input.*; +import org.apache.hadoop.mapreduce.lib.output.*; + +import org.apache.commons.cli.*; + +import java.io.*; +import java.net.*; +import java.math.*; +import java.security.*; +import java.util.Objects; + + +public class GenTable extends Configured implements Tool { + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + int res = ToolRunner.run(conf, new GenTable(), args); + System.exit(res); + } + + @Override + public int run(String[] args) throws Exception { + String[] remainingArgs = new GenericOptionsParser(getConf(), args).getRemainingArgs(); + + CommandLineParser parser = new BasicParser(); + getConf().setInt("io.sort.mb", 4); + Options options = getOptions(); + CommandLine line = parser.parse(options, remainingArgs); + + if(!line.hasOption("scale")) { + HelpFormatter f = new HelpFormatter(); + f.printHelp("GenTable", options); + return 1; + } + Path out = new Path(line.getOptionValue("dir")); + + int scale = Integer.parseInt(line.getOptionValue("scale")); + + String table = "all"; + if(line.hasOption("table")) { + table = line.getOptionValue("table"); + } + + int parallel = scale; + + if(line.hasOption("parallel")) { + parallel = Integer.parseInt(line.getOptionValue("parallel")); + } + + int rangeStart = 1; + int rangeEnd = parallel; + + if(line.hasOption("range")) { + String[] range = line.getOptionValue("range").split(","); + if (range.length == 1) { + System.err.println("Please provide range with comma for both range start and range end."); + return 1; + } + rangeStart = Integer.parseInt(range[0]); + rangeEnd = Integer.parseInt(range[1]); + if (rangeStart < 1 || rangeStart > rangeEnd || rangeEnd > parallel) { + System.err.println("Please provide correct child range: 1 <= rangeStart <= rangeEnd <= parallel"); + return 1; + } + } + + if(parallel == 1 || scale == 1) { + System.err.println("The MR task does not work for scale=1 or parallel=1"); + return 1; + } + + Path in = genInput(table, scale, parallel, rangeStart, rangeEnd); + + Path dbgen = copyJar(new File("target/dbgen.jar")); + + // Extracting the dbgen jar location and adding as a symlink as part of + // Mapred Cache hence enabling access by all mappers running + URI dsuri = dbgen.toUri(); + URI link = new URI(dsuri.getScheme(), + dsuri.getUserInfo(), dsuri.getHost(), + dsuri.getPort(),dsuri.getPath(), + dsuri.getQuery(),"dbgen"); + + Configuration conf = getConf(); + conf.setInt("mapred.task.timeout",0); + conf.setInt("mapreduce.task.timeout",0); + conf.setBoolean("mapreduce.map.output.compress", true); + conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.GzipCodec"); + + DistributedCache.addCacheArchive(link, conf); + DistributedCache.createSymlink(conf); + Job job = new Job(conf, "GenTable+"+table+"_"+scale); + job.setJarByClass(getClass()); + + // No reducers since no reduction task involved post data gen + // Updating mapper class + // Output will be a text file ( key(file_name) -> output ) + job.setNumReduceTasks(0); + job.setMapperClass(Dbgen.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(Text.class); + + // Using NLineInputFormat mapper for parsing 
each line of input + // file as separate task + job.setInputFormatClass(NLineInputFormat.class); + NLineInputFormat.setNumLinesPerSplit(job, 1); + + FileInputFormat.addInputPath(job, in); + FileOutputFormat.setOutputPath(job, out); + + + FileSystem fs = FileSystem.get(getConf()); + // delete existing files if "overwrite" is set + if(line.hasOption("overwrite")) { + if (fs.exists(out)) { + fs.delete(out, true); + } + } + + // use multiple output to only write the named files + LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); + MultipleOutputs.addNamedOutput(job, "text", + TextOutputFormat.class, LongWritable.class, Text.class); + + job.waitForCompletion(true); + + // cleanup + fs.delete(in, false); + fs.delete(dbgen, false); + + return 0; + } + + private static Options getOptions() { + Options options = new Options(); + /* + * These are the various options being passed to the class + * -s scale + * -d output directory + * -t specific table data + * -p number of parallel files to be generated + * -o overwrite output directory if exists + */ + options.addOption("s","scale", true, "scale"); + options.addOption("d","dir", true, "dir"); + options.addOption("t","table", true, "table"); + options.addOption("p", "parallel", true, "parallel"); + options.addOption("o", "overwrite", false, "overwrite existing data"); + options.addOption("r", "range", true, "child range in one data generation run"); + return options; + } + + /* + * This function just copies the jar from the local to hdfs temp + * location for access by the mappers + */ + public Path copyJar(File jar) throws Exception { + MessageDigest md = MessageDigest.getInstance("MD5"); + InputStream is = new FileInputStream(jar); + try { + is = new DigestInputStream(is, md); + // read stream to EOF as normal... + } + finally { + is.close(); + } + BigInteger md5 = new BigInteger(md.digest()); + String md5hex = md5.toString(16); + Path dst = new Path(String.format("/tmp/%s.jar",md5hex)); + Path src = new Path(jar.toURI()); + FileSystem fs = FileSystem.get(getConf()); + fs.copyFromLocalFile(false, /*overwrite*/true, src, dst); + return dst; + } + + /* + * This function generates the various commands to be run + * parallely as part of the mapper for the job. 
+ * Each command runs the data generation for a specific part + * for a table + */ + public Path genInput(String table, int scale, int parallel, int rangeStart, int rangeEnd) throws Exception { + // Assigning epoch based name to the temporary files + // Will be cleaned later + long epoch = System.currentTimeMillis()/1000; + Path in = new Path("/tmp/"+table+"_"+scale+"-"+epoch); + FileSystem fs = FileSystem.get(getConf()); + FSDataOutputStream out = fs.create(in); + + // This is for passing the various params to the command + // for individual tables + String[ ] tables = {"c","O","L","P","S","s"}; + + for(int i = rangeStart; i <= rangeEnd; i++) { + String baseCmd = String.format("./dbgen -s %d -C %d -S %d ",scale,parallel,i); + // In case of no specific table, data is generated for all + // Separate commands for each table is generated for more parallelism + // running multiple mappers + if(table.equals("all")) { + for(String t: tables){ + String cmd = baseCmd + String.format("-T %s",t); + out.writeBytes(cmd+"\n"); + } + } + else{ + // TODO - update using map based approach for a cleaner implementation + if(table.equalsIgnoreCase("customer")){ + String cmd = baseCmd + "-T c"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("nation")){ + String cmd = baseCmd + "-T n"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("region")){ + String cmd = baseCmd + "-T r"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("lineItem")){ + String cmd = baseCmd + "-T L"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("orders")){ + String cmd = baseCmd + "-T O"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("parts")){ + String cmd = baseCmd + "-T P"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("partsupp")){ + String cmd = baseCmd + "-T S"; + out.writeBytes(cmd + "\n"); + } + else if(table.equalsIgnoreCase("supplier")){ + String cmd = baseCmd + "-T s"; + out.writeBytes(cmd + "\n"); + } + } + } + // nation and region tables are static tables hence adding + // a single command for both + if(table.equals("all")){ + String cmdL = String.format("./dbgen -s %d -T l",scale); + out.writeBytes(cmdL + "\n"); + } + // Writing the command file in temporary folder for being read by the mapper + out.close(); + return in; + } + + static String readToString(InputStream in) throws IOException { + InputStreamReader is = new InputStreamReader(in); + StringBuilder sb=new StringBuilder(); + BufferedReader br = new BufferedReader(is); + String read = br.readLine(); + + while(read != null) { + //System.out.println(read); + sb.append(read); + read =br.readLine(); + } + return sb.toString(); + } + + static final class Dbgen extends Mapper { + private MultipleOutputs mos; + protected void setup(Context context) throws IOException { + mos = new MultipleOutputs(context); + } + protected void cleanup(Context context) throws IOException, InterruptedException { + mos.close(); + } + protected void map(LongWritable offset, Text command, Mapper.Context context) + throws IOException, InterruptedException { + String parallel="1"; + String child="1"; + String table=""; + String suffix = ""; + String[] cmd = command.toString().split(" "); + + for(int i=0; i name.endsWith(suffixNew); + + for(File f: Objects.requireNonNull(cwd.listFiles(tables))) { + if(f != null) + { + System.out.println("Processing file: "+f.getName()); + } + final String baseOutputPath = f.getName().replace(suffix.substring(suffix.indexOf('.')), 
String.format("/data_%s_%s", child, parallel)); + BufferedReader br = new BufferedReader(new FileReader(f)); + String line; + while ((line = br.readLine()) != null) { + // process the line. + mos.write("text", line, null, baseOutputPath); + } + br.close(); + f.deleteOnExit(); + } + System.out.println("Processing complete"); + } + } +} diff --git a/nds/README.md b/nds/README.md index 94db74b..8650ebf 100644 --- a/nds/README.md +++ b/nds/README.md @@ -25,24 +25,24 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 an sudo locale-gen en_US.UTF-8 sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven ``` -3. Install and set up SPARK. - - Download latest distro from [here](https://spark.apache.org/downloads.html) - - Preferably >= 3.4 - - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) +3. Install and set up SPARK. + - Download latest distro from [here](https://spark.apache.org/downloads.html) + - Preferably >= 3.4 + - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) 4. TPC-DS Tools - User must download TPC-DS Tools from [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. - After unzipping it, a folder called `DSGen-software-code-3.2.0rc1` will be seen. + User must download TPC-DS Tools from [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. + After unzipping it, a folder called `DSGen-software-code-3.2.0rc1` will be seen. - User must set a system environment variable `TPCDS_HOME` pointing to this directory. e.g. + User must set a system environment variable `TPCDS_HOME` pointing to this directory. e.g. ```bash export TPCDS_HOME=/PATH/TO/YOUR/DSGen-software-code-3.2.0rc1 ``` - This variable will help find the TPC-DS Tool when building essential component for this repository. + This variable will help find the TPC-DS Tool when building essential component for this repository. ## Use spark-submit-template with template @@ -152,7 +152,8 @@ Parquet, Orc, Avro, JSON and Iceberg are supported for output data format at pre only Parquet and Orc are supported. Note: when exporting data from CSV to Iceberg, user needs to set necessary configs for Iceberg in submit template. -e.g. [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template) +e.g. [convert_submit_cpu_iceberg.template](./convert_submit_cpu_iceberg.template). +To run iceberg against different Spark versions, please modify the Iceberg package version accordingly in the template file. User can also specify `--tables` to convert specific table or tables. See argument details below. @@ -174,6 +175,9 @@ when you are about to shutdown the Metastore service. For [unmanaged tables](https://docs.databricks.com/lakehouse/data-objects.html#what-is-an-unmanaged-table), user doesn't need to create the Metastore service, appending `--delta_unmanaged` to arguments will be enough. +NOTE: To enabling Delta against different Spark versions, please modify the Delta package version accordingly in the template file. +For more version compatibility information, please visit [compatibility with apache spark](https://docs.delta.io/latest/releases.html#compatibility-with-apache-spark). 
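As an illustration of the note above, here is a minimal, hypothetical PySpark sketch of the same Delta session settings that the provided submit templates pass on the command line. The delta-core coordinate is the one the templates pin for Spark 3.2.x and is an assumption to adjust for other Spark releases per the Delta compatibility page.

```python
# Minimal sketch only: mirrors the Delta options set in the submit templates.
# The delta-core coordinate below targets Spark 3.2.x and must be changed to
# match the Spark version actually in use. Normally these settings come from
# spark-submit via the template; this is just the programmatic equivalent.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("nds-transcode-delta-sketch")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.1.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
```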
+ Arguments for `nds_transcode.py`: ```bash @@ -405,7 +409,8 @@ update operations cannot be done atomically on raw Parquet/Orc files, so we use [Iceberg](https://iceberg.apache.org/) as dataset metadata manager to overcome the issue. Enabling Iceberg requires additional configuration. Please refer to [Iceberg Spark](https://iceberg.apache.org/docs/latest/getting-started/) -for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](./maintenance_iceberg.template) +for details. We also provide a Spark submit template with necessary Iceberg configs: [maintenance_iceberg.template](./maintenance_iceberg.template). +To run iceberg against different Spark versions, please modify the Iceberg package version accordingly in the template file. The data maintenance queries are in [data_maintenance](./data_maintenance) folder. `DF_*.sql` are DELETE queries while `LF_*.sql` are INSERT queries. @@ -529,4 +534,4 @@ For instance: 4 concurrent streams in one Throughput run and the total available are 1024. Then in the template, `spark.cores.max` should be set to `1024/4=256` so that each stream will have compute resource evenly. -### NDS2.0 is using source code from TPC-DS Tool V3.2.0 +### NDS2.0 is using source code from TPC-DS Tool V3.2.0 \ No newline at end of file diff --git a/nds/base.template b/nds/base.template index 4fc0fb9..a085d9a 100644 --- a/nds/base.template +++ b/nds/base.template @@ -34,4 +34,4 @@ export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G} export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-./jvm_listener/target/nds-benchmark-listener-1.0-SNAPSHOT.jar} # The spark-rapids jar which is required when running on GPU export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-22.06.0.jar} -export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` +export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` \ No newline at end of file diff --git a/nds/convert_submit_cpu.template b/nds/convert_submit_cpu.template index ac6add4..7a706b8 100644 --- a/nds/convert_submit_cpu.template +++ b/nds/convert_submit_cpu.template @@ -24,4 +24,4 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.cores=${EXECUTOR_CORES}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" - "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}") + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}") \ No newline at end of file diff --git a/nds/convert_submit_cpu_delta.template b/nds/convert_submit_cpu_delta.template index e7d9d0b..0237106 100644 --- a/nds/convert_submit_cpu_delta.template +++ b/nds/convert_submit_cpu_delta.template @@ -15,7 +15,7 @@ # limitations under the License. # -# 1. The io.delta:delta-core_2.12:1.0.1 only works on Spark 3.1.x +# 1. The io.delta:delta-core_2.12:1.1.0 only works on Spark 3.2.x # Please refer to https://docs.delta.io/latest/releases.html for other Spark versions. 
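In the same spirit as the version note above, a small, purely illustrative guard could be added to the Python benchmark scripts (not to this template) to fail fast when the pinned Delta package and the running Spark version drift apart; the "3.2" prefix is an assumption tied to the coordinate used in this template.

```python
# Illustrative sketch, not repo code: abort early if the session's Spark
# version does not match what the pinned delta-core package targets.
from pyspark.sql import SparkSession

EXPECTED_SPARK_PREFIX = "3.2"  # assumption: matches io.delta:delta-core_2.12:1.1.0

spark = SparkSession.builder.getOrCreate()
if not spark.version.startswith(EXPECTED_SPARK_PREFIX):
    raise RuntimeError(
        f"The Delta package here targets Spark {EXPECTED_SPARK_PREFIX}.x, "
        f"but the session runs Spark {spark.version}; update --packages accordingly."
    )
```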
source base.template @@ -28,6 +28,6 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "io.delta:delta-core_2.12:1.0.1" + "--packages" "io.delta:delta-core_2.12:1.1.0" "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" - "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog") + "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog") \ No newline at end of file diff --git a/nds/convert_submit_cpu_iceberg.template b/nds/convert_submit_cpu_iceberg.template index c961bfd..2440ac8 100644 --- a/nds/convert_submit_cpu_iceberg.template +++ b/nds/convert_submit_cpu_iceberg.template @@ -30,7 +30,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.2" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" "--conf" "spark.sql.catalog.spark_catalog.type=hadoop") diff --git a/nds/maintenance_delta.template b/nds/maintenance_delta.template index eb9dc76..35939ed 100644 --- a/nds/maintenance_delta.template +++ b/nds/maintenance_delta.template @@ -15,7 +15,7 @@ # limitations under the License. # -# 1. The io.delta:delta-core_2.12:1.0.0 only works on Spark 3.1.x +# 1. The io.delta:delta-core_2.12:1.1.0 only works on Spark 3.2.x # Please refer to https://docs.delta.io/latest/releases.html for other Spark versions. 
source base.template @@ -29,7 +29,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "io.delta:delta-core_2.12:1.0.1" + "--packages" "io.delta:delta-core_2.12:1.1.0" "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" "--jars" "$NDS_LISTENER_JAR") diff --git a/nds/maintenance_iceberg.template b/nds/maintenance_iceberg.template index d5b333e..a3f3fe6 100644 --- a/nds/maintenance_iceberg.template +++ b/nds/maintenance_iceberg.template @@ -30,7 +30,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.2" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" diff --git a/nds/nds_power.py b/nds/nds_power.py index 7c34fa1..676bbd6 100644 --- a/nds/nds_power.py +++ b/nds/nds_power.py @@ -132,7 +132,7 @@ def run_one_query(spark_session, df.collect() else: ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save( - output_path + '/' + query_name) + output_path + '/' + query_name) def ensure_valid_column_names(df: DataFrame): def is_column_start(char): @@ -258,10 +258,10 @@ def run_query_stream(input_prefix, print("====== Run {} ======".format(query_name)) q_report = PysparkBenchReport(spark_session, query_name) summary = q_report.report_on(run_one_query,spark_session, - q_content, - query_name, - output_path, - output_format) + q_content, + query_name, + output_path, + output_format) print(f"Time taken: {summary['queryTimes']} millis for {query_name}") query_times = summary['queryTimes'] execution_time_list.append((spark_app_id, query_name, query_times[0])) @@ -333,11 +333,11 @@ def load_properties(filename): parser = parser = argparse.ArgumentParser() parser.add_argument('input_prefix', help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' + - 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + - '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + - 'session name "spark_catalog" is supported now, customized catalog is not ' + - 'yet supported. Note if this points to a Delta Lake table, the path must be ' + - 'absolute. Issue: https://github.com/delta-io/delta/issues/555') + 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + + '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + + 'session name "spark_catalog" is supported now, customized catalog is not ' + + 'yet supported. Note if this points to a Delta Lake table, the path must be ' + + 'absolute. 
Issue: https://github.com/delta-io/delta/issues/555') parser.add_argument('query_stream_file', help='query stream file that contains NDS queries in specific order') parser.add_argument('time_log', @@ -345,9 +345,9 @@ def load_properties(filename): default="") parser.add_argument('--input_format', help='type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. ' + - 'Certain types are not fully supported by GPU reading, please refer to ' + - 'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' + - 'for more details.', + 'Certain types are not fully supported by GPU reading, please refer to ' + + 'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' + + 'for more details.', choices=['parquet', 'orc', 'avro', 'csv', 'json', 'iceberg', 'delta'], default='parquet') parser.add_argument('--output_prefix', @@ -360,34 +360,34 @@ def load_properties(filename): parser.add_argument('--floats', action='store_true', help='When loading Text files like json and csv, schemas are required to ' + - 'determine if certain parts of the data are read as decimal type or not. '+ - 'If specified, float data will be used.') + 'determine if certain parts of the data are read as decimal type or not. '+ + 'If specified, float data will be used.') parser.add_argument('--json_summary_folder', help='Empty folder/path (will create if not exist) to save JSON summary file for each query.') parser.add_argument('--delta_unmanaged', action='store_true', help='Use unmanaged tables for DeltaLake. This is useful for testing DeltaLake without ' + - ' leveraging a Metastore service.') + ' leveraging a Metastore service.') parser.add_argument('--keep_sc', action='store_true', help='Keep SparkContext alive after running all queries. This is a ' + - 'limitation on Databricks runtime environment. User should always attach ' + - 'this flag when running on Databricks.') + 'limitation on Databricks runtime environment. User should always attach ' + + 'this flag when running on Databricks.') parser.add_argument('--hive', action='store_true', help='use table meta information in Hive metastore directly without ' + - 'registering temp views.') + 'registering temp views.') parser.add_argument('--extra_time_log', help='extra path to save time log when running in cloud environment where '+ - 'driver node/pod cannot be accessed easily. User needs to add essential extra ' + - 'jars and configurations to access different cloud storage systems. ' + - 'e.g. s3, gs etc.') + 'driver node/pod cannot be accessed easily. User needs to add essential extra ' + + 'jars and configurations to access different cloud storage systems. ' + + 'e.g. s3, gs etc.') parser.add_argument('--sub_queries', type=lambda s: [x.strip() for x in s.split(',')], help='comma separated list of queries to run. If not specified, all queries ' + - 'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' + - '"_part1" and "_part2" suffix for the following query names: ' + - 'query14, query23, query24, query39. e.g. query14_part1, query39_part2') + 'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' + + '"_part1" and "_part2" suffix for the following query names: ' + + 'query14, query23, query24, query39. e.g. 
query14_part1, query39_part2') parser.add_argument('--allow_failure', action='store_true', help='Do not exit with non zero when any query failed or any task failed') @@ -407,4 +407,4 @@ def load_properties(filename): args.delta_unmanaged, args.keep_sc, args.hive, - args.allow_failure) + args.allow_failure) \ No newline at end of file diff --git a/nds/power_run_cpu.template b/nds/power_run_cpu.template index 76ea0ae..a02139d 100644 --- a/nds/power_run_cpu.template +++ b/nds/power_run_cpu.template @@ -16,7 +16,7 @@ # source base.template -export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-400} export SPARK_CONF=("--master" "${SPARK_MASTER}" "--deploy-mode" "client" @@ -29,4 +29,6 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.sql.adaptive.enabled=true" "--conf" "spark.sql.broadcastTimeout=1200" "--conf" "spark.dynamicAllocation.enabled=false" + "--conf" "spark.eventLog.enabled=true" + "--conf" "spark.eventLog.dir=/home/sbari/project-repos/data/nds_benchmark_100_event_logs" "--jars" "$NDS_LISTENER_JAR") diff --git a/nds/power_run_cpu_delta.template b/nds/power_run_cpu_delta.template index 8ff90f3..89a4115 100644 --- a/nds/power_run_cpu_delta.template +++ b/nds/power_run_cpu_delta.template @@ -15,7 +15,7 @@ # limitations under the License. # -# 1. The io.delta:delta-core_2.12:1.0.1 only works on Spark 3.1.x +# 1. The io.delta:delta-core_2.12:1.1.0 only works on Spark 3.2.x # Please refer to https://docs.delta.io/latest/releases.html for other Spark versions. source base.template @@ -28,7 +28,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "io.delta:delta-core_2.12:1.0.1" + "--packages" "io.delta:delta-core_2.12:1.1.0" "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" "--jars" "$NDS_LISTENER_JAR") diff --git a/nds/power_run_cpu_iceberg.template b/nds/power_run_cpu_iceberg.template index 16cdf64..006e6f0 100644 --- a/nds/power_run_cpu_iceberg.template +++ b/nds/power_run_cpu_iceberg.template @@ -30,7 +30,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" - "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.2" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" diff --git a/nds/power_run_gpu_delta.template b/nds/power_run_gpu_delta.template index 1626cc7..4385200 100644 --- a/nds/power_run_gpu_delta.template +++ b/nds/power_run_gpu_delta.template @@ -15,7 +15,7 @@ # limitations under the License. # -# 1. The io.delta:delta-core_2.12:1.0.1 only works on Spark 3.1.x +# 1. The io.delta:delta-core_2.12:1.1.0 only works on Spark 3.2.x # Please refer to https://docs.delta.io/latest/releases.html for other Spark versions. 
source base.template @@ -39,7 +39,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.rapids.memory.host.spillStorageSize=32G" "--conf" "spark.rapids.memory.pinnedPool.size=8g" "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}" - "--packages" "io.delta:delta-core_2.12:1.0.1" + "--packages" "io.delta:delta-core_2.12:1.1.0" "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" "--files" "$SPARK_HOME/examples/src/main/scripts/getGpusResources.sh" diff --git a/nds/power_run_gpu_iceberg.template b/nds/power_run_gpu_iceberg.template index 164bdfa..f6e1c04 100644 --- a/nds/power_run_gpu_iceberg.template +++ b/nds/power_run_gpu_iceberg.template @@ -41,7 +41,7 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.rapids.memory.host.spillStorageSize=32G" "--conf" "spark.rapids.memory.pinnedPool.size=8g" "--conf" "spark.rapids.sql.concurrentGpuTasks=${CONCURRENT_GPU_TASKS}" - "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.2" + "--packages" "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:0.13.2" "--conf" "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" "--conf" "spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog" "--conf" "spark.sql.catalog.spark_catalog.type=hadoop" diff --git a/nds/spark-submit-template b/nds/spark-submit-template index fe0652b..8e4c2bc 100755 --- a/nds/spark-submit-template +++ b/nds/spark-submit-template @@ -30,4 +30,5 @@ CMD=("$SPARK_HOME/bin/spark-submit") CMD+=("${SPARK_CONF[@]}") CMD+=("${MORE_ARGS[@]}") # submit +echo "Executing command: ${CMD[@]}" "${CMD[@]}" diff --git a/nds/tpcds-gen/pom.xml b/nds/tpcds-gen/pom.xml index 2c595be..ae75fde 100644 --- a/nds/tpcds-gen/pom.xml +++ b/nds/tpcds-gen/pom.xml @@ -77,7 +77,7 @@ true - lib/ + lib/ org.notmysock.tpcds.GenTable diff --git a/shared/base.template b/shared/base.template index b758925..bff02f4 100644 --- a/shared/base.template +++ b/shared/base.template @@ -34,4 +34,4 @@ export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G} export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-../utils/jvm_listener/target/benchmark-listener-1.0-SNAPSHOT.jar} # The spark-rapids jar which is required when running on GPU export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-24.04.0-cuda11.jar} -export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` +export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` \ No newline at end of file diff --git a/utils/check.py b/utils/check.py index db01e68..9704dd7 100644 --- a/utils/check.py +++ b/utils/check.py @@ -57,12 +57,14 @@ def check_build_nds_h(): # we assume user won't move this script. src_dir = Path(__file__).parent.parent.absolute() tool_path = list(Path(src_dir / 'nds-h/tpch-gen/target/dbgen').rglob("dbgen")) + jar_path = list( + Path(src_dir / 'nds-h/tpch-gen/target').rglob("tpch-gen-*.jar")) print(tool_path) if tool_path == []: raise Exception('dbgen executable is ' + 'not found in `target` folder.' 
+ 'Please refer to README document and build this project first.') - return tool_path[0] + return jar_path[0], tool_path[0] def check_build_nds(): """check jar and tpcds executable @@ -156,7 +158,7 @@ def get_dir_size(start_path): def check_json_summary_folder(json_summary_folder): if json_summary_folder: - # prepare a folder to save json summaries of query results + # prepare a folder to save json summaries of query results if not os.path.exists(json_summary_folder): os.makedirs(json_summary_folder) else: @@ -170,4 +172,4 @@ def check_query_subset_exists(query_dict, subset_list): for q in subset_list: if q not in query_dict.keys(): raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.") - return True + return True \ No newline at end of file From 6ac29312cf0620d19a940ab5cbf3f024677fad60 Mon Sep 17 00:00:00 2001 From: Sayed Bilal Bari Date: Thu, 11 Jul 2024 11:24:19 -0500 Subject: [PATCH 3/5] Correcting modified files Signed-off-by: Sayed Bilal Bari --- nds/README.md | 14 ++++---- nds/convert_submit_cpu.template | 2 +- nds/convert_submit_cpu_delta.template | 2 +- nds/nds_power.py | 52 +++++++++++++-------------- nds/power_run_cpu.template | 4 +-- nds/spark-submit-template | 1 - nds/tpcds-gen/pom.xml | 2 +- shared/base.template | 2 +- utils/check.py | 2 +- 9 files changed, 39 insertions(+), 42 deletions(-) diff --git a/nds/README.md b/nds/README.md index 8650ebf..d5f4fd6 100644 --- a/nds/README.md +++ b/nds/README.md @@ -26,23 +26,23 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 an sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven ``` 3. Install and set up SPARK. - - Download latest distro from [here](https://spark.apache.org/downloads.html) - - Preferably >= 3.4 - - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) + - Download latest distro from [here](https://spark.apache.org/downloads.html) + - Preferably >= 3.4 + - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 ) 4. TPC-DS Tools - User must download TPC-DS Tools from [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. - After unzipping it, a folder called `DSGen-software-code-3.2.0rc1` will be seen. + User must download TPC-DS Tools from [official TPC website](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp). The tool will be downloaded as a zip package with a random guid string prefix. + After unzipping it, a folder called `DSGen-software-code-3.2.0rc1` will be seen. - User must set a system environment variable `TPCDS_HOME` pointing to this directory. e.g. + User must set a system environment variable `TPCDS_HOME` pointing to this directory. e.g. ```bash export TPCDS_HOME=/PATH/TO/YOUR/DSGen-software-code-3.2.0rc1 ``` - This variable will help find the TPC-DS Tool when building essential component for this repository. + This variable will help find the TPC-DS Tool when building essential component for this repository. 
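Building on the `TPCDS_HOME` requirement described above, the following is a tiny, purely illustrative pre-flight check in Python, similar in spirit to the `check-tpch-env` Makefile target shown earlier for NDS-H:

```python
# Illustrative sketch: verify the TPCDS_HOME prerequisite before building or
# generating data. Not part of the repository; mirrors the README instruction.
import os

tpcds_home = os.environ.get("TPCDS_HOME")
if not tpcds_home or not os.path.isdir(tpcds_home):
    raise SystemExit(
        "TPCDS_HOME must point to the unzipped DSGen-software-code-3.2.0rc1 "
        "directory; see the TPC-DS Tools step above."
    )
print(f"Using TPC-DS tool from {tpcds_home}")
```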
## Use spark-submit-template with template diff --git a/nds/convert_submit_cpu.template b/nds/convert_submit_cpu.template index 7a706b8..ac6add4 100644 --- a/nds/convert_submit_cpu.template +++ b/nds/convert_submit_cpu.template @@ -24,4 +24,4 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.executor.cores=${EXECUTOR_CORES}" "--conf" "spark.executor.instances=${NUM_EXECUTORS}" "--conf" "spark.executor.memory=${EXECUTOR_MEMORY}" - "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}") \ No newline at end of file + "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}") diff --git a/nds/convert_submit_cpu_delta.template b/nds/convert_submit_cpu_delta.template index 0237106..23789af 100644 --- a/nds/convert_submit_cpu_delta.template +++ b/nds/convert_submit_cpu_delta.template @@ -30,4 +30,4 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.sql.shuffle.partitions=${SHUFFLE_PARTITIONS}" "--packages" "io.delta:delta-core_2.12:1.1.0" "--conf" "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" - "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog") \ No newline at end of file + "--conf" "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog") diff --git a/nds/nds_power.py b/nds/nds_power.py index 676bbd6..4deb96c 100644 --- a/nds/nds_power.py +++ b/nds/nds_power.py @@ -132,7 +132,7 @@ def run_one_query(spark_session, df.collect() else: ensure_valid_column_names(df).write.format(output_format).mode('overwrite').save( - output_path + '/' + query_name) + output_path + '/' + query_name) def ensure_valid_column_names(df: DataFrame): def is_column_start(char): @@ -258,10 +258,10 @@ def run_query_stream(input_prefix, print("====== Run {} ======".format(query_name)) q_report = PysparkBenchReport(spark_session, query_name) summary = q_report.report_on(run_one_query,spark_session, - q_content, - query_name, - output_path, - output_format) + q_content, + query_name, + output_path, + output_format) print(f"Time taken: {summary['queryTimes']} millis for {query_name}") query_times = summary['queryTimes'] execution_time_list.append((spark_app_id, query_name, query_times[0])) @@ -333,11 +333,11 @@ def load_properties(filename): parser = parser = argparse.ArgumentParser() parser.add_argument('input_prefix', help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' + - 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + - '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + - 'session name "spark_catalog" is supported now, customized catalog is not ' + - 'yet supported. Note if this points to a Delta Lake table, the path must be ' + - 'absolute. Issue: https://github.com/delta-io/delta/issues/555') + 'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' + + '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' + + 'session name "spark_catalog" is supported now, customized catalog is not ' + + 'yet supported. Note if this points to a Delta Lake table, the path must be ' + + 'absolute. Issue: https://github.com/delta-io/delta/issues/555') parser.add_argument('query_stream_file', help='query stream file that contains NDS queries in specific order') parser.add_argument('time_log', @@ -345,9 +345,9 @@ def load_properties(filename): default="") parser.add_argument('--input_format', help='type for input data source, e.g. 
parquet, orc, json, csv or iceberg, delta. ' + - 'Certain types are not fully supported by GPU reading, please refer to ' + - 'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' + - 'for more details.', + 'Certain types are not fully supported by GPU reading, please refer to ' + + 'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' + + 'for more details.', choices=['parquet', 'orc', 'avro', 'csv', 'json', 'iceberg', 'delta'], default='parquet') parser.add_argument('--output_prefix', @@ -360,34 +360,34 @@ def load_properties(filename): parser.add_argument('--floats', action='store_true', help='When loading Text files like json and csv, schemas are required to ' + - 'determine if certain parts of the data are read as decimal type or not. '+ - 'If specified, float data will be used.') + 'determine if certain parts of the data are read as decimal type or not. '+ + 'If specified, float data will be used.') parser.add_argument('--json_summary_folder', help='Empty folder/path (will create if not exist) to save JSON summary file for each query.') parser.add_argument('--delta_unmanaged', action='store_true', help='Use unmanaged tables for DeltaLake. This is useful for testing DeltaLake without ' + - ' leveraging a Metastore service.') + ' leveraging a Metastore service.') parser.add_argument('--keep_sc', action='store_true', help='Keep SparkContext alive after running all queries. This is a ' + - 'limitation on Databricks runtime environment. User should always attach ' + - 'this flag when running on Databricks.') + 'limitation on Databricks runtime environment. User should always attach ' + + 'this flag when running on Databricks.') parser.add_argument('--hive', action='store_true', help='use table meta information in Hive metastore directly without ' + - 'registering temp views.') + 'registering temp views.') parser.add_argument('--extra_time_log', help='extra path to save time log when running in cloud environment where '+ - 'driver node/pod cannot be accessed easily. User needs to add essential extra ' + - 'jars and configurations to access different cloud storage systems. ' + - 'e.g. s3, gs etc.') + 'driver node/pod cannot be accessed easily. User needs to add essential extra ' + + 'jars and configurations to access different cloud storage systems. ' + + 'e.g. s3, gs etc.') parser.add_argument('--sub_queries', type=lambda s: [x.strip() for x in s.split(',')], help='comma separated list of queries to run. If not specified, all queries ' + - 'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' + - '"_part1" and "_part2" suffix for the following query names: ' + - 'query14, query23, query24, query39. e.g. query14_part1, query39_part2') + 'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' + + '"_part1" and "_part2" suffix for the following query names: ' + + 'query14, query23, query24, query39. e.g. 
query14_part1, query39_part2') parser.add_argument('--allow_failure', action='store_true', help='Do not exit with non zero when any query failed or any task failed') @@ -407,4 +407,4 @@ def load_properties(filename): args.delta_unmanaged, args.keep_sc, args.hive, - args.allow_failure) \ No newline at end of file + args.allow_failure) diff --git a/nds/power_run_cpu.template b/nds/power_run_cpu.template index a02139d..76ea0ae 100644 --- a/nds/power_run_cpu.template +++ b/nds/power_run_cpu.template @@ -16,7 +16,7 @@ # source base.template -export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-400} +export SHUFFLE_PARTITIONS=${SHUFFLE_PARTITIONS:-200} export SPARK_CONF=("--master" "${SPARK_MASTER}" "--deploy-mode" "client" @@ -29,6 +29,4 @@ export SPARK_CONF=("--master" "${SPARK_MASTER}" "--conf" "spark.sql.adaptive.enabled=true" "--conf" "spark.sql.broadcastTimeout=1200" "--conf" "spark.dynamicAllocation.enabled=false" - "--conf" "spark.eventLog.enabled=true" - "--conf" "spark.eventLog.dir=/home/sbari/project-repos/data/nds_benchmark_100_event_logs" "--jars" "$NDS_LISTENER_JAR") diff --git a/nds/spark-submit-template b/nds/spark-submit-template index 8e4c2bc..fe0652b 100755 --- a/nds/spark-submit-template +++ b/nds/spark-submit-template @@ -30,5 +30,4 @@ CMD=("$SPARK_HOME/bin/spark-submit") CMD+=("${SPARK_CONF[@]}") CMD+=("${MORE_ARGS[@]}") # submit -echo "Executing command: ${CMD[@]}" "${CMD[@]}" diff --git a/nds/tpcds-gen/pom.xml b/nds/tpcds-gen/pom.xml index ae75fde..fe266af 100644 --- a/nds/tpcds-gen/pom.xml +++ b/nds/tpcds-gen/pom.xml @@ -77,7 +77,7 @@ true - lib/ + lib/ org.notmysock.tpcds.GenTable diff --git a/shared/base.template b/shared/base.template index bff02f4..b758925 100644 --- a/shared/base.template +++ b/shared/base.template @@ -34,4 +34,4 @@ export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G} export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-../utils/jvm_listener/target/benchmark-listener-1.0-SNAPSHOT.jar} # The spark-rapids jar which is required when running on GPU export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-24.04.0-cuda11.jar} -export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` \ No newline at end of file +export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` diff --git a/utils/check.py b/utils/check.py index 9704dd7..b8fa41d 100644 --- a/utils/check.py +++ b/utils/check.py @@ -172,4 +172,4 @@ def check_query_subset_exists(query_dict, subset_list): for q in subset_list: if q not in query_dict.keys(): raise Exception(f"Query {q} is not in the query dictionary. Please check the query subset.") - return True \ No newline at end of file + return True From 9d66afea820bd80d89ab9aa75bffc4ce2e02285a Mon Sep 17 00:00:00 2001 From: Sayed Bilal Bari Date: Thu, 11 Jul 2024 11:28:34 -0500 Subject: [PATCH 4/5] Correcting changes Signed-off-by: Sayed Bilal Bari --- nds/README.md | 4 ++-- nds/base.template | 2 +- nds/nds_power.py | 46 +++++++++++++++++++++---------------------- nds/tpcds-gen/pom.xml | 2 +- utils/check.py | 2 +- 5 files changed, 28 insertions(+), 28 deletions(-) diff --git a/nds/README.md b/nds/README.md index d5f4fd6..b9ffb8e 100644 --- a/nds/README.md +++ b/nds/README.md @@ -25,7 +25,7 @@ You may not use NDS except in compliance with the Apache License, Version 2.0 an sudo locale-gen en_US.UTF-8 sudo apt install openjdk-8-jdk-headless gcc make flex bison byacc maven ``` -3. Install and set up SPARK. +3. Install and set up SPARK. 
    - Download latest distro from [here](https://spark.apache.org/downloads.html)
    - Preferably >= 3.4
    - Find and note SPARK_HOME ( /DOWNLOAD/LOCATION/spark-<3.4.1>-bin-hadoop3 )
@@ -534,4 +534,4 @@ For instance:
 4 concurrent streams in one Throughput run and the total available
 are 1024. Then in the template, `spark.cores.max` should be set to `1024/4=256` so that
 each stream will have compute resource evenly.
-### NDS2.0 is using source code from TPC-DS Tool V3.2.0 \ No newline at end of file
+### NDS2.0 is using source code from TPC-DS Tool V3.2.0
diff --git a/nds/base.template b/nds/base.template
index a085d9a..4fc0fb9 100644
--- a/nds/base.template
+++ b/nds/base.template
@@ -34,4 +34,4 @@ export EXECUTOR_MEMORY=${EXECUTOR_MEMORY:-16G}
 export NDS_LISTENER_JAR=${NDS_LISTENER_JAR:-./jvm_listener/target/nds-benchmark-listener-1.0-SNAPSHOT.jar}
 # The spark-rapids jar which is required when running on GPU
 export SPARK_RAPIDS_PLUGIN_JAR=${SPARK_RAPIDS_PLUGIN_JAR:-rapids-4-spark_2.12-22.06.0.jar}
-export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip` \ No newline at end of file
+export PYTHONPATH=$SPARK_HOME/python:`echo $SPARK_HOME/python/lib/py4j-*.zip`
diff --git a/nds/nds_power.py b/nds/nds_power.py
index 4deb96c..a972dbb 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -258,10 +258,10 @@ def run_query_stream(input_prefix,
             print("====== Run {} ======".format(query_name))
             q_report = PysparkBenchReport(spark_session, query_name)
             summary = q_report.report_on(run_one_query,spark_session,
-                                             q_content,
-                                             query_name,
-                                             output_path,
-                                             output_format)
+                                         q_content,
+                                         query_name,
+                                         output_path,
+                                         output_format)
             print(f"Time taken: {summary['queryTimes']} millis for {query_name}")
             query_times = summary['queryTimes']
             execution_time_list.append((spark_app_id, query_name, query_times[0]))
@@ -333,11 +333,11 @@ def load_properties(filename):
     parser = parser = argparse.ArgumentParser()
     parser.add_argument('input_prefix',
                         help='text to prepend to every input file path (e.g., "hdfs:///ds-generated-data"). ' +
-                             'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' +
-                             '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' +
-                             'session name "spark_catalog" is supported now, customized catalog is not ' +
-                             'yet supported. Note if this points to a Delta Lake table, the path must be ' +
-                             'absolute. Issue: https://github.com/delta-io/delta/issues/555')
+                        'If --hive or if input_format is "iceberg", this argument will be regarded as the value of property ' +
+                        '"spark.sql.catalog.spark_catalog.warehouse". Only default Spark catalog ' +
+                        'session name "spark_catalog" is supported now, customized catalog is not ' +
+                        'yet supported. Note if this points to a Delta Lake table, the path must be ' +
+                        'absolute. Issue: https://github.com/delta-io/delta/issues/555')
     parser.add_argument('query_stream_file',
                         help='query stream file that contains NDS queries in specific order')
     parser.add_argument('time_log',
                         help='path to execution time log, only support local path.',
                         default="")
     parser.add_argument('--input_format',
                         help='type for input data source, e.g. parquet, orc, json, csv or iceberg, delta. ' +
-                             'Certain types are not fully supported by GPU reading, please refer to ' +
-                             'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' +
-                             'for more details.',
+                        'Certain types are not fully supported by GPU reading, please refer to ' +
+                        'https://github.com/NVIDIA/spark-rapids/blob/branch-22.08/docs/compatibility.md ' +
+                        'for more details.',
                         choices=['parquet', 'orc', 'avro', 'csv', 'json', 'iceberg', 'delta'],
                         default='parquet')
     parser.add_argument('--output_prefix',
@@ -360,34 +360,34 @@ def load_properties(filename):
     parser.add_argument('--floats',
                         action='store_true',
                         help='When loading Text files like json and csv, schemas are required to ' +
-                             'determine if certain parts of the data are read as decimal type or not. '+
-                             'If specified, float data will be used.')
+                        'determine if certain parts of the data are read as decimal type or not. '+
+                        'If specified, float data will be used.')
     parser.add_argument('--json_summary_folder',
                         help='Empty folder/path (will create if not exist) to save JSON summary file for each query.')
     parser.add_argument('--delta_unmanaged',
                         action='store_true',
                         help='Use unmanaged tables for DeltaLake. This is useful for testing DeltaLake without ' +
-                             ' leveraging a Metastore service.')
+                        ' leveraging a Metastore service.')
     parser.add_argument('--keep_sc',
                         action='store_true',
                         help='Keep SparkContext alive after running all queries. This is a ' +
-                             'limitation on Databricks runtime environment. User should always attach ' +
-                             'this flag when running on Databricks.')
+                        'limitation on Databricks runtime environment. User should always attach ' +
+                        'this flag when running on Databricks.')
     parser.add_argument('--hive',
                         action='store_true',
                         help='use table meta information in Hive metastore directly without ' +
                              'registering temp views.')
     parser.add_argument('--extra_time_log',
                         help='extra path to save time log when running in cloud environment where '+
-                             'driver node/pod cannot be accessed easily. User needs to add essential extra ' +
-                             'jars and configurations to access different cloud storage systems. ' +
-                             'e.g. s3, gs etc.')
+                        'driver node/pod cannot be accessed easily. User needs to add essential extra ' +
+                        'jars and configurations to access different cloud storage systems. ' +
+                        'e.g. s3, gs etc.')
     parser.add_argument('--sub_queries',
                         type=lambda s: [x.strip() for x in s.split(',')],
                         help='comma separated list of queries to run. If not specified, all queries ' +
-                             'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' +
-                             '"_part1" and "_part2" suffix for the following query names: ' +
-                             'query14, query23, query24, query39. e.g. query14_part1, query39_part2')
+                        'in the stream file will be run. e.g. "query1,query2,query3". Note, use ' +
+                        '"_part1" and "_part2" suffix for the following query names: ' +
+                        'query14, query23, query24, query39. e.g. query14_part1, query39_part2')
     parser.add_argument('--allow_failure',
                         action='store_true',
                         help='Do not exit with non zero when any query failed or any task failed')
diff --git a/nds/tpcds-gen/pom.xml b/nds/tpcds-gen/pom.xml
index fe266af..b36ab52 100644
--- a/nds/tpcds-gen/pom.xml
+++ b/nds/tpcds-gen/pom.xml
@@ -77,7 +77,7 @@
                             true
-                            lib/
+                            lib/
                             org.notmysock.tpcds.GenTable
diff --git a/utils/check.py b/utils/check.py
index b8fa41d..e429108 100644
--- a/utils/check.py
+++ b/utils/check.py
@@ -158,7 +158,7 @@ def get_dir_size(start_path):
 
 def check_json_summary_folder(json_summary_folder):
     if json_summary_folder:
-        # prepare a folder to save json summaries of query results
+        # prepare a folder to save json summaries of query results
         if not os.path.exists(json_summary_folder):
             os.makedirs(json_summary_folder)
         else:

From 1b877cf3e014cf13aba6182d9ac12716c935e071 Mon Sep 17 00:00:00 2001
From: Sayed Bilal Bari
Date: Thu, 11 Jul 2024 11:30:35 -0500
Subject: [PATCH 5/5] Correcting changes

Signed-off-by: Sayed Bilal Bari
---
 nds/nds_power.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/nds/nds_power.py b/nds/nds_power.py
index a972dbb..7c34fa1 100644
--- a/nds/nds_power.py
+++ b/nds/nds_power.py
@@ -367,7 +367,7 @@ def load_properties(filename):
     parser.add_argument('--delta_unmanaged',
                         action='store_true',
                         help='Use unmanaged tables for DeltaLake. This is useful for testing DeltaLake without ' +
-                             ' leveraging a Metastore service.')
+                        ' leveraging a Metastore service.')
     parser.add_argument('--keep_sc',
                         action='store_true',
                         help='Keep SparkContext alive after running all queries. This is a ' +
@@ -376,7 +376,7 @@ def load_properties(filename):
     parser.add_argument('--hive',
                         action='store_true',
                         help='use table meta information in Hive metastore directly without ' +
-                             'registering temp views.')
+                        'registering temp views.')
     parser.add_argument('--extra_time_log',
                         help='extra path to save time log when running in cloud environment where '+
                              'driver node/pod cannot be accessed easily. User needs to add essential extra ' +