diff --git a/nds-h/nds_h_power.py b/nds-h/nds_h_power.py
index dc1496b..04a2f66 100644
--- a/nds-h/nds_h_power.py
+++ b/nds-h/nds_h_power.py
@@ -186,33 +186,25 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      input_format,
                      output_path=None,
+                     keep_sc=False,
                      output_format="parquet",
                      json_summary_folder=None):
     """run SQL in Spark and record execution time log. The execution time log is saved
     as a CSV file for easy accessibility. TempView Creation time is also recorded.
 
     Args:
-        input_prefix (str): path of input data.
-        property_file (str): path of property file for Spark configuration.
-        query_dict (OrderedDict): ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark
-        time_log_output_path (str): path of the log that contains query execution time, both local
+    :param input_prefix : path of input data or warehouse if input_format is "iceberg" or hive_external=True.
+    :param property_file: property file for Spark configuration.
+    :param query_dict : ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark
+    :param time_log_output_path : path of the log that contains query execution time, both local
         and HDFS path are supported.
-        sub_queries (list): a list of query names to run, if not specified, all queries in the stream
-        input_format (str, optional): type of input data source.
-        output_path (str, optional): path of query output, optional. If not specified, collect()
+    :param input_format : type of input data source.
+    :param output_path : path of query output, optional. If not specified, collect()
        action will be applied to each query. Defaults to None.
-        output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults
-        json_summary_folder (str, optional): path to save JSON summary files for each query.
+    :param output_format : query output format, choices are csv, orc, parquet. Defaults to "parquet".
+    :param keep_sc : Databricks-specific flag to keep the SparkContext alive. Defaults to False.
+    :param json_summary_folder : path to save JSON summary files for each query.
        to "parquet".
-    :param time_log_output_path:
-    :param sub_queries:
-    :param json_summary_folder:
-    :param output_format:
-    :param output_path:
-    :param input_format:
-    :param input_prefix:
-    :param query_dict:
-    :param property_file:
     """
     queries_reports = []
     execution_time_list = []
@@ -261,8 +253,9 @@ def run_query_stream(input_prefix,
         summary_prefix = os.path.join(json_summary_folder, '')
         q_report.write_summary(prefix=summary_prefix)
     power_end = int(time.time())
-    power_elapse = int((power_end - power_start) * 1000)
-    spark_session.sparkContext.stop()
+    power_elapse = int((power_end - power_start)*1000)
+    if not keep_sc:
+        spark_session.sparkContext.stop()
     total_time_end = time.time()
     total_elapse = int((total_time_end - total_time_start) * 1000)
     print("====== Power Test Time: {} milliseconds ======".format(power_elapse))
@@ -321,6 +314,11 @@ def load_properties(filename):
                              'absolute. Issue: https://github.com/delta-io/delta/issues/555')
     parser.add_argument('query_stream_file',
                         help='query stream file that contains NDS queries in specific order')
+    parser.add_argument('--keep_sc',
+                        action='store_true',
+                        help='Keep SparkContext alive after running all queries. This is a ' +
+                             'limitation on Databricks runtime environment. User should always ' +
+                             'attach this flag when running on Databricks.')
     parser.add_argument('time_log',
                         help='path to execution time log, only support local path.',
                         default="")
@@ -355,6 +353,6 @@ def load_properties(filename):
                          args.sub_queries,
                          args.input_format,
                          args.output_prefix,
+                         args.keep_sc,
                          args.output_format,
-                         args.json_summary_folder
-                         )
+                         args.json_summary_folder)
diff --git a/nds/tpcds-gen/pom.xml b/nds/tpcds-gen/pom.xml
index 2c595be..b36ab52 100644
--- a/nds/tpcds-gen/pom.xml
+++ b/nds/tpcds-gen/pom.xml
@@ -77,7 +77,7 @@
                             <addClasspath>true</addClasspath>
-                            <classpathPrefix>lib/</classpathPrefix>
+                            <classpathPrefix>lib/</classpathPrefix>
                             <mainClass>org.notmysock.tpcds.GenTable</mainClass>
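Usage sketch (not part of the patch): the snippet below illustrates how the new keep_sc parameter would be passed to run_query_stream so that the SparkContext is left running on Databricks. Only the parameter names come from the docstring in the hunk above; the import path, the concrete argument values, and property_file=None are hypothetical placeholders.

    from collections import OrderedDict
    from nds_h_power import run_query_stream  # assumed import path for nds-h/nds_h_power.py

    # Hypothetical inputs for illustration only.
    queries = OrderedDict([("query_1", "SELECT 1")])  # {query_name: query_content}

    run_query_stream(input_prefix="/data/nds-h/parquet",   # hypothetical data path
                     property_file=None,                   # no extra Spark properties
                     query_dict=queries,
                     time_log_output_path="./time_log.csv",
                     sub_queries=None,                     # run every query in the stream
                     input_format="parquet",
                     output_path=None,                     # collect() results instead of writing them
                     keep_sc=True,                         # Databricks: do not stop the shared SparkContext
                     output_format="parquet",
                     json_summary_folder=None)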