From 8d2ebd7e267684b20a4cf769cb39d173460ce86e Mon Sep 17 00:00:00 2001 From: Beto Dealmeida Date: Sat, 23 Jul 2022 22:03:33 -0700 Subject: [PATCH] feat: implement limit/offset in adapters (#271) * feat: implement limit/offset in adapters * Add tests --- .pylintrc | 3 + src/shillelagh/adapters/api/datasette.py | 18 +- src/shillelagh/adapters/api/github.py | 50 +++- .../adapters/api/gsheets/adapter.py | 19 +- src/shillelagh/adapters/api/s3select.py | 8 +- src/shillelagh/adapters/api/socrata.py | 9 +- src/shillelagh/adapters/api/system.py | 19 +- src/shillelagh/adapters/api/weatherapi.py | 7 +- src/shillelagh/adapters/file/csvfile.py | 9 +- src/shillelagh/lib.py | 24 +- tests/adapters/api/datasette_test.py | 85 ++++++ tests/adapters/api/github_test.py | 247 ++++++++++++++++++ tests/adapters/api/socrata_test.py | 10 +- tests/adapters/api/system_test.py | 41 +++ tests/adapters/base_test.py | 88 +++++++ tests/fakes/__init__.py | 11 +- tests/lib_test.py | 18 ++ 17 files changed, 628 insertions(+), 38 deletions(-) diff --git a/.pylintrc b/.pylintrc index d82c540b..8d3f695d 100644 --- a/.pylintrc +++ b/.pylintrc @@ -5,3 +5,6 @@ disable = [MASTER] ignore=templates,docs disable = + +[TYPECHECK] +ignored-modules=apsw diff --git a/src/shillelagh/adapters/api/datasette.py b/src/shillelagh/adapters/api/datasette.py index 91a5a74c..788f0229 100644 --- a/src/shillelagh/adapters/api/datasette.py +++ b/src/shillelagh/adapters/api/datasette.py @@ -94,6 +94,9 @@ class DatasetteAPI(Adapter): safe = True + supports_limit = True + supports_offset = True + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: parsed = urllib.parse.urlparse(uri) @@ -168,16 +171,24 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: - offset = 0 + offset = offset or 0 while True: + if limit is None: + # request 1 more, so we know if there are more pages to be fetched + end = DEFAULT_LIMIT + 1 + else: + end = min(limit, DEFAULT_LIMIT + 1) + sql = build_sql( self.columns, bounds, order, f'"{self.table}"', - limit=DEFAULT_LIMIT + 1, + limit=end, offset=offset, ) payload = self._run_query(sql) @@ -199,4 +210,7 @@ def get_data( if not payload["truncated"] and len(rows) <= DEFAULT_LIMIT: break + offset += i + 1 + if limit is not None: + limit -= i + 1 diff --git a/src/shillelagh/adapters/api/github.py b/src/shillelagh/adapters/api/github.py index 84d12a31..6574d6f0 100644 --- a/src/shillelagh/adapters/api/github.py +++ b/src/shillelagh/adapters/api/github.py @@ -38,7 +38,7 @@ class Column: field: Field # A default value for when the column is not specified. Eg, for ``pulls`` - # the API defaults to show only PRs with an open state, so we need to + # the API defaults to showing only PRs with an open state, so we need to # default the column to ``all`` to fetch all PRs when state is not # specified in the query. 
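The Datasette adapter above pages through results by requesting one row more than DEFAULT_LIMIT per query, so a full response signals that another page exists; a pushed-down LIMIT caps each request and is decremented as rows are consumed. A standalone sketch of that loop, with a hypothetical fetch(limit, offset) callable standing in for _run_query and the truncation flag omitted:

from typing import Callable, Iterator, List, Optional

PAGE = 1000  # plays the role of DEFAULT_LIMIT in the adapter


def paginate(
    fetch: Callable[[int, int], List[dict]],
    limit: Optional[int] = None,
    offset: Optional[int] = None,
) -> Iterator[dict]:
    """Yield rows page by page, requesting one extra row per page so that a
    short response signals the end of the result set."""
    offset = offset or 0
    while True:
        # never ask for more than the remaining LIMIT, and never for more
        # than one row beyond the page size
        request = PAGE + 1 if limit is None else min(limit, PAGE + 1)
        rows = fetch(request, offset)
        yield from rows
        if len(rows) <= PAGE:
            break
        offset += len(rows)
        if limit is not None:
            limit -= len(rows)


data = [{"id": i} for i in range(2500)]


def fetch(limit: int, offset: int) -> List[dict]:
    return data[offset : offset + limit]


assert len(list(paginate(fetch))) == 2500
assert len(list(paginate(fetch, limit=1500, offset=100))) == 1500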
default: Optional[Filter] = None @@ -75,6 +75,9 @@ class GitHubAPI(Adapter): safe = True + supports_limit = True + supports_offset = True + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: parsed = urllib.parse.urlparse(uri) @@ -133,6 +136,8 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: # apply default values @@ -142,14 +147,22 @@ def get_data( if "number" in bounds: number = bounds.pop("number").value # type: ignore - return self._get_single_resource(number) + return self._get_single_resource(number, limit, offset) - return self._get_multiple_resources(bounds) + return self._get_multiple_resources(bounds, limit, offset) - def _get_single_resource(self, number: int) -> Iterator[Row]: + def _get_single_resource( + self, + number: int, + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> Iterator[Row]: """ Return a specific resource. """ + if offset or (limit is not None and limit < 1): + return + headers = {"Accept": "application/vnd.github.v3+json"} if self.access_token: headers["Authorization"] = f"Bearer {self.access_token}" @@ -171,7 +184,12 @@ def _get_single_resource(self, number: int) -> Iterator[Row]: _logger.debug(row) yield row - def _get_multiple_resources(self, bounds: Dict[str, Filter]) -> Iterator[Row]: + def _get_multiple_resources( + self, + bounds: Dict[str, Filter], + limit: Optional[int] = None, + offset: Optional[int] = None, + ) -> Iterator[Row]: """ Return multiple resources. """ @@ -185,8 +203,12 @@ def _get_multiple_resources(self, bounds: Dict[str, Filter]) -> Iterator[Row]: params = {name: filter_.value for name, filter_ in bounds.items()} # type: ignore params["per_page"] = PAGE_SIZE - page = 1 - while True: + offset = offset or 0 + page = (offset // PAGE_SIZE) + 1 + offset %= PAGE_SIZE + + rowid = 0 + while limit is None or rowid < limit: _logger.info("GET %s (page %d)", url, page) params["page"] = page response = self._session.get(url, headers=headers, params=params) @@ -198,13 +220,23 @@ def _get_multiple_resources(self, bounds: Dict[str, Filter]) -> Iterator[Row]: if not response.ok: raise ProgrammingError(payload["message"]) - for i, resource in enumerate(payload): + if offset is not None: + payload = payload[offset:] + offset = None + + for resource in payload: + if limit is not None and rowid == limit: + # this never happens because SQLite stops consuming from the generator + # as soon as the limit is hit + break + row = { column.name: JSONPath(column.json_path).parse(resource)[0] for column in TABLES[self.base][self.resource] } - row["rowid"] = i + (page - 1) * PAGE_SIZE + row["rowid"] = rowid _logger.debug(row) yield row + rowid += 1 page += 1 diff --git a/src/shillelagh/adapters/api/gsheets/adapter.py b/src/shillelagh/adapters/api/gsheets/adapter.py index 0a94d6f5..646fea4a 100644 --- a/src/shillelagh/adapters/api/gsheets/adapter.py +++ b/src/shillelagh/adapters/api/gsheets/adapter.py @@ -35,7 +35,7 @@ ) from shillelagh.fields import Field, Order from shillelagh.filters import Filter -from shillelagh.lib import SimpleCostModel, build_sql +from shillelagh.lib import SimpleCostModel, apply_limit_and_offset, build_sql from shillelagh.typing import RequestedOrder, Row _logger = logging.getLogger(__name__) @@ -79,6 +79,8 @@ class GSheetsAPI(Adapter): # pylint: disable=too-many-instance-attributes """ safe = True + supports_limit = True + supports_offset = True 
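In the GitHub adapter's _get_multiple_resources above, a pushed-down OFFSET is converted into a 1-based starting page plus a remainder of rows to drop from that first page. A small worked example of the arithmetic, assuming PAGE_SIZE is 100 (the real constant is defined in the adapter module):

from typing import Tuple

PAGE_SIZE = 100  # assumed value, for illustration only


def locate(offset: int) -> Tuple[int, int]:
    """Map a zero-based row offset to (first API page, rows to drop from it)."""
    page = (offset // PAGE_SIZE) + 1  # GitHub pages are 1-based
    skip = offset % PAGE_SIZE
    return page, skip


assert locate(0) == (1, 0)     # start of the first page, drop nothing
assert locate(99) == (1, 99)   # still the first page, drop 99 rows
assert locate(250) == (3, 50)  # start at page 3, drop its first 50 rows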
@staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: @@ -380,10 +382,12 @@ def _get_header_rows(self, values: List[List[Any]]) -> int: return i + 1 - def get_data( + def get_data( # pylint: disable=too-many-locals self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: """ @@ -406,7 +410,7 @@ def get_data( }: values = self._get_values() headers = self._get_header_rows(values) - rows = ( + rows: Iterator[Row] = ( { reverse_map[letter]: cell for letter, cell in zip(gen_letters(), row) @@ -414,6 +418,7 @@ def get_data( } for row in values[headers:] ) + rows = apply_limit_and_offset(rows, limit, offset) # For ``BIDIRECTIONAL`` mode we continue using the Chart API to # retrieve data. This will happen before every DML query. @@ -425,7 +430,8 @@ def get_data( order, None, self._column_map, - None, + limit, + offset, ) except ImpossibleFilterError: return @@ -442,8 +448,9 @@ def get_data( ) for i, row in enumerate(rows): - self._row_ids[i] = row - row["rowid"] = i + rowid = (offset or 0) + i + self._row_ids[rowid] = row + row["rowid"] = rowid _logger.debug(row) yield row diff --git a/src/shillelagh/adapters/api/s3select.py b/src/shillelagh/adapters/api/s3select.py index 27a62bf1..ac6aae20 100644 --- a/src/shillelagh/adapters/api/s3select.py +++ b/src/shillelagh/adapters/api/s3select.py @@ -206,6 +206,9 @@ class S3SelectAPI(Adapter): safe = True + supports_limit = True + supports_offset = False + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: parsed = urllib.parse.urlparse(uri) @@ -303,18 +306,19 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: try: - sql = build_sql(self.columns, bounds, order, table="s3object") + sql = build_sql(self.columns, bounds, order, table="s3object", limit=limit) except ImpossibleFilterError: return rows = self._run_query(sql) for i, row in enumerate(rows): row["rowid"] = i - yield row _logger.debug(row) + yield row def drop_table(self) -> None: self.s3_client.delete_object(Bucket=self.bucket, Key=self.key, **self.s3_kwargs) diff --git a/src/shillelagh/adapters/api/socrata.py b/src/shillelagh/adapters/api/socrata.py index 32016a11..8121a216 100644 --- a/src/shillelagh/adapters/api/socrata.py +++ b/src/shillelagh/adapters/api/socrata.py @@ -98,6 +98,9 @@ class SocrataAPI(Adapter): safe = True + supports_limit = True + supports_offset = True + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: """https://data.cdc.gov/resource/unsk-b7fc.json""" @@ -147,10 +150,12 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: try: - sql = build_sql(self.columns, bounds, order) + sql = build_sql(self.columns, bounds, order, limit=limit, offset=offset) except ImpossibleFilterError: return @@ -172,5 +177,5 @@ def get_data( for i, row in enumerate(payload): row["rowid"] = i - yield row _logger.debug(row) + yield row diff --git a/src/shillelagh/adapters/api/system.py b/src/shillelagh/adapters/api/system.py index 3b984364..dc902f80 100644 --- a/src/shillelagh/adapters/api/system.py +++ b/src/shillelagh/adapters/api/system.py @@ -5,6 +5,7 @@ See https://github.com/giampaolo/psutil for more information. 
""" import logging +import time import urllib.parse from datetime import datetime, timezone from typing import Any, Dict, Iterator, List, Optional, Tuple, Union @@ -30,6 +31,9 @@ class SystemAPI(Adapter): safe = False + supports_limit = True + supports_offset = True + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: parsed = urllib.parse.urlparse(uri) @@ -76,22 +80,27 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: - i = 0 - while True: + rowid = 0 + while limit is None or rowid < limit: + if offset is not None: + time.sleep(self.interval * offset) + try: values = psutil.cpu_percent(interval=self.interval, percpu=True) except KeyboardInterrupt: return row = { - "rowid": i, + "rowid": rowid, "timestamp": datetime.now(timezone.utc), } for i, value in enumerate(values): row[f"cpu{i}"] = value / 100.0 - yield row _logger.debug(row) - i += 1 + yield row + rowid += 1 diff --git a/src/shillelagh/adapters/api/weatherapi.py b/src/shillelagh/adapters/api/weatherapi.py index a114118d..5f14cb79 100644 --- a/src/shillelagh/adapters/api/weatherapi.py +++ b/src/shillelagh/adapters/api/weatherapi.py @@ -76,6 +76,11 @@ class WeatherAPI(Adapter): safe = True + # Since the adapter doesn't return exact data (see the time columns below) + # implementing limit/offset is not worth the trouble. + supports_limit = False + supports_offset = False + # These two columns can be used to filter the results from the API. We # define them as inexact since we will retrieve data for the whole day, # even if specific hours are requested. The post-filtering will be done @@ -202,7 +207,7 @@ def get_data( # pylint: disable=too-many-locals tzinfo=local_timezone, ) row["rowid"] = int(row["time_epoch"]) - yield row _logger.debug(row) + yield row start += timedelta(days=1) diff --git a/src/shillelagh/adapters/file/csvfile.py b/src/shillelagh/adapters/file/csvfile.py index 8432fb60..abc0f606 100644 --- a/src/shillelagh/adapters/file/csvfile.py +++ b/src/shillelagh/adapters/file/csvfile.py @@ -85,6 +85,9 @@ class CSVFile(Adapter): # the filesystem, or potentially overwrite existing files safe = False + supports_limit = True + supports_offset = True + @staticmethod def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: path = Path(uri) @@ -159,6 +162,8 @@ def get_data( self, bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, **kwargs: Any, ) -> Iterator[Row]: _logger.info("Opening file CSV file %s to load data", self.path) @@ -177,9 +182,9 @@ def get_data( # Filter and sort the data. It would probably be more efficient to simply # declare the columns as having no filter and no sort order, and let the # backend handle this; but it's nice to have an example of how to do this. 
- for row in filter_data(data, bounds, order): - yield row + for row in filter_data(data, bounds, order, limit, offset): _logger.debug(row) + yield row def insert_data(self, row: Row) -> int: row_id: Optional[int] = row.pop("rowid") diff --git a/src/shillelagh/lib.py b/src/shillelagh/lib.py index 860e4461..dd85ab45 100644 --- a/src/shillelagh/lib.py +++ b/src/shillelagh/lib.py @@ -5,7 +5,7 @@ import math import operator import pickle -from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type +from typing import Any, Callable, Dict, Iterator, List, Optional, Tuple, Type, TypeVar from shillelagh.adapters.base import Adapter from shillelagh.exceptions import ImpossibleFilterError, ProgrammingError @@ -349,6 +349,8 @@ def filter_data( data: Iterator[Row], bounds: Dict[str, Filter], order: List[Tuple[str, RequestedOrder]], + limit: Optional[int] = None, + offset: Optional[int] = None, ) -> Iterator[Row]: """ Apply filtering and sorting to a stream of rows. @@ -404,9 +406,29 @@ def apply_filter( rows.sort(key=operator.itemgetter(column_name), reverse=reverse) data = iter(rows) + data = apply_limit_and_offset(data, limit, offset) + yield from data +T = TypeVar("T") + + +def apply_limit_and_offset( + rows: Iterator[T], + limit: Optional[int] = None, + offset: Optional[int] = None, +) -> Iterator[T]: + """ + Apply limit/offset to a stream of rows. + """ + if limit is not None or offset is not None: + start = offset or 0 + end = None if limit is None else start + limit + rows = itertools.islice(rows, start, end) + return rows + + def SimpleCostModel(rows: int, fixed_cost: int = 0): # pylint: disable=invalid-name """ A simple model for estimating query costs. diff --git a/tests/adapters/api/datasette_test.py b/tests/adapters/api/datasette_test.py index 74a71a83..f0f4d00e 100644 --- a/tests/adapters/api/datasette_test.py +++ b/tests/adapters/api/datasette_test.py @@ -94,6 +94,91 @@ def test_datasette(mocker: MockerFixture, requests_mock: Mocker) -> None: assert data == datasette_results +def test_datasette_limit_offset(mocker: MockerFixture, requests_mock: Mocker) -> None: + """ + Test a simple query with limit/offset. + """ + mocker.patch( + "shillelagh.adapters.api.datasette.requests_cache.CachedSession", + return_value=Session(), + ) + + columns_url = ( + "https://global-power-plants.datasettes.com/global-power-plants.json?" 
+ "sql=SELECT+*+FROM+%22global-power-plants%22+LIMIT+0" + ) + requests_mock.get(columns_url, json=datasette_columns_response) + metadata_url = ( + "https://global-power-plants.datasettes.com/global-power-plants.json?sql=SELECT+" + "MAX%28%22country%22%29%2C+" + "MAX%28%22country_long%22%29%2C+" + "MAX%28%22name%22%29%2C+" + "MAX%28%22gppd_idnr%22%29%2C+" + "MAX%28%22capacity_mw%22%29%2C+" + "MAX%28%22latitude%22%29%2C+" + "MAX%28%22longitude%22%29%2C+" + "MAX%28%22primary_fuel%22%29%2C+" + "MAX%28%22other_fuel1%22%29%2C+" + "MAX%28%22other_fuel2%22%29%2C+" + "MAX%28%22other_fuel3%22%29%2C+" + "MAX%28%22commissioning_year%22%29%2C+" + "MAX%28%22owner%22%29%2C+" + "MAX%28%22source%22%29%2C+" + "MAX%28%22url%22%29%2C+" + "MAX%28%22geolocation_source%22%29%2C+" + "MAX%28%22wepp_id%22%29%2C+" + "MAX%28%22year_of_capacity_data%22%29%2C+" + "MAX%28%22generation_gwh_2013%22%29%2C+" + "MAX%28%22generation_gwh_2014%22%29%2C+" + "MAX%28%22generation_gwh_2015%22%29%2C+" + "MAX%28%22generation_gwh_2016%22%29%2C+" + "MAX%28%22generation_gwh_2017%22%29%2C+" + "MAX%28%22generation_data_source%22%29%2C+" + "MAX%28%22estimated_generation_gwh%22%29+" + "FROM+%22global-power-plants%22+LIMIT+1" + ) + requests_mock.get(metadata_url, json=datasette_metadata_response) + data_url_1 = ( + "https://global-power-plants.datasettes.com/global-power-plants.json?" + "sql=select+*+FROM+%22global-power-plants%22+WHERE+country+%3D+%27CAN%27+" + "LIMIT+10+OFFSET+1500" + ) + requests_mock.get(data_url_1, json=datasette_data_response_2) + data_url_2 = ( + "https://global-power-plants.datasettes.com/global-power-plants.json?" + "sql=select+*+FROM+%22global-power-plants%22+WHERE+country+%3D+%27CAN%27+" + "LIMIT+1001+OFFSET+0" + ) + requests_mock.get(data_url_2, json=datasette_data_response_1) + data_url_3 = ( + "https://global-power-plants.datasettes.com/global-power-plants.json?" + "sql=select+*+FROM+%22global-power-plants%22+WHERE+country+%3D+%27CAN%27+" + "LIMIT+500+OFFSET+1000" + ) + requests_mock.get(data_url_3, json=datasette_data_response_2) + + connection = connect(":memory:") + cursor = connection.cursor() + + sql = """ + SELECT * FROM + "https://global-power-plants.datasettes.com/global-power-plants/global-power-plants" + WHERE country='CAN' + LIMIT 10 OFFSET 1500 + """ + data = list(cursor.execute(sql)) + assert data == datasette_results[1000:1010] + + sql = """ + SELECT * FROM + "https://global-power-plants.datasettes.com/global-power-plants/global-power-plants" + WHERE country='CAN' + LIMIT 1500 + """ + data = list(cursor.execute(sql)) + assert data == datasette_results + + def test_datasette_no_data(mocker: MockerFixture) -> None: """ Test result with no rows. diff --git a/tests/adapters/api/github_test.py b/tests/adapters/api/github_test.py index 6ba86449..b7470c3c 100644 --- a/tests/adapters/api/github_test.py +++ b/tests/adapters/api/github_test.py @@ -9,8 +9,10 @@ from requests import Session from requests_mock.mocker import Mocker +from shillelagh.adapters.api.github import GitHubAPI from shillelagh.backends.apsw.db import connect from shillelagh.exceptions import ProgrammingError +from shillelagh.filters import Equal from ...fakes import github_response, github_single_response @@ -191,6 +193,113 @@ def test_github(mocker: MockerFixture, requests_mock: Mocker) -> None: ] +def test_github_limit_offset(mocker: MockerFixture, requests_mock: Mocker) -> None: + """ + Test a simple request with limit/offset. 
+ """ + mocker.patch("shillelagh.adapters.api.github.PAGE_SIZE", new=5) + mocker.patch( + "shillelagh.adapters.api.github.requests_cache.CachedSession", + return_value=Session(), + ) + + page2_url = ( + "https://api.github.com/repos/apache/superset/pulls?state=all&per_page=5&page=2" + ) + requests_mock.get(page2_url, json=github_response[:5]) + page3_url = ( + "https://api.github.com/repos/apache/superset/pulls?state=all&per_page=5&page=3" + ) + requests_mock.get(page3_url, json=github_response[5:]) + + connection = connect(":memory:") + cursor = connection.cursor() + + sql = """ + SELECT * FROM + "https://api.github.com/repos/apache/superset/pulls" + LIMIT 5 OFFSET 8 + """ + data = list(cursor.execute(sql)) + assert data == [ + ( + "https://github.com/apache/superset/pull/16569", + 726107410, + 16569, + "open", + "docs: versioned _export Stable", + 47772523, + "amitmiran137", + False, + "version_export_ff_on", + datetime.datetime(2021, 9, 2, 16, 52, 34, tzinfo=datetime.timezone.utc), + datetime.datetime(2021, 9, 2, 18, 6, 27, tzinfo=datetime.timezone.utc), + None, + None, + ), + ( + "https://github.com/apache/superset/pull/16566", + 725808286, + 16566, + "open", + "fix(docker): add ecpg to docker image", + 33317356, + "villebro", + False, + "villebro/libecpg", + datetime.datetime(2021, 9, 2, 12, 1, 2, tzinfo=datetime.timezone.utc), + datetime.datetime(2021, 9, 2, 12, 6, 50, tzinfo=datetime.timezone.utc), + None, + None, + ), + ( + "https://github.com/apache/superset/pull/16564", + 725669631, + 16564, + "open", + "refactor: orderby control refactoring", + 2016594, + "zhaoyongjie", + True, + "refactor_orderby", + datetime.datetime(2021, 9, 2, 9, 45, 40, tzinfo=datetime.timezone.utc), + datetime.datetime(2021, 9, 3, 10, 31, 4, tzinfo=datetime.timezone.utc), + None, + None, + ), + ( + "https://github.com/apache/superset/pull/16554", + 724863880, + 16554, + "open", + "refactor: Update async query init to support runtime feature flags", + 296227, + "robdiciuccio", + False, + "rd/async-query-init-refactor", + datetime.datetime(2021, 9, 1, 19, 51, 51, tzinfo=datetime.timezone.utc), + datetime.datetime(2021, 9, 1, 22, 29, 46, tzinfo=datetime.timezone.utc), + None, + None, + ), + ( + "https://github.com/apache/superset/pull/16549", + 724669525, + 16549, + "open", + "feat(dashboard): Native filters - add type to native filter configuration", + 12539911, + "m-ajay", + False, + "feat/migration-add-type-to-native-filter", + datetime.datetime(2021, 9, 1, 16, 35, 50, tzinfo=datetime.timezone.utc), + datetime.datetime(2021, 9, 3, 17, 33, 42, tzinfo=datetime.timezone.utc), + None, + None, + ), + ] + + def test_github_single_resource(mocker: MockerFixture, requests_mock: Mocker) -> None: """ Test a request to a single resource. @@ -231,6 +340,35 @@ def test_github_single_resource(mocker: MockerFixture, requests_mock: Mocker) -> ] +def test_github_single_resource_with_offset( + mocker: MockerFixture, + requests_mock: Mocker, +) -> None: + """ + Test a request to a single resource. 
+ """ + mocker.patch( + "shillelagh.adapters.api.github.requests_cache.CachedSession", + return_value=Session(), + ) + + url = "https://api.github.com/repos/apache/superset/pulls/16581" + requests_mock.get(url, json=github_single_response) + + connection = connect(":memory:") + cursor = connection.cursor() + + sql = """ + SELECT * FROM + "https://api.github.com/repos/apache/superset/pulls" + WHERE number = 16581 + LIMIT 1 + OFFSET 2 + """ + data = list(cursor.execute(sql)) + assert data == [] + + def test_github_rate_limit(mocker: MockerFixture, requests_mock: Mocker) -> None: """ Test that the adapter was rate limited by the API. @@ -313,3 +451,112 @@ def test_github_auth_token(mocker: MockerFixture, requests_mock: Mocker) -> None """ list(cursor.execute(sql)) assert multiple_resources_mock.last_request.headers["Authorization"] == "Bearer XXX" + + +def test_get_multiple_resources(mocker: MockerFixture, requests_mock: Mocker) -> None: + """ + Tests for ``_get_multiple_resources``. + """ + mocker.patch("shillelagh.adapters.api.github.PAGE_SIZE", new=5) + mocker.patch( + "shillelagh.adapters.api.github.requests_cache.CachedSession", + return_value=Session(), + ) + + page2_url = ( + "https://api.github.com/repos/apache/superset/pulls?state=all&per_page=5&page=2" + ) + requests_mock.get(page2_url, json=github_response[:5]) + page3_url = ( + "https://api.github.com/repos/apache/superset/pulls?state=all&per_page=5&page=3" + ) + requests_mock.get(page3_url, json=github_response[5:]) + + adapter = GitHubAPI("repos", "apache", "superset", "pulls") + rows = adapter._get_multiple_resources( # pylint: disable=protected-access + {"state": Equal("all")}, + limit=5, + offset=8, + ) + assert list(rows) == [ + { + "url": "https://github.com/apache/superset/pull/16569", + "id": 726107410, + "number": 16569, + "state": "open", + "title": "docs: versioned _export Stable", + "userid": 47772523, + "username": "amitmiran137", + "draft": False, + "head": "version_export_ff_on", + "created_at": "2021-09-02T16:52:34Z", + "updated_at": "2021-09-02T18:06:27Z", + "closed_at": None, + "merged_at": None, + "rowid": 0, + }, + { + "url": "https://github.com/apache/superset/pull/16566", + "id": 725808286, + "number": 16566, + "state": "open", + "title": "fix(docker): add ecpg to docker image", + "userid": 33317356, + "username": "villebro", + "draft": False, + "head": "villebro/libecpg", + "created_at": "2021-09-02T12:01:02Z", + "updated_at": "2021-09-02T12:06:50Z", + "closed_at": None, + "merged_at": None, + "rowid": 1, + }, + { + "url": "https://github.com/apache/superset/pull/16564", + "id": 725669631, + "number": 16564, + "state": "open", + "title": "refactor: orderby control refactoring", + "userid": 2016594, + "username": "zhaoyongjie", + "draft": True, + "head": "refactor_orderby", + "created_at": "2021-09-02T09:45:40Z", + "updated_at": "2021-09-03T10:31:04Z", + "closed_at": None, + "merged_at": None, + "rowid": 2, + }, + { + "url": "https://github.com/apache/superset/pull/16554", + "id": 724863880, + "number": 16554, + "state": "open", + "title": "refactor: Update async query init to support runtime feature flags", + "userid": 296227, + "username": "robdiciuccio", + "draft": False, + "head": "rd/async-query-init-refactor", + "created_at": "2021-09-01T19:51:51Z", + "updated_at": "2021-09-01T22:29:46Z", + "closed_at": None, + "merged_at": None, + "rowid": 3, + }, + { + "url": "https://github.com/apache/superset/pull/16549", + "id": 724669525, + "number": 16549, + "state": "open", + "title": "feat(dashboard): Native 
filters - add type to native filter configuration", + "userid": 12539911, + "username": "m-ajay", + "draft": False, + "head": "feat/migration-add-type-to-native-filter", + "created_at": "2021-09-01T16:35:50Z", + "updated_at": "2021-09-03T17:33:42Z", + "closed_at": None, + "merged_at": None, + "rowid": 4, + }, + ] diff --git a/tests/adapters/api/socrata_test.py b/tests/adapters/api/socrata_test.py index 439d5d49..705dd191 100644 --- a/tests/adapters/api/socrata_test.py +++ b/tests/adapters/api/socrata_test.py @@ -31,7 +31,7 @@ def test_socrata(mocker: MockerFixture, requests_mock: Mocker) -> None: data_url = ( "https://data.cdc.gov/resource/unsk-b7fc.json?" - "%24query=SELECT+%2A+WHERE+location+%3D+%27US%27+ORDER+BY+date+DESC" + "%24query=SELECT+%2A+WHERE+location+%3D+%27US%27+ORDER+BY+date+DESC+LIMIT+7" ) requests_mock.get(data_url, json=cdc_data_response) @@ -70,7 +70,7 @@ def test_socrata_app_token_url(mocker: MockerFixture, requests_mock: Mocker) -> data_url = ( "https://data.cdc.gov/resource/unsk-b7fc.json?" - "%24query=SELECT+%2A+WHERE+location+%3D+%27OK%27+ORDER+BY+date+DESC" + "%24query=SELECT+%2A+WHERE+location+%3D+%27OK%27+ORDER+BY+date+DESC+LIMIT+7" ) data = requests_mock.get(data_url, json=cdc_data_response) @@ -104,7 +104,7 @@ def test_socrata_app_token_connection( data_url = ( "https://data.cdc.gov/resource/unsk-b7fc.json?" - "%24query=SELECT+%2A+WHERE+location+%3D+%27NY%27+ORDER+BY+date+DESC" + "%24query=SELECT+%2A+WHERE+location+%3D+%27NY%27+ORDER+BY+date+DESC+LIMIT+7" ) data = requests_mock.get(data_url, json=cdc_data_response) connection = connect( @@ -137,7 +137,7 @@ def test_socrata_no_data(mocker: MockerFixture, requests_mock: Mocker) -> None: data_url = ( "https://data.cdc.gov/resource/unsk-b7fc.json?" - "%24query=SELECT+%2A+WHERE+location+%3D+%27BR%27+ORDER+BY+date+DESC" + "%24query=SELECT+%2A+WHERE+location+%3D+%27BR%27+ORDER+BY+date+DESC+LIMIT+7" ) requests_mock.get(data_url, json=[]) @@ -198,7 +198,7 @@ def test_socrata_invalid_query(mocker: MockerFixture, requests_mock: Mocker) -> data_url = ( "https://data.cdc.gov/resource/unsk-b7fc.json?" - "%24query=SELECT+%2A+WHERE+location+%3D+%27CA%27+ORDER+BY+date+DESC" + "%24query=SELECT+%2A+WHERE+location+%3D+%27CA%27+ORDER+BY+date+DESC+LIMIT+7" ) requests_mock.get( data_url, diff --git a/tests/adapters/api/system_test.py b/tests/adapters/api/system_test.py index d8d169ae..cb4fbd8d 100644 --- a/tests/adapters/api/system_test.py +++ b/tests/adapters/api/system_test.py @@ -8,6 +8,7 @@ from freezegun import freeze_time from pytest_mock import MockerFixture +from shillelagh.adapters.api.system import SystemAPI from shillelagh.backends.apsw.db import connect from shillelagh.exceptions import ProgrammingError @@ -118,3 +119,43 @@ def test_system_interrupt(mocker: MockerFixture) -> None: (datetime(2021, 1, 1, tzinfo=timezone.utc), 0.01, 0.02, 0.03, 0.04), (datetime(2021, 1, 1, tzinfo=timezone.utc), 0.01, 0.02, 0.03, 0.04), ] + + +def test_get_data(mocker: MockerFixture) -> None: + """ + Test ``get_data``. 
+ """ + adapter = SystemAPI("cpu") + + psutil = mocker.patch("shillelagh.adapters.api.system.psutil") + psutil.cpu_count.return_value = 4 + psutil.cpu_percent.side_effect = [ + [1, 2, 3, 4], + [1, 2, 3, 4], + [1, 2, 3, 4], + [1, 2, 3, 4], + ] + time = mocker.patch("shillelagh.adapters.api.system.time") + + with freeze_time("2021-01-01T00:00:00Z"): + data = list(adapter.get_data({}, [], limit=2, offset=1)) + assert data == [ + { + "rowid": 0, + "timestamp": datetime(2021, 1, 1, 0, 0, tzinfo=timezone.utc), + "cpu0": 0.01, + "cpu1": 0.02, + "cpu2": 0.03, + "cpu3": 0.04, + }, + { + "rowid": 1, + "timestamp": datetime(2021, 1, 1, 0, 0, tzinfo=timezone.utc), + "cpu0": 0.01, + "cpu1": 0.02, + "cpu2": 0.03, + "cpu3": 0.04, + }, + ] + + time.sleep.assert_called_with(1.0) diff --git a/tests/adapters/base_test.py b/tests/adapters/base_test.py index fe80c982..4f8315d8 100644 --- a/tests/adapters/base_test.py +++ b/tests/adapters/base_test.py @@ -168,6 +168,94 @@ def test_adapter_manipulate_rows() -> None: ] +def test_limit_offset(registry: AdapterLoader) -> None: + """ + Test limit/offset in adapters that implement it and adapters that don't. + + Note that SQLite will always enforce the limit, even the adapter declares that it + supports it. For offset, on the other hand, if the adapter declares support for it + then SQLite will not apply an offset (since it couldn't know if an offset was + applied or not). + """ + + class CustomFakeAdapter(FakeAdapter): + + """ + Custom ``FakeAdapter`` with more data. + """ + + def __init__(self): + super().__init__() + + self.data = [ + {"rowid": 0, "name": "Alice", "age": 20, "pets": 0}, + {"rowid": 1, "name": "Bob", "age": 23, "pets": 3}, + {"rowid": None, "name": "Charlie", "age": 6, "pets": 1}, + ] + + class FakeAdapterWithLimitOnly(CustomFakeAdapter): + + """ + An adapter that only supports limit (like ``s3select``) + """ + + scheme = "limit" + + supports_limit = True + supports_offset = False + + class FakeAdapterWithLimitAndOffset(CustomFakeAdapter): + + """ + An adapter that supports both limit and offset. + """ + + scheme = "limit+offset" + + supports_limit = True + supports_offset = True + + class FakeAdapterWithOffsetOnly(CustomFakeAdapter): + + """ + An adapter that supports only offset. 
+ """ + + scheme = "offset" + + supports_limit = False + supports_offset = True + + registry.add("dummy", CustomFakeAdapter) + registry.add("limit", FakeAdapterWithLimitOnly) + registry.add("limit+offset", FakeAdapterWithLimitAndOffset) + registry.add("offset", FakeAdapterWithOffsetOnly) + + connection = connect( + ":memory:", + ["dummy", "limit", "limit+offset", "offset"], + isolation_level="IMMEDIATE", + ) + cursor = connection.cursor() + + # adapter returns 3 rows, SQLite applies limit/offset + cursor.execute('SELECT * FROM "dummy://" LIMIT 1 OFFSET 1') + assert cursor.fetchall() == [(23, "Bob", 3)] + + # adapter returns 3 rows (even though it says it supports ``LIMIT``), SQLite then + # applies offset and enforces limit + cursor.execute('SELECT * FROM "limit://" LIMIT 1 OFFSET 1') + assert cursor.fetchall() == [(23, "Bob", 3)] + + # adapter returns 3 rows, SQLite enforces limit but doesn't apply offset + cursor.execute('SELECT * FROM "limit+offset://" LIMIT 1 OFFSET 1') + assert cursor.fetchall() == [(20, "Alice", 0)] + + # adapter returns 3 rows, SQLite enforces limit but doesn't apply offset + cursor.execute('SELECT * FROM "offset://" LIMIT 1 OFFSET 1') + assert cursor.fetchall() == [(20, "Alice", 0)] + + def test_type_conversion(registry: AdapterLoader) -> None: """ Test that native types are converted correctly. diff --git a/tests/fakes/__init__.py b/tests/fakes/__init__.py index 7f10f0c6..cf2b3cd8 100644 --- a/tests/fakes/__init__.py +++ b/tests/fakes/__init__.py @@ -35,16 +35,21 @@ class FakeAdapter(Adapter): A simple adapter that keeps data in memory. """ + scheme = "dummy" + safe = True + supports_limit = False + supports_offset = False + age = Float(filters=[Range], order=Order.ANY, exact=True) name = String(filters=[Equal], order=Order.ANY, exact=True) pets = Integer(order=Order.ANY) - @staticmethod - def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: + @classmethod + def supports(cls, uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]: parsed = urllib.parse.urlparse(uri) - return parsed.scheme == "dummy" + return parsed.scheme == cls.scheme @staticmethod def parse_uri(uri: str) -> Tuple[()]: diff --git a/tests/lib_test.py b/tests/lib_test.py index c5b5721f..3c81a764 100644 --- a/tests/lib_test.py +++ b/tests/lib_test.py @@ -22,6 +22,7 @@ DELETED, RowIDManager, analyze, + apply_limit_and_offset, build_sql, combine_args_kwargs, deserialize, @@ -369,3 +370,20 @@ def test_is_not_null() -> None: """ assert is_not_null(20, 10) assert not is_not_null(None, 10) + + +def test_apply_limit_and_offset() -> None: + """ + Test ``apply_limit_and_offset``. + """ + rows = apply_limit_and_offset(iter(range(10))) + assert list(rows) == list(range(10)) + + rows = apply_limit_and_offset(iter(range(10)), limit=2) + assert list(rows) == [0, 1] + + rows = apply_limit_and_offset(iter(range(10)), limit=2, offset=2) + assert list(rows) == [2, 3] + + rows = apply_limit_and_offset(iter(range(10)), offset=2) + assert list(rows) == [2, 3, 4, 5, 6, 7, 8, 9]