diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 26e4c5b..29a9c46 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -19,10 +19,10 @@ jobs: VERSION=$(grep -m 1 -oP 'ruff==\K(.*)' requirements.txt) echo "version=$VERSION" >> $GITHUB_OUTPUT - # - uses: chartboost/ruff-action@v1 - # with: - # version: ${{ steps.version.outputs.version }} - # args: check --no-fix + - uses: chartboost/ruff-action@v1 + with: + version: ${{ steps.version.outputs.version }} + args: check --no-fix - uses: chartboost/ruff-action@v1 with: diff --git a/.gitignore b/.gitignore index 4283bbc..9cf55fd 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ perf.* target/ **/flamegraph.svg Cargo.lock +.ruff_cache/ diff --git a/common_utils.py b/common_utils.py index 0a5204b..241de68 100644 --- a/common_utils.py +++ b/common_utils.py @@ -1,6 +1,7 @@ import os import re import sys +from pathlib import Path from subprocess import run from linetimer import CodeTimer @@ -17,16 +18,17 @@ print("log timings:", LOG_TIMINGS) print("file type:", FILE_TYPE) -CWD = os.path.dirname(os.path.realpath(__file__)) -DATASET_BASE_DIR = os.path.join(CWD, f"tables_scale_{SCALE_FACTOR}") -ANSWERS_BASE_DIR = os.path.join(CWD, "tpch-dbgen/answers") -ANSWERS_PARQUET_BASE_DIR = os.path.join(CWD, "data/answers") -TIMINGS_FILE = os.path.join(CWD, os.environ.get("TIMINGS_FILE", "timings.csv")) -DEFAULT_PLOTS_DIR = os.path.join(CWD, "plots") + +CWD = Path(__file__).parent +DATASET_BASE_DIR = CWD / f"tables_scale_{SCALE_FACTOR}" +ANSWERS_BASE_DIR = CWD / "tpch-dbgen/answers" +ANSWERS_PARQUET_BASE_DIR = CWD / "data/answers" +TIMINGS_FILE = CWD / os.environ.get("TIMINGS_FILE", "timings.csv") +DEFAULT_PLOTS_DIR = CWD / "plots" def append_row(solution: str, q: str, secs: float, version: str, success=True): - with open(TIMINGS_FILE, "a") as f: + with TIMINGS_FILE.open("a") as f: if f.tell() == 0: f.write("solution,version,query_no,duration[s],include_io,success\n") f.write(f"{solution},{version},{q},{secs},{INCLUDE_IO},{success}\n") diff --git a/dask_queries/q1.py b/dask_queries/q1.py index 48ea0eb..b2ecc11 100644 --- a/dask_queries/q1.py +++ b/dask_queries/q1.py @@ -1,4 +1,4 @@ -from datetime import date, datetime +from datetime import datetime from dask_queries import utils diff --git a/dask_queries/q4.py b/dask_queries/q4.py index bc373d4..c7c2aa1 100644 --- a/dask_queries/q4.py +++ b/dask_queries/q4.py @@ -27,7 +27,10 @@ def query(): flineitem = line_item_ds[lsel] forders = orders_ds[osel] forders = forders[["o_orderkey", "o_orderpriority"]] - # jn = forders[forders["o_orderkey"].compute().isin(flineitem["l_orderkey"])] # doesn't support isin + + # doesn't support isin + # jn = forders[forders["o_orderkey"].compute().isin(flineitem["l_orderkey"])] + jn = forders.merge( flineitem, left_on="o_orderkey", right_on="l_orderkey" ).drop_duplicates(subset=["o_orderkey"])[["o_orderpriority", "o_orderkey"]] diff --git a/dask_queries/q7.py b/dask_queries/q7.py index 5bfd130..c31ba56 100644 --- a/dask_queries/q7.py +++ b/dask_queries/q7.py @@ -1,5 +1,4 @@ import datetime -from datetime import datetime import dask.dataframe as dd diff --git a/dask_queries/utils.py b/dask_queries/utils.py index ac00150..5059fef 100644 --- a/dask_queries/utils.py +++ b/dask_queries/utils.py @@ -1,7 +1,8 @@ import os import timeit -from os.path import join -from typing import Callable, Union +from collections.abc import Callable +from pathlib import Path +from typing import Union import dask.dataframe as dd import pandas as pd @@ -23,14 +24,15 @@ def read_ds(path: str) -> Union: if INCLUDE_IO: return dd.read_parquet(path) if FILE_TYPE == "feather": - raise ValueError("file type feather not supported for dask queries") + msg = "file type feather not supported for dask queries" + raise ValueError(msg) return dd.from_pandas(pd.read_parquet(path), npartitions=os.cpu_count()) -def get_query_answer(query: int, base_dir: str = ANSWERS_BASE_DIR) -> dd.DataFrame: +def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> dd.DataFrame: answer_df = pd.read_csv( - join(base_dir, f"q{query}.out"), + base_dir / f"q{query}.out", sep="|", parse_dates=True, infer_datetime_format=True, @@ -55,42 +57,42 @@ def test_results(q_num: int, result_df: pd.DataFrame): @on_second_call def get_line_item_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "lineitem.parquet")) + return read_ds(Path(base_dir) / "lineitem.parquet") @on_second_call def get_orders_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "orders.parquet")) + return read_ds(Path(base_dir) / "orders.parquet") @on_second_call def get_customer_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "customer.parquet")) + return read_ds(Path(base_dir) / "customer.parquet") @on_second_call def get_region_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "region.parquet")) + return read_ds(Path(base_dir) / "region.parquet") @on_second_call def get_nation_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "nation.parquet")) + return read_ds(Path(base_dir) / "nation.parquet") @on_second_call def get_supplier_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "supplier.parquet")) + return read_ds(Path(base_dir) / "supplier.parquet") @on_second_call def get_part_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "part.parquet")) + return read_ds(Path(base_dir) / "part.parquet") @on_second_call def get_part_supp_ds(base_dir: str = DATASET_BASE_DIR) -> dd.DataFrame: - return read_ds(join(base_dir, "partsupp.parquet")) + return read_ds(Path(base_dir) / "partsupp.parquet") def run_query(q_num: str, query: Callable): diff --git a/duckdb_queries/q15.py b/duckdb_queries/q15.py index c3cbd25..36ad097 100644 --- a/duckdb_queries/q15.py +++ b/duckdb_queries/q15.py @@ -1,5 +1,4 @@ import duckdb -from duckdb import DuckDBPyConnection from duckdb_queries import utils diff --git a/duckdb_queries/q2.py b/duckdb_queries/q2.py index 5377412..bc81b1c 100644 --- a/duckdb_queries/q2.py +++ b/duckdb_queries/q2.py @@ -27,7 +27,7 @@ def q(): {supplier_ds}, {part_supp_ds}, {nation_ds}, - {region_ds} + {region_ds} where p_partkey = ps_partkey and s_suppkey = ps_suppkey diff --git a/duckdb_queries/utils.py b/duckdb_queries/utils.py index d6c161e..bdaae73 100644 --- a/duckdb_queries/utils.py +++ b/duckdb_queries/utils.py @@ -1,7 +1,6 @@ import timeit from importlib.metadata import version -from os.path import join -from typing import Any +from pathlib import Path import duckdb import polars as pl @@ -20,7 +19,7 @@ ) -def _scan_ds(path: str): +def _scan_ds(path: Path): path = f"{path}.{FILE_TYPE}" if FILE_TYPE == "parquet": if INCLUDE_IO: @@ -44,7 +43,7 @@ def _scan_ds(path: str): def get_query_answer( query: int, base_dir: str = ANSWERS_PARQUET_BASE_DIR ) -> pl.LazyFrame: - return pl.scan_parquet(join(base_dir, f"q{query}.parquet")) + return pl.scan_parquet(Path(base_dir) / f"q{query}.parquet") def test_results(q_num: int, result_df: pl.DataFrame): @@ -54,35 +53,35 @@ def test_results(q_num: int, result_df: pl.DataFrame): def get_line_item_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "lineitem")) + return _scan_ds(Path(base_dir) / "lineitem") def get_orders_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "orders")) + return _scan_ds(Path(base_dir) / "orders") def get_customer_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "customer")) + return _scan_ds(Path(base_dir) / "customer") def get_region_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "region")) + return _scan_ds(Path(base_dir) / "region") def get_nation_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "nation")) + return _scan_ds(Path(base_dir) / "nation") def get_supplier_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "supplier")) + return _scan_ds(Path(base_dir) / "supplier") def get_part_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "part")) + return _scan_ds(Path(base_dir) / "part") def get_part_supp_ds(base_dir: str = DATASET_BASE_DIR) -> str: - return _scan_ds(join(base_dir, "partsupp")) + return _scan_ds(Path(base_dir) / "partsupp") def run_query(q_num: int, context: DuckDBPyRelation): diff --git a/modin_queries/q7.py b/modin_queries/q7.py index 0c75ff6..7726a2a 100644 --- a/modin_queries/q7.py +++ b/modin_queries/q7.py @@ -1,5 +1,4 @@ import datetime -from datetime import datetime import modin.pandas as pd diff --git a/modin_queries/utils.py b/modin_queries/utils.py index de03890..5ba3a79 100644 --- a/modin_queries/utils.py +++ b/modin_queries/utils.py @@ -1,10 +1,9 @@ import timeit -from os.path import join -from typing import Callable +from collections.abc import Callable +from pathlib import Path import modin import modin.pandas as pd -import pandas from linetimer import CodeTimer, linetimer from pandas.core.frame import DataFrame as PandasDF @@ -22,11 +21,11 @@ def __read_parquet_ds(path: str) -> PandasDF: return pd.read_parquet(path, dtype_backend="pyarrow", engine="pyarrow") -def get_query_answer(query: int, base_dir: str = ANSWERS_BASE_DIR) -> PandasDF: +def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> PandasDF: import pandas as pd answer_df = pd.read_csv( - join(base_dir, f"q{query}.out"), + base_dir / f"q{query}.out", sep="|", parse_dates=True, infer_datetime_format=True, @@ -53,42 +52,42 @@ def test_results(q_num: int, result_df: PandasDF): @on_second_call def get_line_item_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "lineitem.parquet")) + return __read_parquet_ds(Path(base_dir) / "lineitem.parquet") @on_second_call def get_orders_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "orders.parquet")) + return __read_parquet_ds(Path(base_dir) / "orders.parquet") @on_second_call def get_customer_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "customer.parquet")) + return __read_parquet_ds(Path(base_dir) / "customer.parquet") @on_second_call def get_region_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "region.parquet")) + return __read_parquet_ds(Path(base_dir) / "region.parquet") @on_second_call def get_nation_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "nation.parquet")) + return __read_parquet_ds(Path(base_dir) / "nation.parquet") @on_second_call def get_supplier_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "supplier.parquet")) + return __read_parquet_ds(Path(base_dir) / "supplier.parquet") @on_second_call def get_part_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "part.parquet")) + return __read_parquet_ds(Path(base_dir) / "part.parquet") @on_second_call def get_part_supp_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return __read_parquet_ds(join(base_dir, "partsupp.parquet")) + return __read_parquet_ds(Path(base_dir) / "partsupp.parquet") def run_query(q_num: int, query: Callable): diff --git a/pandas_queries/q8.py b/pandas_queries/q8.py index 1378f42..bc0e8da 100644 --- a/pandas_queries/q8.py +++ b/pandas_queries/q8.py @@ -1,5 +1,3 @@ -from datetime import datetime - import pandas as pd from pandas_queries import utils diff --git a/pandas_queries/utils.py b/pandas_queries/utils.py index 1c0d199..dc575a9 100644 --- a/pandas_queries/utils.py +++ b/pandas_queries/utils.py @@ -1,6 +1,6 @@ import timeit -from os.path import join -from typing import Callable +from collections.abc import Callable +from pathlib import Path import pandas as pd from linetimer import CodeTimer, linetimer @@ -17,19 +17,20 @@ ) -def _read_ds(path: str) -> PandasDF: +def _read_ds(path: Path) -> PandasDF: path = f"{path}.{FILE_TYPE}" if FILE_TYPE == "parquet": return pd.read_parquet(path, dtype_backend="pyarrow", engine="pyarrow") elif FILE_TYPE == "feather": return pd.read_feather(path) else: - raise ValueError(f"file type: {FILE_TYPE} not expected") + msg = f"file type: {FILE_TYPE} not expected" + raise ValueError(msg) def get_query_answer(query: int, base_dir: str = ANSWERS_BASE_DIR) -> PandasDF: answer_df = pd.read_csv( - join(base_dir, f"q{query}.out"), + Path(base_dir) / f"q{query}.out", sep="|", parse_dates=True, infer_datetime_format=True, @@ -54,42 +55,42 @@ def test_results(q_num: int, result_df: PandasDF): @on_second_call def get_line_item_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "lineitem")) + return _read_ds(Path(base_dir) / "lineitem") @on_second_call def get_orders_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "orders")) + return _read_ds(Path(base_dir) / "orders") @on_second_call def get_customer_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "customer")) + return _read_ds(Path(base_dir) / "customer") @on_second_call def get_region_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "region")) + return _read_ds(Path(base_dir) / "region") @on_second_call def get_nation_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "nation")) + return _read_ds(Path(base_dir) / "nation") @on_second_call def get_supplier_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "supplier")) + return _read_ds(Path(base_dir) / "supplier") @on_second_call def get_part_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "part")) + return _read_ds(Path(base_dir) / "part") @on_second_call def get_part_supp_ds(base_dir: str = DATASET_BASE_DIR) -> PandasDF: - return _read_ds(join(base_dir, "partsupp")) + return _read_ds(Path(base_dir) / "partsupp") def run_query(q_num: int, query: Callable): diff --git a/prepare_large_files.py b/prepare_large_files.py index db9f36a..c9b071b 100644 --- a/prepare_large_files.py +++ b/prepare_large_files.py @@ -92,13 +92,12 @@ "lineitem", ]: print("process table:", name) - df = pl.scan_csv( + lf = pl.scan_csv( f"tables_scale_{scale_fac}/{name}.tbl", has_header=False, separator="|", try_parse_dates=True, - with_column_names=lambda _: eval(f"h_{name}"), + new_columns=eval(f"h_{name}"), ) - - df = df.with_columns([pl.col(pl.Date).cast(pl.Datetime)]) - df.sink_parquet(f"tables_scale_{scale_fac}/{name}.parquet") + lf = lf.with_columns(pl.col(pl.Date).cast(pl.Datetime)) + lf.sink_parquet(f"tables_scale_{scale_fac}/{name}.parquet") diff --git a/pyproject.toml b/pyproject.toml index e2ab17a..9cd3212 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,14 +11,13 @@ select = [ "B", # flake8-bugbear "C4", # flake8-comprehensions "D", # flake8-docstrings - "D213", # Augment NumPy docstring convention: Multi-line docstring summary should start at the second line + "D212", # Augment NumPy docstring convention: Multi-line docstring summary should start at the second line "D417", # Augment NumPy docstring convention: Missing argument descriptions "I", # isort "SIM", # flake8-simplify "TCH", # flake8-type-checking "TID", # flake8-tidy-imports "UP", # pyupgrade - "PT", # flake8-pytest-style "RUF", # Ruff-specific rules "PTH", # flake8-use-pathlib "FA", # flake8-future-annotations @@ -26,23 +25,18 @@ select = [ "TD", # flake8-todos "TRY", # tryceratops "EM", # flake8-errmsg - "FBT001", # flake8-boolean-trap ] ignore = [ # Line length regulated by formatter "E501", # pydocstyle: http://www.pydocstyle.org/en/stable/error_codes.html - "D401", # Relax NumPy docstring convention: First line should be in imperative mood - # flake8-pytest-style: - "PT011", # pytest.raises({exception}) is too broad, set the match parameter or use a more specific exception + "D1", # Missing docstring # flake8-simplify "SIM102", # Use a single `if` statement instead of nested `if` statements "SIM108", # Use ternary operator # ruff "RUF005", # unpack-instead-of-concatenating-to-collection-literal - # pycodestyle - "D1", # Missing docstring # flake8-todos "TD002", # Missing author in TODO "TD003", # Missing issue link on the line following this TODO diff --git a/scripts/plot_dots.py b/scripts/plot_dots.py index 4614709..59e74f8 100755 --- a/scripts/plot_dots.py +++ b/scripts/plot_dots.py @@ -45,7 +45,7 @@ def parse_queries(s: str) -> list[str]: int_set.update(range(start, end + 1)) else: int_set.add(int(part)) - return [f"q{x}" for x in sorted(list(int_set))] + return [f"q{x}" for x in sorted(int_set)] def read_csv(filename: str) -> pl.DataFrame: @@ -305,7 +305,11 @@ def main() -> None: styles = get_styles(exclude_solutions) queries = parse_queries(args.queries) timings = prepare_timings( - read_csv(args.csv), styles, exclude_solutions, queries, args.include_io + read_csv(args.csv), + styles, + exclude_solutions, + queries, + args.include_io, ) caption = formulate_caption( timings, styles, queries, args.no_notes, args.max_duration, args.width diff --git a/scripts/plot_results.py b/scripts/plot_results.py index 8d0bc28..175d802 100644 --- a/scripts/plot_results.py +++ b/scripts/plot_results.py @@ -1,13 +1,12 @@ -"""This script uses Plotly to visualize benchmark results. +"""Script for visualizing benchmark results using Plotly. -To use this script run +To use this script, run: ```shell .venv/bin/python ./scripts/plot_results.py ``` """ - -import os +from pathlib import Path import plotly.express as px import polars as pl @@ -44,7 +43,7 @@ def add_annotations(fig, limit: int, df: pl.DataFrame): # every bar in the plot has a different offset for the text start_offset = 10 - offsets = [start_offset + 12 * i for i in range(0, bar_order.height)] + offsets = [start_offset + 12 * i for i in range(bar_order.height)] # we look for the solutions that surpassed the limit # and create a text label for them @@ -90,18 +89,19 @@ def add_annotations(fig, limit: int, df: pl.DataFrame): y=LIMIT, xshift=x_shift, yshift=30, - font=dict(color="white"), + font={"color": "white"}, showarrow=False, text=anno_text, ) def write_plot_image(fig): - if not os.path.exists(DEFAULT_PLOTS_DIR): - os.mkdir(DEFAULT_PLOTS_DIR) + path = Path(DEFAULT_PLOTS_DIR) + if not path.exists(): + path.mkdir() - file_name = f"plot_with_io.html" if INCLUDE_IO else "plot_without_io.html" - fig.write_html(os.path.join(DEFAULT_PLOTS_DIR, file_name)) + file_name = "plot_with_io.html" if INCLUDE_IO else "plot_without_io.html" + fig.write_html(path / file_name) def plot( @@ -111,20 +111,25 @@ def plot( group: str = "solution", limit: int = 120, ): - """Generate a Plotly Figure of a grouped bar chart diplaying - benchmark results from a DataFrame. - - Args: - df (pl.DataFrame): DataFrame containing `x`, `y`, and `group`. - x (str, optional): Column for X Axis. Defaults to "query_no". - y (str, optional): Column for Y Axis. Defaults to "duration[s]". - group (str, optional): Column for group. Defaults to "solution". - limit: height limit in seconds - - Returns: - px.Figure: Plotly Figure (histogram) + """Generate a Plotly Figure of a grouped bar chart displaying benchmark results. + + Parameters + ---------- + df + DataFrame containing `x`, `y`, and `group`. + x + Column for X Axis. Defaults to "query_no". + y + Column for Y Axis. Defaults to "duration[s]". + group + Column for group. Defaults to "solution". + limit + height limit in seconds + + Returns + ------- + px.Figure: Plotly Figure (histogram) """ - # build plotly figure object fig = px.histogram( x=df[x], @@ -142,8 +147,14 @@ def plot( paper_bgcolor="rgba(41,52,65,1)", yaxis_range=[0, limit], plot_bgcolor="rgba(41,52,65,1)", - margin=dict(t=100), - legend=dict(orientation="h", xanchor="left", yanchor="top", x=0.37, y=-0.1), + margin={"t": 100}, + legend={ + "orientation": "h", + "xanchor": "left", + "yanchor": "top", + "x": 0.37, + "y": -0.1, + }, ) add_annotations(fig, limit, df) @@ -172,31 +183,20 @@ def plot( pl.scan_csv(TIMINGS_FILE) .filter(e) # filter the max query to plot - .filter((pl.col("query_no").str.extract("q(\d+)", 1).cast(int) <= max_query)) + .filter(pl.col("query_no").str.extract(r"q(\d+)", 1).cast(int) <= max_query) # create a version no .with_columns( - [ - pl.when(pl.col("success")).then(pl.col("duration[s]")).otherwise(0), - pl.format("{}-{}", "solution", "version").alias("solution-version"), - ] + pl.when(pl.col("success")).then(pl.col("duration[s]")).otherwise(0), + pl.format("{}-{}", "solution", "version").alias("solution-version"), ) # ensure we get the latest version - .sort(["solution", "version"]) - .groupby(["solution", "query_no"], maintain_order=True) + .sort("solution", "version") + .group_by("solution", "query_no", maintain_order=True) .last() .collect() ) order = pl.DataFrame( - { - "solution": [ - "polars", - "duckdb", - "pandas", - "dask", - "spark", - "modin", - ] - } + {"solution": ["polars", "duckdb", "pandas", "dask", "spark", "modin"]} ) df = order.join(df, on="solution", how="left") diff --git a/spark_queries/executor.py b/spark_queries/executor.py index 80d5c89..17fdafd 100644 --- a/spark_queries/executor.py +++ b/spark_queries/executor.py @@ -1,7 +1,7 @@ from linetimer import CodeTimer # TODO: works for now, but need dynamic imports for this. -from spark_queries import ( +from spark_queries import ( # noqa: F401 q1, q2, q3, @@ -29,10 +29,10 @@ if __name__ == "__main__": num_queries = 22 - with CodeTimer(name=f"Overall execution of ALL spark queries", unit="s"): + with CodeTimer(name="Overall execution of ALL spark queries", unit="s"): sub_modules = [f"q{sm}" for sm in range(1, num_queries + 1)] for sm in sub_modules: try: eval(f"{sm}.q()") - except: + except Exception: print(f"Exception occurred while executing spark_queries.{sm}") diff --git a/spark_queries/q1.py b/spark_queries/q1.py index 4813287..50fadc4 100644 --- a/spark_queries/q1.py +++ b/spark_queries/q1.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select l_returnflag, l_linestatus, diff --git a/spark_queries/q10.py b/spark_queries/q10.py index e6e8370..f94c9d6 100644 --- a/spark_queries/q10.py +++ b/spark_queries/q10.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select c_custkey, c_name, diff --git a/spark_queries/q11.py b/spark_queries/q11.py index 516668d..b434525 100644 --- a/spark_queries/q11.py +++ b/spark_queries/q11.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select ps_partkey, round(sum(ps_supplycost * ps_availqty), 2) as value diff --git a/spark_queries/q12.py b/spark_queries/q12.py index 9a74d32..cddaa66 100644 --- a/spark_queries/q12.py +++ b/spark_queries/q12.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select l_shipmode, sum(case diff --git a/spark_queries/q13.py b/spark_queries/q13.py index 29825ad..26f1d57 100644 --- a/spark_queries/q13.py +++ b/spark_queries/q13.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select c_count, count(*) as custdist from ( diff --git a/spark_queries/q14.py b/spark_queries/q14.py index 675134b..5ee7c69 100644 --- a/spark_queries/q14.py +++ b/spark_queries/q14.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select round(100.00 * sum(case when p_type like 'PROMO%' diff --git a/spark_queries/q15.py b/spark_queries/q15.py index 4b5e931..8e27f05 100644 --- a/spark_queries/q15.py +++ b/spark_queries/q15.py @@ -6,7 +6,7 @@ def q(): spark = utils.get_or_create_spark() - ddl = f""" + ddl = """ create temp view revenue (supplier_no, total_revenue) as select l_suppkey, @@ -20,7 +20,7 @@ def q(): l_suppkey """ - query_str = f""" + query_str = """ select s_suppkey, s_name, diff --git a/spark_queries/q16.py b/spark_queries/q16.py index e524337..e1d9ddd 100644 --- a/spark_queries/q16.py +++ b/spark_queries/q16.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select p_brand, p_type, diff --git a/spark_queries/q17.py b/spark_queries/q17.py index 1cdbb64..1a6bab0 100644 --- a/spark_queries/q17.py +++ b/spark_queries/q17.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select round(sum(l_extendedprice) / 7.0, 2) as avg_yearly from diff --git a/spark_queries/q18.py b/spark_queries/q18.py index 5ba15f7..72981aa 100644 --- a/spark_queries/q18.py +++ b/spark_queries/q18.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select c_name, c_custkey, diff --git a/spark_queries/q19.py b/spark_queries/q19.py index 2272402..79ae877 100644 --- a/spark_queries/q19.py +++ b/spark_queries/q19.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select round(sum(l_extendedprice* (1 - l_discount)), 2) as revenue from diff --git a/spark_queries/q2.py b/spark_queries/q2.py index 2acc698..fa2d2fa 100644 --- a/spark_queries/q2.py +++ b/spark_queries/q2.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select s_acctbal, s_name, diff --git a/spark_queries/q20.py b/spark_queries/q20.py index 8d3dd2f..7405595 100644 --- a/spark_queries/q20.py +++ b/spark_queries/q20.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select s_name, s_address diff --git a/spark_queries/q21.py b/spark_queries/q21.py index 1973610..b93d036 100644 --- a/spark_queries/q21.py +++ b/spark_queries/q21.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select s_name, count(*) as numwait diff --git a/spark_queries/q22.py b/spark_queries/q22.py index 4f422d2..f1db397 100644 --- a/spark_queries/q22.py +++ b/spark_queries/q22.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select cntrycode, count(*) as numcust, diff --git a/spark_queries/q3.py b/spark_queries/q3.py index e203b0a..b09a986 100644 --- a/spark_queries/q3.py +++ b/spark_queries/q3.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select l_orderkey, sum(l_extendedprice * (1 - l_discount)) as revenue, diff --git a/spark_queries/q4.py b/spark_queries/q4.py index 6f9ad90..bdf104b 100644 --- a/spark_queries/q4.py +++ b/spark_queries/q4.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select o_orderpriority, count(*) as order_count diff --git a/spark_queries/q5.py b/spark_queries/q5.py index 2c81450..57e7341 100644 --- a/spark_queries/q5.py +++ b/spark_queries/q5.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select n_name, sum(l_extendedprice * (1 - l_discount)) as revenue diff --git a/spark_queries/q6.py b/spark_queries/q6.py index 7c318d0..171740c 100644 --- a/spark_queries/q6.py +++ b/spark_queries/q6.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select sum(l_extendedprice * l_discount) as revenue from diff --git a/spark_queries/q7.py b/spark_queries/q7.py index 73bf99e..2d99264 100644 --- a/spark_queries/q7.py +++ b/spark_queries/q7.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select supp_nation, cust_nation, diff --git a/spark_queries/q8.py b/spark_queries/q8.py index 760c0ac..ac4a117 100644 --- a/spark_queries/q8.py +++ b/spark_queries/q8.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select o_year, round( diff --git a/spark_queries/q9.py b/spark_queries/q9.py index 6d798cc..8a5afb6 100644 --- a/spark_queries/q9.py +++ b/spark_queries/q9.py @@ -4,7 +4,7 @@ def q(): - query_str = f""" + query_str = """ select nation, o_year, diff --git a/spark_queries/utils.py b/spark_queries/utils.py index 0fc263b..57fecd5 100644 --- a/spark_queries/utils.py +++ b/spark_queries/utils.py @@ -1,5 +1,5 @@ import timeit -from os.path import join +from pathlib import Path from linetimer import CodeTimer, linetimer from pandas.core.frame import DataFrame as PandasDF @@ -34,11 +34,11 @@ def __read_parquet_ds(path: str, table_name: str) -> SparkDF: return df -def get_query_answer(query: int, base_dir: str = ANSWERS_BASE_DIR) -> PandasDF: +def get_query_answer(query: int, base_dir: Path = ANSWERS_BASE_DIR) -> PandasDF: import pandas as pd answer_df = pd.read_csv( - join(base_dir, f"q{query}.out"), + base_dir / f"q{query}.out", sep="|", parse_dates=True, ) @@ -68,42 +68,42 @@ def test_results(q_num: int, result_df: PandasDF): @on_second_call def get_line_item_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "lineitem.parquet"), "lineitem") + return __read_parquet_ds(Path(base_dir) / "lineitem.parquet", "lineitem") @on_second_call def get_orders_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "orders.parquet"), "orders") + return __read_parquet_ds(Path(base_dir) / "orders.parquet", "orders") @on_second_call def get_customer_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "customer.parquet"), "customer") + return __read_parquet_ds(Path(base_dir) / "customer.parquet", "customer") @on_second_call def get_region_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "region.parquet"), "region") + return __read_parquet_ds(Path(base_dir) / "region.parquet", "region") @on_second_call def get_nation_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "nation.parquet"), "nation") + return __read_parquet_ds(Path(base_dir) / "nation.parquet", "nation") @on_second_call def get_supplier_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "supplier.parquet"), "supplier") + return __read_parquet_ds(Path(base_dir) / "supplier.parquet", "supplier") @on_second_call def get_part_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "part.parquet"), "part") + return __read_parquet_ds(Path(base_dir) / "part.parquet", "part") @on_second_call def get_part_supp_ds(base_dir: str = DATASET_BASE_DIR) -> SparkDF: - return __read_parquet_ds(join(base_dir, "partsupp.parquet"), "partsupp") + return __read_parquet_ds(Path(base_dir) / "partsupp.parquet", "partsupp") def drop_temp_view():