Skip to content

Commit

Permalink
feat: support best index (#345)
Browse files Browse the repository at this point in the history
  • Loading branch information
betodealmeida authored Mar 21, 2023
1 parent 6ab702b commit f96317a
Show file tree
Hide file tree
Showing 12 changed files with 207 additions and 24 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ Next
- Allow specifying custom request headers when using the generic JSON adapter (#337)
- Fix for escaping identifiers correctly (#340)
- Support for S3-compatible storage (#343)
- Adapters can now know which columns were requested (#345)

Version 1.2.0 - 2023-02-17
==========================
Expand Down
23 changes: 23 additions & 0 deletions docs/development.rst
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,29 @@ If an adapter declares support for ``LIMIT`` and ``OFFSET`` a corresponding para
Now the adapter can handle ``limit`` and ``offset``, reducing the amount of data that is returned. Note that even if the adapter declares supporting ``LIMIT``, SQLite will still enforce the limit, ie, if for any reason the adapter returns more rows than the limit SQLite will fix the problem. The same is not true for the offset.

Returning only the requested columns
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

By default adapters should return all the columns available, since they have no information on which columns are actually needed. Starting with `apsw 3.41.0.0 <https://github.com/rogerbinns/apsw/releases/tag/3.41.0.0>`_ adapters can optionally receive only the requested columns in their ``get_rows`` and ``get_data`` methods. The adapter must declare support for it by setting the attribute ``supports_bestindex`` to true:

.. code-block:: python

    class WeatherAPI(Adapter):
        supports_bestindex = True

Then the ``requested_columns: Optional[Set[str]]`` argument will be passed to ``get_rows`` and ``get_data``:

.. code-block:: python

    def get_rows(
        self,
        bounds: Dict[str, Filter],
        order: List[Tuple[str, RequestedOrder]],
        requested_columns: Optional[Set[str]] = None,
        **kwargs: Any,
    ) -> Iterator[Dict[str, Any]]:
A read-write adapter
====================

Expand Down
4 changes: 3 additions & 1 deletion examples/generic_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@
cursor = connection.cursor()

SQL = """
SELECT * FROM
SELECT domain, isDead FROM
"https://api.domainsdb.info/v1/domains/search?domain=facebook&zone=com#$.domains[*]"
WHERE isDead > 2
ORDER BY domain DESC
LIMIT 2
"""
for row in cursor.execute(SQL):
Expand Down
2 changes: 2 additions & 0 deletions requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ greenlet==1.1.2
# via sqlalchemy
idna==3.3
# via requests
packaging==23.0
# via shillelagh
python-dateutil==2.8.2
# via shillelagh
requests==2.28.1
Expand Down
1 change: 1 addition & 0 deletions requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ packaging==21.3
# via
# build
# pytest
# shillelagh
pandas==1.4.3
# via shillelagh
pep517==0.12.0
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ install_requires =
requests>=2.25.1
sqlalchemy>=1.3
typing_extensions>=3.7.4.3
packaging
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
Expand Down
11 changes: 9 additions & 2 deletions src/shillelagh/adapters/api/generic_json.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import logging
import urllib.parse
from typing import Any, Dict, Iterator, List, Optional, Tuple
from typing import Any, Dict, Iterator, List, Optional, Set, Tuple

import requests_cache
from jsonpath import JSONPath
Expand Down Expand Up @@ -49,6 +49,7 @@ class GenericJSONAPI(Adapter):

supports_limit = False
supports_offset = False
supports_bestindex = True

@staticmethod
def supports(uri: str, fast: bool = True, **kwargs: Any) -> Optional[bool]:
Expand Down Expand Up @@ -108,12 +109,13 @@ def get_columns(self) -> Dict[str, Field]:

get_cost = SimpleCostModel(AVERAGE_NUMBER_OF_ROWS)

def get_data( # pylint: disable=unused-argument
def get_data( # pylint: disable=unused-argument, too-many-arguments
self,
bounds: Dict[str, Filter],
order: List[Tuple[str, RequestedOrder]],
limit: Optional[int] = None,
offset: Optional[int] = None,
requested_columns: Optional[Set[str]] = None,
**kwargs: Any,
) -> Iterator[Row]:
response = self._session.get(self.uri)
Expand All @@ -123,6 +125,11 @@ def get_data( # pylint: disable=unused-argument

parser = JSONPath(self.path)
for i, row in enumerate(parser.parse(payload)):
row = {
k: v
for k, v in row.items()
if requested_columns is None or k in requested_columns
}
row["rowid"] = i
_logger.debug(row)
yield flatten(row)
3 changes: 3 additions & 0 deletions src/shillelagh/adapters/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class Adapter:
supports_limit = False
supports_offset = False

# if true, the requested columns will be passed to ``get_rows`` and ``get_data``
supports_bestindex = False

def __init__(self, *args: Any, **kwargs: Any): # pylint: disable=unused-argument
# ensure ``self.close`` gets called before GC
atexit.register(self.close)
Expand Down
10 changes: 9 additions & 1 deletion src/shillelagh/backends/apsw/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
)

import apsw
from packaging.version import Version

from shillelagh import functions
from shillelagh.adapters.base import Adapter
Expand Down Expand Up @@ -447,7 +448,14 @@ def __init__( # pylint: disable=too-many-arguments

# register adapters
for adapter in adapters:
self._connection.createmodule(adapter.__name__, VTModule(adapter))
if Version(apsw.apswversion()) >= Version("3.41.0.0"):
self._connection.createmodule(
adapter.__name__,
VTModule(adapter),
use_bestindex_object=adapter.supports_bestindex,
)
else:
self._connection.createmodule(adapter.__name__, VTModule(adapter))
self._adapters = adapters
self._adapter_kwargs = adapter_kwargs

Expand Down
94 changes: 75 additions & 19 deletions src/shillelagh/backends/apsw/vt.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)

import apsw
from packaging.version import Version

from shillelagh.adapters.base import Adapter
from shillelagh.exceptions import ProgrammingError
Expand Down Expand Up @@ -51,6 +52,18 @@
SQLiteValidType,
)

# ``BestIndexObject`` support (and ``apsw.ext.index_info_to_dict``) only
# exists in apsw >= 3.41.0.0; provide fallbacks so this module still imports
# on older versions.
if Version(apsw.apswversion()) >= Version("3.41.0.0"):  # pragma: no cover
    from apsw.ext import index_info_to_dict
else:  # pragma: no cover
    # older apsw has no ``IndexInfo`` class; alias it so the type
    # annotations below still resolve at import time
    apsw.IndexInfo = Any  # for type annotation

    # pylint: disable=unused-argument
    def index_info_to_dict(index_info: apsw.IndexInfo) -> None:
        """
        Dummy function for testing.
        """

_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -302,6 +315,7 @@ class VTTable:

def __init__(self, adapter: Adapter):
    """
    Wrap ``adapter`` so it can back an APSW virtual table.
    """
    self.adapter = adapter
    # Columns actually referenced by the current query; populated by
    # ``BestIndexObject`` when apsw supports best-index objects, otherwise
    # stays ``None`` (meaning "all columns").
    self.requested_columns: Optional[Set[str]] = None

def get_create_table(self, tablename: str) -> str:
"""
Expand Down Expand Up @@ -342,22 +356,25 @@ def BestIndex( # pylint: disable=too-many-locals
filtered_columns: List[Tuple[str, Operator]] = []
for column_index, sqlite_index_constraint in constraints:
operator = operator_map.get(sqlite_index_constraint)
column_name = column_names[column_index]
column_type = column_types[column_index]
for class_ in column_type.filters:
if operator in class_.operators:
filtered_columns.append((column_name, operator))
constraints_used.append((filter_index, column_type.exact))
filter_index += 1
indexes.append((column_index, sqlite_index_constraint))
break
else:
if (operator is Operator.LIMIT and self.adapter.supports_limit) or (
operator is Operator.OFFSET and self.adapter.supports_offset
):
constraints_used.append((filter_index, True))
filter_index += 1
indexes.append((LIMIT_OFFSET_INDEX, sqlite_index_constraint))

# LIMIT/OFFSET
if (operator is Operator.LIMIT and self.adapter.supports_limit) or (
operator is Operator.OFFSET and self.adapter.supports_offset
):
constraints_used.append((filter_index, True))
filter_index += 1
indexes.append((LIMIT_OFFSET_INDEX, sqlite_index_constraint))
# column operator
elif column_index >= 0:
column_name = column_names[column_index]
column_type = column_types[column_index]
for class_ in column_type.filters:
if operator in class_.operators:
filtered_columns.append((column_name, operator))
constraints_used.append((filter_index, column_type.exact))
filter_index += 1
indexes.append((column_index, sqlite_index_constraint))
break
else:
constraints_used.append(None)

Expand Down Expand Up @@ -389,11 +406,47 @@ def BestIndex( # pylint: disable=too-many-locals
estimated_cost,
)

def BestIndexObject(self, index_info: apsw.IndexInfo) -> bool:
    """
    Alternative to ``BestIndex`` that also captures the requested columns.

    apsw (>= 3.41.0.0) calls this with an ``IndexInfo`` object instead of
    the plain tuples passed to ``BestIndex``. We record the set of column
    names the query actually uses (so cursors can ask adapters for only
    those columns), convert the ``IndexInfo`` into the legacy ``BestIndex``
    arguments, delegate to it, and copy the results back into
    ``index_info``.

    :param index_info: apsw's wrapper around ``sqlite3_index_info``.
    :return: ``True`` to signal the index information was produced.
    """
    columns = self.adapter.get_columns()
    column_names = list(columns.keys())
    # ``colUsed`` is the set of 0-based column indexes referenced by the
    # query; map them back to names for the adapter.
    self.requested_columns = {column_names[i] for i in index_info.colUsed}

    index_info_dict = index_info_to_dict(index_info)
    # LIMIT/OFFSET constraints carry no column, hence the -1 default.
    constraints = [
        (constraint.get("iColumn", -1), constraint["op"])
        for constraint in index_info_dict["aConstraint"]
    ]
    orderbys = [
        (orderby["iColumn"], orderby["desc"])
        for orderby in index_info_dict["aOrderBy"]
    ]
    (
        constraints_used,
        index_number,
        index_name,
        orderby_consumed,
        estimated_cost,
    ) = self.BestIndex(constraints, orderbys)

    for i, constraint in enumerate(constraints_used):
        if isinstance(constraint, tuple):
            # ``BestIndex`` returns a 0-based position into
            # ``constraintargs``, but SQLite's ``argvIndex`` is 1-based and
            # 0 means "value not passed to xFilter" — shift by one so the
            # arguments reach ``Filter`` correctly aligned.
            index_info.set_aConstraintUsage_argvIndex(i, constraint[0] + 1)
            index_info.set_aConstraintUsage_omit(i, constraint[1])
    index_info.idxNum = index_number
    index_info.idxStr = index_name
    index_info.orderByConsumed = orderby_consumed
    index_info.estimatedCost = estimated_cost

    return True

def Open(self) -> "VTCursor":
    """
    Open a new cursor over this table, propagating the requested columns.
    """
    cursor = VTCursor(self.adapter, self.requested_columns)
    return cursor

def Disconnect(self) -> None:
"""
Expand Down Expand Up @@ -449,8 +502,9 @@ class VTCursor:
An object for iterating over a table.
"""

def __init__(self, adapter: Adapter):
def __init__(self, adapter: Adapter, requested_columns: Optional[Set[str]] = None):
self.adapter = adapter
self.requested_columns = requested_columns

self.data: Iterator[Tuple[Any, ...]]
self.current_row: Tuple[Any, ...]
Expand Down Expand Up @@ -485,11 +539,13 @@ def Filter( # pylint: disable=too-many-locals
order = get_order(orderbys, column_names)

# limit and offset were introduced in 1.1, and not all adapters support it
kwargs = {}
kwargs: Dict[str, Any] = {}
if self.adapter.supports_limit:
kwargs["limit"] = limit
if self.adapter.supports_offset:
kwargs["offset"] = offset
if self.adapter.supports_bestindex:
kwargs["requested_columns"] = self.requested_columns

rows = self.adapter.get_rows(bounds, order, **kwargs)
rows = convert_rows_to_sqlite(columns, rows)
Expand Down
29 changes: 28 additions & 1 deletion tests/backends/apsw/db_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from pytest_mock import MockerFixture

from shillelagh.adapters.registry import AdapterLoader
from shillelagh.backends.apsw.db import connect, convert_binding
from shillelagh.backends.apsw.db import Connection, connect, convert_binding
from shillelagh.exceptions import InterfaceError, NotSupportedError, ProgrammingError
from shillelagh.fields import Float, String, StringInteger

Expand Down Expand Up @@ -477,3 +477,30 @@ def test_drop_table(mocker: MockerFixture, registry: AdapterLoader) -> None:

cursor.execute('DROP TABLE "dummy://"')
drop_table.assert_called() # type: ignore


def test_best_index(mocker: MockerFixture) -> None:
    """
    ``use_bestindex_object`` should be passed only when apsw >= 3.41.0.0.
    """
    # pylint: disable=redefined-outer-name, invalid-name
    apsw = mocker.patch("shillelagh.backends.apsw.db.apsw")
    VTModule = mocker.patch("shillelagh.backends.apsw.db.VTModule")

    adapter = mocker.MagicMock()
    adapter.__name__ = "some_adapter"
    adapter.supports_bestindex = True

    # ``apsw.Connection()`` always returns the same child mock, so grab the
    # ``createmodule`` attribute once and assert on it for both versions
    createmodule = apsw.Connection().createmodule

    # new enough: the keyword argument is forwarded
    apsw.apswversion.return_value = "3.41.0.0"
    Connection(":memory:", [adapter], {})
    createmodule.assert_called_with(
        "some_adapter",
        VTModule(adapter),
        use_bestindex_object=True,
    )

    # too old: fall back to the two-argument call
    apsw.apswversion.return_value = "3.40.1.0"
    Connection(":memory:", [adapter], {})
    createmodule.assert_called_with("some_adapter", VTModule(adapter))
Loading

0 comments on commit f96317a

Please sign in to comment.