Adding keep_sc support nds-h (#193)
* Adding keep_sc support nds-h

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Pulling in latest changes

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Correcting modified files

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Correcting changes

Signed-off-by: Sayed Bilal Bari <[email protected]>

* Correcting changes

Signed-off-by: Sayed Bilal Bari <[email protected]>

---------

Signed-off-by: Sayed Bilal Bari <[email protected]>
Co-authored-by: Sayed Bilal Bari <[email protected]>
bilalbari authored Jul 11, 2024
1 parent c41b702 commit eb96306
Showing 2 changed files with 21 additions and 23 deletions.
42 changes: 20 additions & 22 deletions nds-h/nds_h_power.py
@@ -186,33 +186,25 @@ def run_query_stream(input_prefix,
sub_queries,
input_format,
output_path=None,
keep_sc=False,
output_format="parquet",
json_summary_folder=None):
"""run SQL in Spark and record execution time log. The execution time log is saved as a CSV file
for easy accessibility. TempView Creation time is also recorded.
Args:
input_prefix (str): path of input data.
property_file (str): path of property file for Spark configuration.
query_dict (OrderedDict): ordered dict {query_name: query_content} of all NDS-H queries runnable in Spark
time_log_output_path (str): path of the log that contains query execution time, both local
:param input_prefix : path of input data or warehouse if input_format is "iceberg" or hive_external=True.
:param property_file: property file for Spark configuration.
:param query_dict : ordered dict {query_name: query_content} of all TPC-DS queries runnable in Spark
:param time_log_output_path : path of the log that contains query execution time, both local
and HDFS path are supported.
sub_queries (list): a list of query names to run, if not specified, all queries in the stream
input_format (str, optional): type of input data source.
output_path (str, optional): path of query output, optional. If not specified, collect()
:param input_format : type of input data source.
:param output_path : path of query output, optinal. If not specified, collect()
action will be applied to each query. Defaults to None.
output_format (str, optional): query output format, choices are csv, orc, parquet. Defaults
json_summary_folder (str, optional): path to save JSON summary files for each query.
:param output_format : query output format, choices are csv, orc, parquet. Defaults to "parquet".
:param keep_sc : Databricks specific to keep the spark context alive. Defaults to False.
:param json_summary_folder : path to save JSON summary files for each query.
to "parquet".
:param time_log_output_path:
:param sub_queries:
:param json_summary_folder:
:param output_format:
:param output_path:
:param input_format:
:param input_prefix:
:param query_dict:
:param property_file:
"""
queries_reports = []
execution_time_list = []
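
For reference, a minimal sketch of how a caller might invoke run_query_stream with the new flag; the query dict and paths below are hypothetical placeholders, not values from the repository:

from collections import OrderedDict

# Hypothetical single-query stream; real runs load queries from a query stream file.
query_dict = OrderedDict([('query1', 'SELECT COUNT(*) FROM lineitem')])

run_query_stream(input_prefix='/data/nds-h/parquet',        # placeholder path
                 property_file=None,
                 query_dict=query_dict,
                 time_log_output_path='/tmp/time_log.csv',  # placeholder path
                 sub_queries=None,                          # run every query in the stream
                 input_format='parquet',
                 output_path=None,                          # collect() each query result
                 keep_sc=True,                              # leave the SparkContext alive (Databricks)
                 output_format='parquet',
                 json_summary_folder=None)
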
@@ -261,8 +253,9 @@ def run_query_stream(input_prefix,
summary_prefix = os.path.join(json_summary_folder, '')
q_report.write_summary(prefix=summary_prefix)
power_end = int(time.time())
power_elapse = int((power_end - power_start) * 1000)
if not keep_sc:
    spark_session.sparkContext.stop()
total_time_end = time.time()
total_elapse = int((total_time_end - total_time_start) * 1000)
print("====== Power Test Time: {} milliseconds ======".format(power_elapse))
@@ -321,6 +314,11 @@ def load_properties(filename):
'absolute. Issue: https://github.com/delta-io/delta/issues/555')
parser.add_argument('query_stream_file',
help='query stream file that contains NDS queries in specific order')
parser.add_argument('--keep_sc',
                    action='store_true',
                    help='Keep SparkContext alive after running all queries. '
                         'This works around a limitation of the Databricks '
                         'runtime environment; users should always attach this '
                         'flag when running on Databricks.')
parser.add_argument('time_log',
help='path to execution time log, only support local path.',
default="")
@@ -355,6 +353,6 @@ def load_properties(filename):
args.sub_queries,
args.input_format,
args.output_prefix,
args.keep_sc,
args.output_format,
args.json_summary_folder)
2 changes: 1 addition & 1 deletion nds/tpcds-gen/pom.xml
@@ -77,7 +77,7 @@
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>org.notmysock.tpcds.GenTable</mainClass>
</manifest>
</archive>
