Adding keep_sc support nds-h #193

Merged · 6 commits · Jul 11, 2024
42 changes: 20 additions & 22 deletions nds-h/nds_h_power.py
@@ -186,33 +186,25 @@ def run_query_stream(input_prefix,
                      sub_queries,
                      input_format,
                      output_path=None,
+                     keep_sc=False,
                      output_format="parquet",
                      json_summary_folder=None):
     """run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
     for easy accessibility. TempView creation time is also recorded.
 
-    Args:
-        input_prefix (str): path of input data.
-        property_file (str): path of property file for Spark configuration.
-        query_dict (OrderedDict): ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark
-        time_log_output_path (str): path of the log that contains query execution time, both local
-            and HDFS path are supported.
-        sub_queries (list): a list of query names to run; if not specified, all queries in the stream are run.
-        input_format (str, optional): type of input data source.
-        output_path (str, optional): path of query output, optional. If not specified, collect()
-            action will be applied to each query. Defaults to None.
-        output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults
-            to "parquet".
-        json_summary_folder (str, optional): path to save JSON summary files for each query.
-    :param time_log_output_path:
-    :param sub_queries:
-    :param json_summary_folder:
-    :param output_format:
-    :param output_path:
-    :param input_format:
-    :param input_prefix:
-    :param query_dict:
-    :param property_file:
+    :param input_prefix: path of input data, or of the warehouse if input_format is "iceberg" or hive_external=True.
+    :param property_file: property file for Spark configuration.
+    :param query_dict: ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark.
+    :param time_log_output_path: path of the log that contains query execution time; both local
+        and HDFS paths are supported.
+    :param sub_queries: a list of query names to run; if not specified, all queries in the stream are run.
+    :param input_format: type of input data source.
+    :param output_path: path of query output, optional. If not specified, a collect()
+        action will be applied to each query. Defaults to None.
+    :param output_format: query output format; choices are csv, orc, parquet. Defaults to "parquet".
+    :param keep_sc: Databricks-specific flag to keep the SparkContext alive. Defaults to False.
+    :param json_summary_folder: path to save JSON summary files for each query.
     """
     queries_reports = []
     execution_time_list = []
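
For context (not part of the diff), a minimal sketch of how a caller might pass the new parameter. The keyword names follow the signature and docstring above; the paths and query text are placeholders:

```python
# Hypothetical invocation; paths and query contents are placeholders, and the
# parameter names are taken from the run_query_stream signature above.
from collections import OrderedDict

query_dict = OrderedDict([('query1', 'SELECT COUNT(*) FROM lineitem')])

run_query_stream(input_prefix='/data/nds-h/parquet',
                 property_file=None,
                 query_dict=query_dict,
                 time_log_output_path='time_log.csv',
                 sub_queries=None,
                 input_format='parquet',
                 output_path=None,
                 keep_sc=True,   # leave the SparkContext running, e.g. on Databricks
                 output_format='parquet',
                 json_summary_folder=None)
```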
@@ -261,8 +253,9 @@ def run_query_stream(input_prefix,
             summary_prefix = os.path.join(json_summary_folder, '')
             q_report.write_summary(prefix=summary_prefix)
     power_end = int(time.time())
-    power_elapse = int((power_end - power_start) * 1000)
-    spark_session.sparkContext.stop()
+    power_elapse = int((power_end - power_start)*1000)
+    if not keep_sc:
+        spark_session.sparkContext.stop()
     total_time_end = time.time()
     total_elapse = int((total_time_end - total_time_start) * 1000)
     print("====== Power Test Time: {} milliseconds ======".format(power_elapse))
@@ -321,6 +314,11 @@ def load_properties(filename):
                         'absolute. Issue: https://github.com/delta-io/delta/issues/555')
     parser.add_argument('query_stream_file',
                         help='query stream file that contains NDS queries in specific order')
+    parser.add_argument('--keep_sc',
+                        action='store_true',
+                        help='Keep SparkContext alive after running all queries. This is a ' +
+                             'limitation of the Databricks runtime environment; users should always '
+                             'attach this flag when running on Databricks.')
     parser.add_argument('time_log',
                         help='path to execution time log, only support local path.',
                         default="")
@@ -355,6 +353,6 @@ def load_properties(filename):
                      args.sub_queries,
                      args.input_format,
                      args.output_prefix,
+                     args.keep_sc,
                      args.output_format,
-                     args.json_summary_folder
-                     )
+                     args.json_summary_folder)
2 changes: 1 addition & 1 deletion nds/tpcds-gen/pom.xml
@@ -77,7 +77,7 @@
         <archive>
           <manifest>
             <addClasspath>true</addClasspath>
-            <classpathPrefix>lib/</classpathPrefix>
+            <classpathPrefix>lib/</classpathPrefix>
             <mainClass>org.notmysock.tpcds.GenTable</mainClass>
           </manifest>
         </archive>