From 93bbe08dbf82398e8a63ff3ac3570f460181063b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 25 Jun 2024 18:51:28 +0200 Subject: [PATCH] refactor: Use a single source of truth for built-in capabilities --- docs/builtin.rst | 17 + docs/index.rst | 1 + singer_sdk/connectors/sql.py | 4 +- singer_sdk/helpers/capabilities.py | 375 ------------------ singer_sdk/helpers/capabilities/__init__.py | 74 ++++ singer_sdk/helpers/capabilities/_builtin.py | 49 +++ .../helpers/capabilities/_config_property.py | 41 ++ singer_sdk/helpers/capabilities/_enum.py | 200 ++++++++++ singer_sdk/helpers/capabilities/_schema.py | 182 +++++++++ singer_sdk/mapper.py | 2 +- singer_sdk/mapper_base.py | 15 +- singer_sdk/plugin_base.py | 48 +-- singer_sdk/sinks/core.py | 31 +- singer_sdk/tap_base.py | 45 +-- singer_sdk/target_base.py | 89 ++--- tests/core/helpers/__init__.py | 0 tests/core/helpers/capabilities/__init__.py | 0 .../capabilities}/test_capabilities.py | 26 ++ .../capabilities/test_config_property.py | 33 ++ 19 files changed, 709 insertions(+), 523 deletions(-) create mode 100644 docs/builtin.rst delete mode 100644 singer_sdk/helpers/capabilities.py create mode 100644 singer_sdk/helpers/capabilities/__init__.py create mode 100644 singer_sdk/helpers/capabilities/_builtin.py create mode 100644 singer_sdk/helpers/capabilities/_config_property.py create mode 100644 singer_sdk/helpers/capabilities/_enum.py create mode 100644 singer_sdk/helpers/capabilities/_schema.py create mode 100644 tests/core/helpers/__init__.py create mode 100644 tests/core/helpers/capabilities/__init__.py rename tests/core/{ => helpers/capabilities}/test_capabilities.py (63%) create mode 100644 tests/core/helpers/capabilities/test_config_property.py diff --git a/docs/builtin.rst b/docs/builtin.rst new file mode 100644 index 000000000..9f896e03d --- /dev/null +++ b/docs/builtin.rst @@ -0,0 +1,17 @@ +Built-in Settings and Capabilities +================================== + +.. currentmodule:: singer_sdk.helpers.capabilities + +The Singer SDK library provides a number of built-in settings and capabilities. + +.. autodata:: ADD_RECORD_METADATA + :no-value: + + .. autoattribute:: ADD_RECORD_METADATA.schema + +.. autodata:: BATCH + :no-value: + + .. autoattribute:: BATCH.schema + .. autoattribute:: BATCH.capability diff --git a/docs/index.rst b/docs/index.rst index 081c56f24..a9c89fbb8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -57,6 +57,7 @@ within the `#singer-tap-development`_ and `#singer-target-development`_ Slack ch implementation/index typing capabilities + builtin .. toctree:: :caption: Advanced Concepts diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 132c37518..12b5c0a0f 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -17,7 +17,7 @@ from singer_sdk import typing as th from singer_sdk._singerlib import CatalogEntry, MetadataMapping, Schema from singer_sdk.exceptions import ConfigValidationError -from singer_sdk.helpers.capabilities import TargetLoadMethods +from singer_sdk.helpers import capabilities if t.TYPE_CHECKING: from sqlalchemy.engine import Engine @@ -779,7 +779,7 @@ def prepare_table( as_temp_table=as_temp_table, ) return - if self.config["load_method"] == TargetLoadMethods.OVERWRITE: + if self.config["load_method"] == capabilities.TargetLoadMethods.OVERWRITE: self.get_table(full_table_name=full_table_name).drop(self._engine) self.create_empty_table( full_table_name=full_table_name, diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py deleted file mode 100644 index f76400c5a..000000000 --- a/singer_sdk/helpers/capabilities.py +++ /dev/null @@ -1,375 +0,0 @@ -"""Module with helpers to declare capabilities and plugin behavior.""" - -from __future__ import annotations - -import typing as t -from enum import Enum, EnumMeta -from warnings import warn - -from singer_sdk.typing import ( - ArrayType, - BooleanType, - IntegerType, - NumberType, - ObjectType, - OneOf, - PropertiesList, - Property, - StringType, -) - -_EnumMemberT = t.TypeVar("_EnumMemberT") - -# Default JSON Schema to support config for built-in capabilities: - -STREAM_MAPS_CONFIG = PropertiesList( - Property( - "stream_maps", - ObjectType(), - description=( - "Config object for stream maps capability. " - "For more information check out " - "[Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)." - ), - ), - Property( - "stream_map_config", - ObjectType(), - description="User-defined config values to be used within map expressions.", - ), - Property( - "faker_config", - ObjectType( - Property( - "seed", - OneOf(NumberType, StringType, BooleanType), - description=( - "Value to seed the Faker generator for deterministic output: " - "https://faker.readthedocs.io/en/master/#seeding-the-generator" - ), - ), - Property( - "locale", - OneOf(StringType, ArrayType(StringType)), - description=( - "One or more LCID locale strings to produce localized output for: " - "https://faker.readthedocs.io/en/master/#localization" - ), - ), - ), - description=( - "Config for the [`Faker`](https://faker.readthedocs.io/en/master/) " - "instance variable `fake` used within map expressions. Only applicable if " - "the plugin specifies `faker` as an addtional dependency (through the " - "`singer-sdk` `faker` extra or directly)." - ), - ), -).to_dict() -FLATTENING_CONFIG = PropertiesList( - Property( - "flattening_enabled", - BooleanType(), - description=( - "'True' to enable schema flattening and automatically expand nested " - "properties." - ), - ), - Property( - "flattening_max_depth", - IntegerType(), - description="The max depth to flatten schemas.", - ), -).to_dict() -BATCH_CONFIG = PropertiesList( - Property( - "batch_config", - description="", - wrapped=ObjectType( - Property( - "encoding", - description="Specifies the format and compression of the batch files.", - wrapped=ObjectType( - Property( - "format", - StringType, - allowed_values=["jsonl", "parquet"], - description="Format to use for batch files.", - ), - Property( - "compression", - StringType, - allowed_values=["gzip", "none"], - description="Compression format to use for batch files.", - ), - ), - ), - Property( - "storage", - description="Defines the storage layer to use when writing batch files", - wrapped=ObjectType( - Property( - "root", - StringType, - description="Root path to use when writing batch files.", - ), - Property( - "prefix", - StringType, - description="Prefix to use when writing batch files.", - ), - ), - ), - ), - ), -).to_dict() -TARGET_SCHEMA_CONFIG = PropertiesList( - Property( - "default_target_schema", - StringType(), - description="The default target database schema name to use for all streams.", - ), -).to_dict() -ADD_RECORD_METADATA_CONFIG = PropertiesList( - Property( - "add_record_metadata", - BooleanType(), - description="Add metadata to records.", - ), -).to_dict() -TARGET_HARD_DELETE_CONFIG = PropertiesList( - Property( - "hard_delete", - BooleanType(), - description="Hard delete records.", - default=False, - ), -).to_dict() -TARGET_VALIDATE_RECORDS_CONFIG = PropertiesList( - Property( - "validate_records", - BooleanType(), - description="Whether to validate the schema of the incoming streams.", - default=True, - ), -).to_dict() -TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList( - Property( - "batch_size_rows", - IntegerType, - description="Maximum number of rows in each batch.", - ), -).to_dict() - - -class TargetLoadMethods(str, Enum): - """Target-specific capabilities.""" - - # always write all input records whether that records already exists or not - APPEND_ONLY = "append-only" - - # update existing records and insert new records - UPSERT = "upsert" - - # delete all existing records and insert all input records - OVERWRITE = "overwrite" - - -TARGET_LOAD_METHOD_CONFIG = PropertiesList( - Property( - "load_method", - StringType(), - description=( - "The method to use when loading data into the destination. " - "`append-only` will always write all input records whether that records " - "already exists or not. `upsert` will update existing records and insert " - "new records. `overwrite` will delete all existing records and insert all " - "input records." - ), - allowed_values=[ - TargetLoadMethods.APPEND_ONLY, - TargetLoadMethods.UPSERT, - TargetLoadMethods.OVERWRITE, - ], - default=TargetLoadMethods.APPEND_ONLY, - ), -).to_dict() - - -class DeprecatedEnum(Enum): - """Base class for capabilities enumeration.""" - - def __new__( - cls, - value: _EnumMemberT, - deprecation: str | None = None, - ) -> DeprecatedEnum: - """Create a new enum member. - - Args: - value: Enum member value. - deprecation: Deprecation message. - - Returns: - An enum member value. - """ - member: DeprecatedEnum = object.__new__(cls) - member._value_ = value - member.deprecation = deprecation - return member - - @property - def deprecation_message(self) -> str | None: - """Get deprecation message. - - Returns: - Deprecation message. - """ - self.deprecation: str | None - return self.deprecation - - def emit_warning(self) -> None: - """Emit deprecation warning.""" - warn( - f"{self.name} is deprecated. {self.deprecation_message}", - DeprecationWarning, - stacklevel=3, - ) - - -class DeprecatedEnumMeta(EnumMeta): - """Metaclass for enumeration with deprecation support.""" - - def __getitem__(self, name: str) -> t.Any: # noqa: ANN401 - """Retrieve mapping item. - - Args: - name: Item name. - - Returns: - Enum member. - """ - obj: Enum = super().__getitem__(name) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - def __getattribute__(cls, name: str) -> t.Any: # noqa: ANN401, N805 - """Retrieve enum attribute. - - Args: - name: Attribute name. - - Returns: - Attribute. - """ - obj = super().__getattribute__(name) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any: # noqa: ANN401 - """Call enum member. - - Args: - args: Positional arguments. - kwargs: Keyword arguments. - - Returns: - Enum member. - """ - obj = super().__call__(*args, **kwargs) - if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: - obj.emit_warning() - return obj - - -class CapabilitiesEnum(DeprecatedEnum, metaclass=DeprecatedEnumMeta): - """Base capabilities enumeration.""" - - def __str__(self) -> str: - """String representation. - - Returns: - Stringified enum value. - """ - return str(self.value) - - def __repr__(self) -> str: - """String representation. - - Returns: - Stringified enum value. - """ - return str(self.value) - - -class PluginCapabilities(CapabilitiesEnum): - """Core capabilities which can be supported by taps and targets.""" - - #: Support plugin capability and setting discovery. - ABOUT = "about" - - #: Support :doc:`inline stream map transforms`. - STREAM_MAPS = "stream-maps" - - #: Support schema flattening, aka denesting of complex properties. - FLATTENING = "schema-flattening" - - #: Support the - #: `ACTIVATE_VERSION `_ - #: extension. - ACTIVATE_VERSION = "activate-version" - - #: Input and output from - #: `batched files `_. - #: A.K.A ``FAST_SYNC``. - BATCH = "batch" - - -class TapCapabilities(CapabilitiesEnum): - """Tap-specific capabilities.""" - - #: Generate a catalog with `--discover`. - DISCOVER = "discover" - - #: Accept input catalog, apply metadata and selection rules. - CATALOG = "catalog" - - #: Incremental refresh by means of state tracking. - STATE = "state" - - #: Automatic connectivity and stream init test via :ref:`--test`. - TEST = "test" - - #: Support for ``replication_method: LOG_BASED``. You can read more about this - #: feature in `MeltanoHub `_. - LOG_BASED = "log-based" - - #: Deprecated. Please use :attr:`~TapCapabilities.CATALOG` instead. - PROPERTIES = "properties", "Please use CATALOG instead." - - -class TargetCapabilities(CapabilitiesEnum): - """Target-specific capabilities.""" - - #: Allows a ``soft_delete=True`` config option. - #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` - #: and/or :attr:`TapCapabilities.LOG_BASED`. - SOFT_DELETE = "soft-delete" - - #: Allows a ``hard_delete=True`` config option. - #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` - #: and/or :attr:`TapCapabilities.LOG_BASED`. - HARD_DELETE = "hard-delete" - - #: Fail safe for unknown JSON Schema types. - DATATYPE_FAILSAFE = "datatype-failsafe" - - #: Allow denesting complex properties. - RECORD_FLATTENING = "record-flattening" - - #: Allow setting the target schema. - TARGET_SCHEMA = "target-schema" - - #: Validate the schema of the incoming records. - VALIDATE_RECORDS = "validate-records" diff --git a/singer_sdk/helpers/capabilities/__init__.py b/singer_sdk/helpers/capabilities/__init__.py new file mode 100644 index 000000000..3f258d435 --- /dev/null +++ b/singer_sdk/helpers/capabilities/__init__.py @@ -0,0 +1,74 @@ +"""Module with helpers to declare capabilities and plugin behavior.""" + +from __future__ import annotations + +from singer_sdk.helpers.capabilities import _schema as schema +from singer_sdk.helpers.capabilities._builtin import Builtin +from singer_sdk.helpers.capabilities._config_property import ConfigProperty +from singer_sdk.helpers.capabilities._enum import ( + CapabilitiesEnum, + PluginCapabilities, + TapCapabilities, + TargetCapabilities, + TargetLoadMethods, +) + +__all__ = [ + "ADD_RECORD_METADATA", + "BATCH", + "FLATTENING", + "STREAM_MAPS", + "TARGET_BATCH_SIZE_ROWS", + "TARGET_HARD_DELETE", + "TARGET_LOAD_METHOD", + "TARGET_SCHEMA", + "TARGET_VALIDATE_RECORDS", + "CapabilitiesEnum", + "ConfigProperty", + "PluginCapabilities", + "TapCapabilities", + "TargetCapabilities", + "TargetLoadMethods", +] + +#: Add metadata to records. +#: +#: Example: +#: +#: .. code-block:: json +#: +#: { +#: "add_record_metadata": true +#: } +#: +ADD_RECORD_METADATA = Builtin(schema=schema.ADD_RECORD_METADATA_CONFIG) + +#: For taps, support emitting BATCH messages. For targets, support consuming BATCH +#: messages. +BATCH = Builtin( + schema=schema.BATCH_CONFIG, + capability=PluginCapabilities.BATCH, +) + +FLATTENING = Builtin( + schema=schema.FLATTENING_CONFIG, + capability=PluginCapabilities.FLATTENING, +) +STREAM_MAPS = Builtin( + schema.STREAM_MAPS_CONFIG, + capability=PluginCapabilities.STREAM_MAPS, +) +TARGET_BATCH_SIZE_ROWS = Builtin(schema=schema.TARGET_BATCH_SIZE_ROWS_CONFIG) +TARGET_HARD_DELETE = Builtin( + schema=schema.TARGET_HARD_DELETE_CONFIG, + capability=TargetCapabilities.HARD_DELETE, +) +TARGET_LOAD_METHOD = Builtin(schema=schema.TARGET_LOAD_METHOD_CONFIG) +TARGET_SCHEMA = Builtin( + schema=schema.TARGET_SCHEMA_CONFIG, + capability=TargetCapabilities.TARGET_SCHEMA, +) +TARGET_VALIDATE_RECORDS = Builtin( + schema=schema.TARGET_VALIDATE_RECORDS_CONFIG, + capability=TargetCapabilities.VALIDATE_RECORDS, +) diff --git a/singer_sdk/helpers/capabilities/_builtin.py b/singer_sdk/helpers/capabilities/_builtin.py new file mode 100644 index 000000000..b3a2c1479 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_builtin.py @@ -0,0 +1,49 @@ +from __future__ import annotations + +import typing as t + +from ._config_property import ConfigProperty + +if t.TYPE_CHECKING: + from ._enum import CapabilitiesEnum + +_T = t.TypeVar("_T") + + +class Builtin: + """Use this class to define built-in setting(s) for a plugin.""" + + def __init__( + self, + schema: dict[str, t.Any], + *, + capability: CapabilitiesEnum | None = None, + **kwargs: t.Any, + ): + """Initialize the descriptor. + + Args: + schema: The JSON schema for the setting. + capability: The capability that the setting is associated with. + kwargs: Additional keyword arguments. + """ + self.schema = schema + self.capability = capability + self.kwargs = kwargs + + def attribute( # noqa: PLR6301 + self, + custom_key: str | None = None, + *, + default: _T | None = None, + ) -> ConfigProperty[_T]: + """Generate a class attribute for the setting. + + Args: + custom_key: Custom key to use in the config. + default: Default value for the setting. + + Returns: + Class attribute for the setting. + """ + return ConfigProperty(custom_key=custom_key, default=default) diff --git a/singer_sdk/helpers/capabilities/_config_property.py b/singer_sdk/helpers/capabilities/_config_property.py new file mode 100644 index 000000000..ef3e35b56 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_config_property.py @@ -0,0 +1,41 @@ +from __future__ import annotations + +import typing as t + +T = t.TypeVar("T") + + +class ConfigProperty(t.Generic[T]): + """A descriptor that gets a value from a named key of the config attribute.""" + + def __init__(self, custom_key: str | None = None, *, default: T | None = None): + """Initialize the descriptor. + + Args: + custom_key: The key to get from the config attribute instead of the + attribute name. + default: The default value if the key is not found. + """ + self.key = custom_key + self.default = default + + def __set_name__(self, owner, name: str) -> None: # noqa: ANN001 + """Set the name of the attribute. + + Args: + owner: The class of the object. + name: The name of the attribute. + """ + self.key = self.key or name + + def __get__(self, instance, owner) -> T | None: # noqa: ANN001 + """Get the value from the instance's config attribute. + + Args: + instance: The instance of the object. + owner: The class of the object. + + Returns: + The value from the config attribute. + """ + return instance.config.get(self.key, self.default) # type: ignore[no-any-return] diff --git a/singer_sdk/helpers/capabilities/_enum.py b/singer_sdk/helpers/capabilities/_enum.py new file mode 100644 index 000000000..95450fb80 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_enum.py @@ -0,0 +1,200 @@ +from __future__ import annotations + +import enum +import typing as t +import warnings + +_EnumMemberT = t.TypeVar("_EnumMemberT") + + +class TargetLoadMethods(str, enum.Enum): + """Target-specific capabilities.""" + + # always write all input records whether that records already exists or not + APPEND_ONLY = "append-only" + + # update existing records and insert new records + UPSERT = "upsert" + + # delete all existing records and insert all input records + OVERWRITE = "overwrite" + + +class DeprecatedEnum(enum.Enum): + """Base class for capabilities enumeration.""" + + def __new__( + cls, + value: _EnumMemberT, + deprecation: str | None = None, + ) -> DeprecatedEnum: + """Create a new enum member. + + Args: + value: Enum member value. + deprecation: Deprecation message. + + Returns: + An enum member value. + """ + member: DeprecatedEnum = object.__new__(cls) + member._value_ = value + member.deprecation = deprecation + return member + + @property + def deprecation_message(self) -> str | None: + """Get deprecation message. + + Returns: + Deprecation message. + """ + self.deprecation: str | None + return self.deprecation + + def emit_warning(self) -> None: + """Emit deprecation warning.""" + warnings.warn( + f"{self.name} is deprecated. {self.deprecation_message}", + DeprecationWarning, + stacklevel=3, + ) + + +class DeprecatedEnumMeta(enum.EnumMeta): + """Metaclass for enumeration with deprecation support.""" + + def __getitem__(self, name: str) -> t.Any: # noqa: ANN401 + """Retrieve mapping item. + + Args: + name: Item name. + + Returns: + Enum member. + """ + obj: enum.Enum = super().__getitem__(name) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + def __getattribute__(cls, name: str) -> t.Any: # noqa: ANN401, N805 + """Retrieve enum attribute. + + Args: + name: Attribute name. + + Returns: + Attribute. + """ + obj = super().__getattribute__(name) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + def __call__(self, *args: t.Any, **kwargs: t.Any) -> t.Any: # noqa: ANN401 + """Call enum member. + + Args: + args: Positional arguments. + kwargs: Keyword arguments. + + Returns: + Enum member. + """ + obj = super().__call__(*args, **kwargs) + if isinstance(obj, DeprecatedEnum) and obj.deprecation_message: + obj.emit_warning() + return obj + + +class CapabilitiesEnum(DeprecatedEnum, metaclass=DeprecatedEnumMeta): + """Base capabilities enumeration.""" + + def __str__(self) -> str: + """String representation. + + Returns: + Stringified enum value. + """ + return str(self.value) + + def __repr__(self) -> str: + """String representation. + + Returns: + Stringified enum value. + """ + return str(self.value) + + +class PluginCapabilities(CapabilitiesEnum): + """Core capabilities which can be supported by taps and targets.""" + + #: Support plugin capability and setting discovery. + ABOUT = "about" + + #: Support :doc:`inline stream map transforms`. + STREAM_MAPS = "stream-maps" + + #: Support schema flattening, aka de-nesting of complex properties. + FLATTENING = "schema-flattening" + + #: Support the + #: `ACTIVATE_VERSION `_ + #: extension. + ACTIVATE_VERSION = "activate-version" + + #: Input and output from + #: `batched files `_. + #: A.K.A ``FAST_SYNC``. + BATCH = "batch" + + +class TapCapabilities(CapabilitiesEnum): + """Tap-specific capabilities.""" + + #: Generate a catalog with `--discover`. + DISCOVER = "discover" + + #: Accept input catalog, apply metadata and selection rules. + CATALOG = "catalog" + + #: Incremental refresh by means of state tracking. + STATE = "state" + + #: Automatic connectivity and stream init test via :ref:`--test`. + TEST = "test" + + #: Support for ``replication_method: LOG_BASED``. You can read more about this + #: feature in `MeltanoHub `_. + LOG_BASED = "log-based" + + #: Deprecated. Please use :attr:`~TapCapabilities.CATALOG` instead. + PROPERTIES = "properties", "Please use CATALOG instead." + + +class TargetCapabilities(CapabilitiesEnum): + """Target-specific capabilities.""" + + #: Allows a ``soft_delete=True`` config option. + #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` + #: and/or :attr:`TapCapabilities.LOG_BASED`. + SOFT_DELETE = "soft-delete" + + #: Allows a ``hard_delete=True`` config option. + #: Requires a tap stream supporting :attr:`PluginCapabilities.ACTIVATE_VERSION` + #: and/or :attr:`TapCapabilities.LOG_BASED`. + HARD_DELETE = "hard-delete" + + #: Fail safe for unknown JSON Schema types. + DATATYPE_FAILSAFE = "datatype-failsafe" + + #: Allow de-nesting complex properties. + RECORD_FLATTENING = "record-flattening" + + #: Allow setting the target schema. + TARGET_SCHEMA = "target-schema" + + #: Validate the schema of the incoming records. + VALIDATE_RECORDS = "validate-records" diff --git a/singer_sdk/helpers/capabilities/_schema.py b/singer_sdk/helpers/capabilities/_schema.py new file mode 100644 index 000000000..a971b39a9 --- /dev/null +++ b/singer_sdk/helpers/capabilities/_schema.py @@ -0,0 +1,182 @@ +"""Default JSON Schema to support config for built-in capabilities.""" + +from __future__ import annotations + +from singer_sdk.typing import ( + ArrayType, + BooleanType, + IntegerType, + NumberType, + ObjectType, + OneOf, + PropertiesList, + Property, + StringType, +) + +from ._enum import TargetLoadMethods + +STREAM_MAPS_CONFIG = PropertiesList( + Property( + "stream_maps", + ObjectType(), + description=( + "Config object for stream maps capability. " + "For more information check out " + "[Stream Maps](https://sdk.meltano.com/en/latest/stream_maps.html)." + ), + ), + Property( + "stream_map_config", + ObjectType(), + description="User-defined config values to be used within map expressions.", + ), + Property( + "faker_config", + ObjectType( + Property( + "seed", + OneOf(NumberType, StringType, BooleanType), + description=( + "Value to seed the Faker generator for deterministic output: " + "https://faker.readthedocs.io/en/master/#seeding-the-generator" + ), + ), + Property( + "locale", + OneOf(StringType, ArrayType(StringType)), + description=( + "One or more LCID locale strings to produce localized output for: " + "https://faker.readthedocs.io/en/master/#localization" + ), + ), + ), + description=( + "Config for the [`Faker`](https://faker.readthedocs.io/en/master/) " + "instance variable `fake` used within map expressions. Only applicable if " + "the plugin specifies `faker` as an additional dependency (through the " + "`singer-sdk` `faker` extra or directly)." + ), + ), +).to_dict() + +FLATTENING_CONFIG = PropertiesList( + Property( + "flattening_enabled", + BooleanType(), + description=( + "'True' to enable schema flattening and automatically expand nested " + "properties." + ), + ), + Property( + "flattening_max_depth", + IntegerType(), + description="The max depth to flatten schemas.", + ), +).to_dict() + +BATCH_CONFIG = PropertiesList( + Property( + "batch_config", + description="", + wrapped=ObjectType( + Property( + "encoding", + description="Specifies the format and compression of the batch files.", + wrapped=ObjectType( + Property( + "format", + StringType, + allowed_values=["jsonl", "parquet"], + description="Format to use for batch files.", + ), + Property( + "compression", + StringType, + allowed_values=["gzip", "none"], + description="Compression format to use for batch files.", + ), + ), + ), + Property( + "storage", + description="Defines the storage layer to use when writing batch files", + wrapped=ObjectType( + Property( + "root", + StringType, + description="Root path to use when writing batch files.", + ), + Property( + "prefix", + StringType, + description="Prefix to use when writing batch files.", + ), + ), + ), + ), + ), +).to_dict() + +TARGET_SCHEMA_CONFIG = PropertiesList( + Property( + "default_target_schema", + StringType(), + description="The default target database schema name to use for all streams.", + ), +).to_dict() + +ADD_RECORD_METADATA_CONFIG = PropertiesList( + Property( + "add_record_metadata", + BooleanType(), + description="Add metadata to records.", + ), +).to_dict() + +TARGET_HARD_DELETE_CONFIG = PropertiesList( + Property( + "hard_delete", + BooleanType(), + description="Hard delete records.", + default=False, + ), +).to_dict() + +TARGET_VALIDATE_RECORDS_CONFIG = PropertiesList( + Property( + "validate_records", + BooleanType(), + description="Whether to validate the schema of the incoming streams.", + default=True, + ), +).to_dict() + +TARGET_BATCH_SIZE_ROWS_CONFIG = PropertiesList( + Property( + "batch_size_rows", + IntegerType, + description="Maximum number of rows in each batch.", + ), +).to_dict() + +TARGET_LOAD_METHOD_CONFIG = PropertiesList( + Property( + "load_method", + StringType(), + description=( + "The method to use when loading data into the destination. " + "`append-only` will always write all input records whether that records " + "already exists or not. `upsert` will update existing records and insert " + "new records. `overwrite` will delete all existing records and insert all " + "input records." + ), + allowed_values=[ + TargetLoadMethods.APPEND_ONLY, + TargetLoadMethods.UPSERT, + TargetLoadMethods.OVERWRITE, + ], + default=TargetLoadMethods.APPEND_ONLY, + ), +).to_dict() diff --git a/singer_sdk/mapper.py b/singer_sdk/mapper.py index 8d7dd3322..c7f1071c2 100644 --- a/singer_sdk/mapper.py +++ b/singer_sdk/mapper.py @@ -626,7 +626,7 @@ def _init_faker_instance(self) -> Faker | None: class PluginMapper: - """Inline map tranformer.""" + """Inline map transformer.""" def __init__( self, diff --git a/singer_sdk/mapper_base.py b/singer_sdk/mapper_base.py index 626133787..7614f1239 100644 --- a/singer_sdk/mapper_base.py +++ b/singer_sdk/mapper_base.py @@ -7,7 +7,6 @@ import click -from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers.capabilities import CapabilitiesEnum, PluginCapabilities from singer_sdk.io_base import SingerReader, SingerWriter from singer_sdk.plugin_base import PluginBase @@ -19,16 +18,10 @@ class InlineMapper(PluginBase, SingerReader, SingerWriter, metaclass=abc.ABCMeta): """Abstract base class for inline mappers.""" - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get capabilities. - - Returns: - A list of plugin capabilities. - """ - return [ - PluginCapabilities.STREAM_MAPS, - ] + #: A list of plugin capabilities. + capabilities: t.ClassVar[list[CapabilitiesEnum]] = [ + PluginCapabilities.STREAM_MAPS, + ] def _write_messages(self, messages: t.Iterable[singer.Message]) -> None: for message in messages: diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 1564558cf..d4097ef13 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -22,15 +22,10 @@ parse_environment_config, ) from singer_sdk.exceptions import ConfigValidationError +from singer_sdk.helpers import capabilities from singer_sdk.helpers._classproperty import classproperty from singer_sdk.helpers._secrets import SecretString, is_common_secret_key from singer_sdk.helpers._util import read_json_file -from singer_sdk.helpers.capabilities import ( - FLATTENING_CONFIG, - STREAM_MAPS_CONFIG, - CapabilitiesEnum, - PluginCapabilities, -) from singer_sdk.mapper import PluginMapper from singer_sdk.typing import extend_validator_with_defaults @@ -84,7 +79,7 @@ def invoke(self, ctx: click.Context) -> t.Any: # noqa: ANN401 sys.exit(1) -class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904 +class PluginBase(metaclass=abc.ABCMeta): """Abstract base class for taps.""" #: The executable name of the tap or target plugin. e.g. tap-foo @@ -98,6 +93,14 @@ class PluginBase(metaclass=abc.ABCMeta): # noqa: PLR0904 _config: dict + #: Advertised built-in plugin capabilities. Developers may override this property + #: in order to add or remove advertised capabilities for this plugin. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.PluginCapabilities.BATCH, + ] + @classproperty def logger(cls) -> logging.Logger: # noqa: N805 """Get logger. @@ -210,22 +213,6 @@ def initialized_at(self) -> int: """ return self.__initialized_at - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get capabilities. - - Developers may override this property in oder to add or remove - advertised capabilities for this plugin. - - Returns: - A list of plugin capabilities. - """ - return [ - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - PluginCapabilities.BATCH, - ] - @classproperty def _env_var_config(cls) -> dict[str, t.Any]: # noqa: N805 """Return any config specified in environment variables. @@ -443,12 +430,17 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non Args: config_jsonschema: [description] """ - capabilities = cls.capabilities - if PluginCapabilities.STREAM_MAPS in capabilities: - merge_missing_config_jsonschema(STREAM_MAPS_CONFIG, config_jsonschema) + if capabilities.STREAM_MAPS.capability in cls.capabilities: + merge_missing_config_jsonschema( + capabilities.STREAM_MAPS.schema, + config_jsonschema, + ) - if PluginCapabilities.FLATTENING in capabilities: - merge_missing_config_jsonschema(FLATTENING_CONFIG, config_jsonschema) + if capabilities.FLATTENING.capability in cls.capabilities: + merge_missing_config_jsonschema( + capabilities.FLATTENING.schema, + config_jsonschema, + ) @classmethod def print_about( diff --git a/singer_sdk/sinks/core.py b/singer_sdk/sinks/core.py index a2f54c8ca..e1756601c 100644 --- a/singer_sdk/sinks/core.py +++ b/singer_sdk/sinks/core.py @@ -9,7 +9,6 @@ import json import time import typing as t -from functools import cached_property from gzip import GzipFile from gzip import open as gzip_open from types import MappingProxyType @@ -22,6 +21,7 @@ InvalidRecord, MissingKeyPropertiesError, ) +from singer_sdk.helpers import capabilities from singer_sdk.helpers._batch import ( BaseBatchFileEncoding, BatchConfig, @@ -138,6 +138,17 @@ class Sink(metaclass=abc.ABCMeta): # noqa: PLR0904 fail_on_record_validation_exception: bool = True """Interrupt the target execution when a record fails schema validation.""" + #: Enable JSON schema record validation. + validate_schema = capabilities.TARGET_VALIDATE_RECORDS.attribute( + "validate_records", + default=True, + ) + + include_sdc_metadata_properties = capabilities.ADD_RECORD_METADATA.attribute( + "add_record_metadata", + default=False, + ) + def __init__( self, target: Target, @@ -189,15 +200,6 @@ def __init__( self._validator: BaseJSONSchemaValidator | None = self.get_validator() - @cached_property - def validate_schema(self) -> bool: - """Enable JSON schema record validation. - - Returns: - True if JSON schema validation is enabled. - """ - return self.config.get("validate_records", True) - def get_validator(self) -> BaseJSONSchemaValidator | None: """Get a record validator for this sink. @@ -359,15 +361,6 @@ def batch_config(self) -> BatchConfig | None: raw = self.config.get("batch_config") return BatchConfig.from_dict(raw) if raw else None - @property - def include_sdc_metadata_properties(self) -> bool: - """Check if metadata columns should be added. - - Returns: - True if metadata columns should be added. - """ - return self.config.get("add_record_metadata", False) - @property def datetime_error_treatment(self) -> DatetimeErrorTreatmentEnum: """Return a treatment to use for datetime parse errors: ERROR. MAX, or NULL. diff --git a/singer_sdk/tap_base.py b/singer_sdk/tap_base.py index d8fb75a8f..ff807184c 100644 --- a/singer_sdk/tap_base.py +++ b/singer_sdk/tap_base.py @@ -17,16 +17,9 @@ AbortedSyncPausedException, ConfigValidationError, ) -from singer_sdk.helpers import _state -from singer_sdk.helpers._classproperty import classproperty +from singer_sdk.helpers import _state, capabilities from singer_sdk.helpers._state import write_stream_state from singer_sdk.helpers._util import read_json_file -from singer_sdk.helpers.capabilities import ( - BATCH_CONFIG, - CapabilitiesEnum, - PluginCapabilities, - TapCapabilities, -) from singer_sdk.io_base import SingerWriter from singer_sdk.plugin_base import PluginBase @@ -59,6 +52,17 @@ class Tap(PluginBase, SingerWriter, metaclass=abc.ABCMeta): # noqa: PLR0904 """Whether the tap's catalog is dynamic. Set to True if the catalog is generated dynamically (e.g. by querying a database's system tables).""" + #: A list of capabilities supported by this tap. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.TapCapabilities.CATALOG, + capabilities.TapCapabilities.STATE, + capabilities.TapCapabilities.DISCOVER, + capabilities.PluginCapabilities.ABOUT, + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.PluginCapabilities.BATCH, + ] + # Constructor def __init__( @@ -179,23 +183,6 @@ def setup_mapper(self) -> None: super().setup_mapper() self.mapper.register_raw_streams_from_catalog(self.catalog) - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get tap capabilities. - - Returns: - A list of capabilities supported by this tap. - """ - return [ - TapCapabilities.CATALOG, - TapCapabilities.STATE, - TapCapabilities.DISCOVER, - PluginCapabilities.ABOUT, - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - PluginCapabilities.BATCH, - ] - @classmethod def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> None: """Appends built-in config to `config_jsonschema` if not already set. @@ -214,9 +201,11 @@ def append_builtin_config(cls: type[PluginBase], config_jsonschema: dict) -> Non """ PluginBase.append_builtin_config(config_jsonschema) - capabilities = cls.capabilities - if PluginCapabilities.BATCH in capabilities: - merge_missing_config_jsonschema(BATCH_CONFIG, config_jsonschema) + if capabilities.BATCH.capability in cls.capabilities: # pragma: no branch + merge_missing_config_jsonschema( + capabilities.BATCH.schema, + config_jsonschema, + ) # Connection and sync tests: diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index d560a555e..780d760d1 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -13,20 +13,8 @@ from joblib import Parallel, delayed, parallel_config from singer_sdk.exceptions import RecordsWithoutSchemaException +from singer_sdk.helpers import capabilities from singer_sdk.helpers._batch import BaseBatchFileEncoding -from singer_sdk.helpers._classproperty import classproperty -from singer_sdk.helpers.capabilities import ( - ADD_RECORD_METADATA_CONFIG, - BATCH_CONFIG, - TARGET_BATCH_SIZE_ROWS_CONFIG, - TARGET_HARD_DELETE_CONFIG, - TARGET_LOAD_METHOD_CONFIG, - TARGET_SCHEMA_CONFIG, - TARGET_VALIDATE_RECORDS_CONFIG, - CapabilitiesEnum, - PluginCapabilities, - TargetCapabilities, -) from singer_sdk.io_base import SingerMessageType, SingerReader from singer_sdk.plugin_base import PluginBase @@ -56,6 +44,14 @@ class Target(PluginBase, SingerReader, metaclass=abc.ABCMeta): # Required if `Target.get_sink_class()` is not defined. default_sink_class: type[Sink] + #: Target capabilities. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + capabilities.PluginCapabilities.ABOUT, + capabilities.PluginCapabilities.STREAM_MAPS, + capabilities.PluginCapabilities.FLATTENING, + capabilities.TargetCapabilities.VALIDATE_RECORDS, + ] + def __init__( self, *, @@ -95,20 +91,6 @@ def __init__( if setup_mapper: self.setup_mapper() - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: # noqa: PLR6301 - """Get target capabilities. - - Returns: - A list of capabilities supported by this target. - """ - return [ - PluginCapabilities.ABOUT, - PluginCapabilities.STREAM_MAPS, - PluginCapabilities.FLATTENING, - TargetCapabilities.VALIDATE_RECORDS, - ] - @property def max_parallelism(self) -> int: """Get max parallel sinks. @@ -610,17 +592,18 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if k not in target_jsonschema["properties"]: target_jsonschema["properties"][k] = v - _merge_missing(ADD_RECORD_METADATA_CONFIG, config_jsonschema) - _merge_missing(TARGET_LOAD_METHOD_CONFIG, config_jsonschema) - _merge_missing(TARGET_BATCH_SIZE_ROWS_CONFIG, config_jsonschema) - - capabilities = cls.capabilities + _merge_missing(capabilities.ADD_RECORD_METADATA.schema, config_jsonschema) + _merge_missing(capabilities.TARGET_LOAD_METHOD.schema, config_jsonschema) + _merge_missing(capabilities.TARGET_BATCH_SIZE_ROWS.schema, config_jsonschema) - if PluginCapabilities.BATCH in capabilities: - _merge_missing(BATCH_CONFIG, config_jsonschema) + if capabilities.BATCH.capability in cls.capabilities: + _merge_missing(capabilities.BATCH.schema, config_jsonschema) - if TargetCapabilities.VALIDATE_RECORDS in capabilities: - _merge_missing(TARGET_VALIDATE_RECORDS_CONFIG, config_jsonschema) + if capabilities.TARGET_VALIDATE_RECORDS.capability in cls.capabilities: + _merge_missing( + capabilities.TARGET_VALIDATE_RECORDS.schema, + config_jsonschema, + ) super().append_builtin_config(config_jsonschema) @@ -632,6 +615,13 @@ class SQLTarget(Target): default_sink_class: type[SQLSink] + #: Target capabilities. + capabilities: t.ClassVar[list[capabilities.CapabilitiesEnum]] = [ + *Target.capabilities, + capabilities.TargetCapabilities.TARGET_SCHEMA, + capabilities.TargetCapabilities.HARD_DELETE, + ] + @property def target_connector(self) -> SQLConnector: """The connector object. @@ -645,23 +635,6 @@ def target_connector(self) -> SQLConnector: ) return self._target_connector - @classproperty - def capabilities(self) -> list[CapabilitiesEnum]: - """Get target capabilities. - - Returns: - A list of capabilities supported by this target. - """ - sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities - sql_target_capabilities.extend( - [ - TargetCapabilities.TARGET_SCHEMA, - TargetCapabilities.HARD_DELETE, - ] - ) - - return sql_target_capabilities - @classmethod def append_builtin_config(cls: type[SQLTarget], config_jsonschema: dict) -> None: """Appends built-in config to `config_jsonschema` if not already set. @@ -685,13 +658,11 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if k not in target_jsonschema["properties"]: target_jsonschema["properties"][k] = v - capabilities = cls.capabilities - - if TargetCapabilities.TARGET_SCHEMA in capabilities: - _merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema) + if capabilities.TARGET_SCHEMA.capability in cls.capabilities: + _merge_missing(capabilities.TARGET_SCHEMA.schema, config_jsonschema) - if TargetCapabilities.HARD_DELETE in capabilities: - _merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema) + if capabilities.TARGET_HARD_DELETE.capability in cls.capabilities: + _merge_missing(capabilities.TARGET_HARD_DELETE.schema, config_jsonschema) super().append_builtin_config(config_jsonschema) diff --git a/tests/core/helpers/__init__.py b/tests/core/helpers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/helpers/capabilities/__init__.py b/tests/core/helpers/capabilities/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/core/test_capabilities.py b/tests/core/helpers/capabilities/test_capabilities.py similarity index 63% rename from tests/core/test_capabilities.py rename to tests/core/helpers/capabilities/test_capabilities.py index 49149469a..b4297c8d5 100644 --- a/tests/core/test_capabilities.py +++ b/tests/core/helpers/capabilities/test_capabilities.py @@ -16,10 +16,36 @@ class DummyCapabilitiesEnum(CapabilitiesEnum): def test_deprecated_capabilities(): + # Dictionary access + with warnings.catch_warnings(): + warnings.simplefilter("error") + _ = DummyCapabilitiesEnum["MY_SUPPORTED_FEATURE"] + + # Call + with warnings.catch_warnings(): + warnings.simplefilter("error") + _ = DummyCapabilitiesEnum("supported") + + # Attribute access with warnings.catch_warnings(): warnings.simplefilter("error") _ = DummyCapabilitiesEnum.MY_SUPPORTED_FEATURE + # Dictionary access + with pytest.warns( + DeprecationWarning, + match="is deprecated. No longer supported", + ) as record: + _ = DummyCapabilitiesEnum["MY_DEPRECATED_FEATURE"] + + # Call + with pytest.warns( + DeprecationWarning, + match="is deprecated. No longer supported", + ) as record: + DummyCapabilitiesEnum("deprecated") + + # Attribute access with pytest.warns( DeprecationWarning, match="is deprecated. No longer supported", diff --git a/tests/core/helpers/capabilities/test_config_property.py b/tests/core/helpers/capabilities/test_config_property.py new file mode 100644 index 000000000..10165dcc5 --- /dev/null +++ b/tests/core/helpers/capabilities/test_config_property.py @@ -0,0 +1,33 @@ +"""Test the BuiltinSetting descriptor.""" + +from __future__ import annotations + +from singer_sdk.helpers.capabilities import ConfigProperty + + +def test_builtin_setting_descriptor(): + class ObjWithConfig: + example = ConfigProperty(default=1) + + def __init__(self): + self.config = {"example": 1} + + obj = ObjWithConfig() + assert obj.example == 1 + + obj.config["example"] = 2 + assert obj.example == 2 + + +def test_builtin_setting_descriptor_custom_key(): + class ObjWithConfig: + my_attr = ConfigProperty("example", default=1) + + def __init__(self): + self.config = {"example": 1} + + obj = ObjWithConfig() + assert obj.my_attr == 1 + + obj.config["example"] = 2 + assert obj.my_attr == 2