diff --git a/poetry.lock b/poetry.lock index 1646e3d3..513a71e6 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1607,29 +1607,26 @@ files = [ [[package]] name = "singer-sdk" -version = "0.40.0" +version = "0.40.0.post12.dev0+6708cb99" description = "A framework for building Singer taps" optional = false python-versions = ">=3.8" -files = [ - {file = "singer_sdk-0.40.0-py3-none-any.whl", hash = "sha256:eb54a1de031a8888adb5ba2d1236161e95b4f87522444c943229a122f4d6b1ee"}, - {file = "singer_sdk-0.40.0.tar.gz", hash = "sha256:f9360cbfac187cf4f14338b181313cab5726c8d1314c5719f2e8212c375ca3cf"}, -] +files = [] +develop = false [package.dependencies] backoff = {version = ">=2.0.0", markers = "python_version < \"4\""} backports-datetime-fromisoformat = {version = ">=2.0.1", markers = "python_version < \"3.11\""} -click = ">=8.0,<9.0" -faker = {version = ">=22.5", optional = true, markers = "extra == \"faker\""} +click = "~=8.0" fs = ">=2.4.16" importlib-metadata = {version = "<9.0.0", markers = "python_version < \"3.12\""} -importlib-resources = {version = ">=5.12.0,<6.2.0 || >6.2.0,<6.3.0 || >6.3.0,<6.3.1 || >6.3.1", markers = "python_version < \"3.10\""} +importlib-resources = {version = ">=5.12.0,!=6.2.0,!=6.3.0,!=6.3.1", markers = "python_version < \"3.10\""} inflection = ">=0.5.1" joblib = ">=1.3.0" jsonpath-ng = ">=1.5.3" jsonschema = ">=4.16.0" packaging = ">=23.1" -pytest = {version = ">=7.2.1", optional = true, markers = "extra == \"docs\" or extra == \"testing\""} +pytest = {version = ">=7.2.1", optional = true} python-dotenv = ">=0.20" PyYAML = ">=6.0" referencing = ">=0.30.0" @@ -1648,6 +1645,12 @@ parquet = ["numpy (>=1.22)", "numpy (>=1.22,<1.25)", "numpy (>=1.22,<2.1)", "pya s3 = ["fs-s3fs (>=1.1.1)"] testing = ["pytest (>=7.2.1)"] +[package.source] +type = "git" +url = "https://github.com/meltano/sdk.git" +reference = "HEAD" +resolved_reference = "6708cb995c68ab6f74d4874dfc8f978c3b054ceb" + [[package]] name = "six" version = "1.16.0" @@ -1955,4 +1958,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1" -content-hash = "e7e4eac15fb0b08f3b711ccdb1fb2c6bf74a0903bedd3e8749c0379cbe5c75dd" +content-hash = "07ca5ae97a209704b1e98aa8a2bd6b5227713ce33ed568fc7812dd654dfe59c7" diff --git a/pyproject.toml b/pyproject.toml index f5a477ab..bad96165 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,8 +39,7 @@ sqlalchemy = "<3" sshtunnel = "0.4.0" [tool.poetry.dependencies.singer-sdk] -version = "~=0.40.0a1" -extras = ["faker"] +git = "https://github.com/meltano/sdk.git" [tool.poetry.group.dev.dependencies] faker = ">=18.5.1" @@ -57,7 +56,7 @@ types-jsonschema = ">=4.19.0.3" types-psycopg2 = ">=2.9.21.20240118" [tool.poetry.dev-dependencies.singer-sdk] -version = "*" +git = "https://github.com/meltano/sdk.git" extras = ["testing"] [tool.mypy] diff --git a/tap_postgres/client.py b/tap_postgres/client.py index 2babaf53..40e2f4b4 100644 --- a/tap_postgres/client.py +++ b/tap_postgres/client.py @@ -6,6 +6,7 @@ from __future__ import annotations import datetime +import functools import json import select import typing as t @@ -16,19 +17,59 @@ import psycopg2 import singer_sdk.helpers._typing import sqlalchemy as sa +import sqlalchemy.types from psycopg2 import extras from singer_sdk import SQLConnector, SQLStream -from singer_sdk import typing as th +from singer_sdk.connectors.sql import SQLToJSONSchema from singer_sdk.helpers._state import increment_state from singer_sdk.helpers._typing import TypeConformanceLevel from singer_sdk.streams.core import REPLICATION_INCREMENTAL +from sqlalchemy.dialects import postgresql if TYPE_CHECKING: from singer_sdk.helpers.types import Context from sqlalchemy.dialects import postgresql from sqlalchemy.engine import Engine from sqlalchemy.engine.reflection import Inspector - from sqlalchemy.types import TypeEngine + + +class PostgresSQLToJSONSchema(SQLToJSONSchema): + """Custom SQL to JSON Schema conversion for Postgres.""" + + def __init__(self, dates_as_string: bool, *args, **kwargs): + """Initialize the SQL to JSON Schema converter.""" + super().__init__(*args, **kwargs) + self.dates_as_string = dates_as_string + + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] + def array_to_jsonschema(self, column_type: postgresql.ARRAY) -> dict: + """Override the default mapping for NUMERIC columns. + + For example, a scale of 4 translates to a multipleOf 0.0001. + """ + return { + "type": "array", + "items": self.to_jsonschema(column_type.item_type), + } + + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] + def json_to_jsonschema(self, column_type: postgresql.JSON) -> dict: + """Override the default mapping for JSON and JSONB columns.""" + return {"type": ["string", "number", "integer", "array", "object", "boolean"]} + + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] + def datetime_to_jsonschema(self, column_type: sqlalchemy.types.DateTime) -> dict: + """Override the default mapping for DATETIME columns.""" + if self.dates_as_string: + return {"type": ["string", "null"]} + return super().datetime_to_jsonschema(column_type) + + @SQLToJSONSchema.to_jsonschema.register # type: ignore[attr-defined] + def date_to_jsonschema(self, column_type: sqlalchemy.types.Date) -> dict: + """Override the default mapping for DATE columns.""" + if self.dates_as_string: + return {"type": ["string", "null"]} + return super().date_to_jsonschema(column_type) def patched_conform( @@ -115,131 +156,10 @@ def __init__( super().__init__(config=config, sqlalchemy_url=sqlalchemy_url) - # Note super is static, we can get away with this because this is called once - # and is luckily referenced via the instance of the class - def to_jsonschema_type( # type: ignore[override] - self, - sql_type: str | TypeEngine | type[TypeEngine] | postgresql.ARRAY | Any, - ) -> dict: - """Return a JSON Schema representation of the provided type. - - Overridden from SQLConnector to correctly handle JSONB and Arrays. - - Also Overridden in order to call our instance method `sdk_typing_object()` - instead of the static version - - By default will call `typing.to_jsonschema_type()` for strings and SQLAlchemy - types. - - Args: - sql_type: The string representation of the SQL type, a SQLAlchemy - TypeEngine class or object, or a custom-specified object. - - Raises: - ValueError: If the type received could not be translated to jsonschema. - - Returns: - The JSON Schema representation of the provided type. - - """ - type_name = None - if isinstance(sql_type, str): - type_name = sql_type - elif isinstance(sql_type, sa.types.TypeEngine): - type_name = type(sql_type).__name__ - - if ( - type_name is not None - and isinstance(sql_type, sa.dialects.postgresql.ARRAY) - and type_name == "ARRAY" - ): - array_type = self.sdk_typing_object(sql_type.item_type) - return th.ArrayType(array_type).type_dict - return self.sdk_typing_object(sql_type).type_dict - - def sdk_typing_object( - self, - from_type: str | TypeEngine | type[TypeEngine], - ) -> ( - th.DateTimeType - | th.NumberType - | th.IntegerType - | th.DateType - | th.StringType - | th.BooleanType - | th.CustomType - ): - """Return the JSON Schema dict that describes the sql type. - - Args: - from_type: The SQL type as a string or as a TypeEngine. If a TypeEngine is - provided, it may be provided as a class or a specific object instance. - - Raises: - ValueError: If the `from_type` value is not of type `str` or `TypeEngine`. - - Returns: - A compatible JSON Schema type definition. - """ - # NOTE: This is an ordered mapping, with earlier mappings taking precedence. If - # the SQL-provided type contains the type name on the left, the mapping will - # return the respective singer type. - # NOTE: jsonb and json should theoretically be th.AnyType().type_dict but that - # causes problems down the line with an error like: - # singer_sdk.helpers._typing.EmptySchemaTypeError: Could not detect type from - # empty type_dict. Did you forget to define a property in the stream schema? - sqltype_lookup: dict[ - str, - th.DateTimeType - | th.NumberType - | th.IntegerType - | th.DateType - | th.StringType - | th.BooleanType - | th.CustomType, - ] = { - "jsonb": th.CustomType( - {"type": ["string", "number", "integer", "array", "object", "boolean"]} - ), - "json": th.CustomType( - {"type": ["string", "number", "integer", "array", "object", "boolean"]} - ), - "timestamp": th.DateTimeType(), - "datetime": th.DateTimeType(), - "date": th.DateType(), - "int": th.IntegerType(), - "numeric": th.NumberType(), - "decimal": th.NumberType(), - "double": th.NumberType(), - "float": th.NumberType(), - "real": th.NumberType(), - "float4": th.NumberType(), - "string": th.StringType(), - "text": th.StringType(), - "char": th.StringType(), - "bool": th.BooleanType(), - "variant": th.StringType(), - } - if self.config["dates_as_string"] is True: - sqltype_lookup["date"] = th.StringType() - sqltype_lookup["datetime"] = th.StringType() - if isinstance(from_type, str): - type_name = from_type - elif isinstance(from_type, sa.types.TypeEngine): - type_name = type(from_type).__name__ - elif isinstance(from_type, type) and issubclass(from_type, sa.types.TypeEngine): - type_name = from_type.__name__ - else: - raise ValueError( - "Expected `str` or a SQLAlchemy `TypeEngine` object or type." - ) - - # Look for the type name within the known SQL type names: - for sqltype, jsonschema_type in sqltype_lookup.items(): - if sqltype.lower() in type_name.lower(): - return jsonschema_type - - return sqltype_lookup["string"] # safe failover to str + @functools.cached_property + def sql_to_jsonschema(self): + """Return a mapping of SQL types to JSON Schema types.""" + return PostgresSQLToJSONSchema(dates_as_string=self.config["dates_as_string"]) def get_schema_names(self, engine: Engine, inspected: Inspector) -> list[str]: """Return a list of schema names in DB, or overrides with user-provided values.