From fb1bf74bec6443eb84455eb623fdd1028fb6c553 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 26 Mar 2024 22:49:25 -0600 Subject: [PATCH] feat: Settings write-back --- docs/conf.py | 1 + docs/guides/index.md | 1 + docs/guides/signals.md | 49 ++++++++++++++++++++++ poetry.lock | 67 +++++++++++++++++------------- pyproject.toml | 1 + singer_sdk/authenticators.py | 8 ++-- singer_sdk/plugin_base.py | 76 +++++++++++++++++++++++++++++----- singer_sdk/streams/core.py | 7 ++-- tests/core/test_plugin_base.py | 8 ++++ 9 files changed, 171 insertions(+), 47 deletions(-) create mode 100644 docs/guides/signals.md diff --git a/docs/conf.py b/docs/conf.py index ee8b0b851..570c87f1b 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -152,6 +152,7 @@ # -- Options for intersphinx ----------------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/extensions/intersphinx.html#configuration intersphinx_mapping = { + "blinker": ("https://blinker.readthedocs.io/en/stable/", None), "requests": ("https://requests.readthedocs.io/en/latest/", None), "python": ("https://docs.python.org/3/", None), } diff --git a/docs/guides/index.md b/docs/guides/index.md index e86aa149c..f4739d106 100644 --- a/docs/guides/index.md +++ b/docs/guides/index.md @@ -8,4 +8,5 @@ The following pages contain useful information for developers building on top of porting pagination-classes custom-clis +signals ``` diff --git a/docs/guides/signals.md b/docs/guides/signals.md new file mode 100644 index 000000000..795ce3d9f --- /dev/null +++ b/docs/guides/signals.md @@ -0,0 +1,49 @@ +# Signals + +This guide will show you how to use the built-in [Blinker](inv:blinker:std:doc#index) signals in the Singer SDK. + +## Settings write-back + +The SDK provides a signal that allows you to write back settings to the configuration file. 
This is useful if you want to update the configuration file with new settings that were set during the run, like a `refresh_token`. + +```python +import requests +from singer_sdk.authenticators import OAuthAuthenticator +from singer_sdk.plugin_base import PluginBase + + +class RefreshTokenAuthenticator(OAuthAuthenticator): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.refresh_token = self.config["refresh_token"] + + @property + def oauth_request_body(self): + return { + "client_id": self.config["client_id"], + "client_secret": self.config["client_secret"], + "grant_type": "refresh_token", + "refresh_token": self.refresh_token, + "user_type": "Location", + } + + def update_access_token(self): + token_response = requests.post( + self.auth_endpoint, + headers=self._oauth_headers, + data=self.oauth_request_body, + timeout=60, + ) + token_response.raise_for_status() + token_json = token_response.json() + + self.access_token = token_json["access_token"] + self.refresh_token = token_json["refresh_token"] + PluginBase.config_updated.send(self, refresh_token=self.refresh_token) +``` + +In the example above, the `RefreshTokenAuthenticator` class is a subclass of `OAuthAuthenticator` that calls `PluginBase.config_updated.send` to send a signal to update the `refresh_token` in the tap's configuration. + +```{note} +The SDK writes the settings back only when a single file is passed via the `--config` command line option; in that case they are written to that same file. +``` diff --git a/poetry.lock b/poetry.lock index 848f23ce7..548fb60ab 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. 
[[package]] name = "alabaster" @@ -158,19 +158,30 @@ charset-normalizer = ["charset-normalizer"] html5lib = ["html5lib"] lxml = ["lxml"] +[[package]] +name = "blinker" +version = "1.7.0" +description = "Fast, simple object-to-object and broadcast signaling" +optional = false +python-versions = ">=3.8" +files = [ + {file = "blinker-1.7.0-py3-none-any.whl", hash = "sha256:c3f865d4d54db7abc53758a01601cf343fe55b84c1de4e3fa910e420b438d5b9"}, + {file = "blinker-1.7.0.tar.gz", hash = "sha256:e6820ff6fa4e4d1d8e2747c2283749c3f547e4fee112b98555cdcdae32996182"}, +] + [[package]] name = "boto3" -version = "1.34.49" +version = "1.34.71" description = "The AWS SDK for Python" optional = true -python-versions = ">= 3.8" +python-versions = ">=3.8" files = [ - {file = "boto3-1.34.49-py3-none-any.whl", hash = "sha256:ce8d1de03024f52a1810e8d71ad4dba3a5b9bb48b35567191500e3432a9130b4"}, - {file = "boto3-1.34.49.tar.gz", hash = "sha256:96b9dc85ce8d52619b56ca7b1ac1423eaf0af5ce132904bcc8aa81396eec2abf"}, + {file = "boto3-1.34.71-py3-none-any.whl", hash = "sha256:7ce8c9a50af2f8a159a0dd86b40011d8dfdaba35005a118e51cd3ac72dc630f1"}, + {file = "boto3-1.34.71.tar.gz", hash = "sha256:d786e7fbe3c4152866199786468a625dc77b9f27294cd7ad4f63cd2e0c927287"}, ] [package.dependencies] -botocore = ">=1.34.49,<1.35.0" +botocore = ">=1.34.71,<1.35.0" jmespath = ">=0.7.1,<2.0.0" s3transfer = ">=0.10.0,<0.11.0" @@ -179,13 +190,13 @@ crt = ["botocore[crt] (>=1.21.0,<2.0a0)"] [[package]] name = "botocore" -version = "1.34.49" +version = "1.34.71" description = "Low-level, data-driven core of boto 3." 
optional = true -python-versions = ">= 3.8" +python-versions = ">=3.8" files = [ - {file = "botocore-1.34.49-py3-none-any.whl", hash = "sha256:4ed9d7603a04b5bb5bd5de63b513bc2c8a7e8b1cd0088229c5ceb461161f43b6"}, - {file = "botocore-1.34.49.tar.gz", hash = "sha256:d89410bc60673eaff1699f3f1fdcb0e3a5e1f7a6a048c0d88c3ce5c3549433ec"}, + {file = "botocore-1.34.71-py3-none-any.whl", hash = "sha256:3bc9e23aee73fe6f097823d61f79a8877790436038101a83fa96c7593e8109f8"}, + {file = "botocore-1.34.71.tar.gz", hash = "sha256:c58f9ed71af2ea53d24146187130541222d7de8c27eb87d23f15457e7b83d88b"}, ] [package.dependencies] @@ -193,7 +204,7 @@ jmespath = ">=0.7.1,<2.0.0" python-dateutil = ">=2.1,<3.0.0" urllib3 = [ {version = ">=1.25.4,<1.27", markers = "python_version < \"3.10\""}, - {version = ">=1.25.4,<2.1", markers = "python_version >= \"3.10\""}, + {version = ">=1.25.4,<2.2.0 || >2.2.0,<3", markers = "python_version >= \"3.10\""}, ] [package.extras] @@ -617,13 +628,13 @@ test = ["pytest (>=6)"] [[package]] name = "faker" -version = "24.3.0" +version = "24.4.0" description = "Faker is a Python package that generates fake data for you." optional = true python-versions = ">=3.8" files = [ - {file = "Faker-24.3.0-py3-none-any.whl", hash = "sha256:9978025e765ba79f8bf6154c9630a9c2b7f9c9b0f175d4ad5e04b19a82a8d8d6"}, - {file = "Faker-24.3.0.tar.gz", hash = "sha256:5fb5aa9749d09971e04a41281ae3ceda9414f683d4810a694f8a8eebb8f9edec"}, + {file = "Faker-24.4.0-py3-none-any.whl", hash = "sha256:998c29ee7d64429bd59204abffa9ba11f784fb26c7b9df4def78d1a70feb36a7"}, + {file = "Faker-24.4.0.tar.gz", hash = "sha256:a5ddccbe97ab691fad6bd8036c31f5697cfaa550e62e000078d1935fa8a7ec2e"}, ] [package.dependencies] @@ -646,18 +657,18 @@ devel = ["colorama", "json-spec", "jsonschema", "pylint", "pytest", "pytest-benc [[package]] name = "filelock" -version = "3.13.1" +version = "3.13.3" description = "A platform independent file lock." 
optional = false python-versions = ">=3.8" files = [ - {file = "filelock-3.13.1-py3-none-any.whl", hash = "sha256:57dbda9b35157b05fb3e58ee91448612eb674172fab98ee235ccb0b5bee19a1c"}, - {file = "filelock-3.13.1.tar.gz", hash = "sha256:521f5f56c50f8426f5e03ad3b281b490a87ef15bc6c526f168290f0c7148d44e"}, + {file = "filelock-3.13.3-py3-none-any.whl", hash = "sha256:5ffa845303983e7a0b7ae17636509bc97997d58afeafa72fb141a17b152284cb"}, + {file = "filelock-3.13.3.tar.gz", hash = "sha256:a79895a25bbefdf55d1a2a0a80968f7dbb28edcd6d4234a0afb3f37ecde4b546"}, ] [package.extras] -docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.24)"] -testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] +docs = ["furo (>=2023.9.10)", "sphinx (>=7.2.6)", "sphinx-autodoc-typehints (>=1.25.2)"] +testing = ["covdefaults (>=2.3)", "coverage (>=7.3.2)", "diff-cover (>=8.0.1)", "pytest (>=7.4.3)", "pytest-cov (>=4.1)", "pytest-mock (>=3.12)", "pytest-timeout (>=2.2)"] typing = ["typing-extensions (>=4.8)"] [[package]] @@ -1880,13 +1891,13 @@ files = [ [[package]] name = "s3transfer" -version = "0.10.0" +version = "0.10.1" description = "An Amazon S3 Transfer Manager" optional = true python-versions = ">= 3.8" files = [ - {file = "s3transfer-0.10.0-py3-none-any.whl", hash = "sha256:3cdb40f5cfa6966e812209d0994f2a4709b561c88e90cf00c2696d2df4e56b2e"}, - {file = "s3transfer-0.10.0.tar.gz", hash = "sha256:d0c8bbf672d5eebbe4e57945e23b972d963f07d82f661cabf678a5c88831595b"}, + {file = "s3transfer-0.10.1-py3-none-any.whl", hash = "sha256:ceb252b11bcf87080fb7850a224fb6e05c8a776bab8f2b64b7f25b969464839d"}, + {file = "s3transfer-0.10.1.tar.gz", hash = "sha256:5683916b4c724f799e600f41dd9e10a9ff19871bf87623cc8f491cb4f5fa0a19"}, ] [package.dependencies] @@ -2622,18 +2633,18 @@ tests-strict = ["pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pyt [[package]] name = 
"zipp" -version = "3.17.0" +version = "3.18.1" description = "Backport of pathlib-compatible object wrapper for zip files" optional = false python-versions = ">=3.8" files = [ - {file = "zipp-3.17.0-py3-none-any.whl", hash = "sha256:0e923e726174922dce09c53c59ad483ff7bbb8e572e00c7f7c46b88556409f31"}, - {file = "zipp-3.17.0.tar.gz", hash = "sha256:84e64a1c28cf7e91ed2078bb8cc8c259cb19b76942096c8d7b84947690cabaf0"}, + {file = "zipp-3.18.1-py3-none-any.whl", hash = "sha256:206f5a15f2af3dbaee80769fb7dc6f249695e940acca08dfb2a4769fe61e538b"}, + {file = "zipp-3.18.1.tar.gz", hash = "sha256:2884ed22e7d8961de1c9a05142eb69a247f120291bc0206a00a7642f09b5b715"}, ] [package.extras] -docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (<7.2.5)", "sphinx (>=3.5)", "sphinx-lint"] -testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy (>=0.9.1)", "pytest-ruff"] +docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-lint"] +testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-ignore-flaky", "pytest-mypy", "pytest-ruff (>=0.2.1)"] [extras] docs = ["furo", "myst-parser", "pytest", "sphinx", "sphinx-autobuild", "sphinx-copybutton", "sphinx-inline-tabs", "sphinx-notfound-page", "sphinx-reredirects"] @@ -2645,4 +2656,4 @@ testing = ["pytest", "pytest-durations"] [metadata] lock-version = "2.0" python-versions = ">=3.8" -content-hash = "a1649e1ccefeed0391552089bf6be64ca6e8e54b6db177ac86e6ca730f677d01" +content-hash = "2923efb3e796fb7070479a2b49ea5d4a2edbc64d0c69529310fdbc6e208d7a9b" diff --git a/pyproject.toml b/pyproject.toml index 20c3cdc29..f32d1cdad 100644 --- a/pyproject.toml +++ 
b/pyproject.toml @@ -41,6 +41,7 @@ license = "Apache-2.0" python = ">=3.8" backoff = { version = ">=2.0.0", python = "<4" } backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" } +blinker = ">=1.7.0" click = "~=8.0" cryptography = ">=3.4.6" fs = ">=2.4.16" diff --git a/singer_sdk/authenticators.py b/singer_sdk/authenticators.py index faf34e85a..651860bbe 100644 --- a/singer_sdk/authenticators.py +++ b/singer_sdk/authenticators.py @@ -7,7 +7,6 @@ import typing as t import warnings from datetime import timedelta -from types import MappingProxyType from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit import jwt @@ -19,6 +18,7 @@ if t.TYPE_CHECKING: import logging + from types import MappingProxyType from pendulum import DateTime @@ -93,19 +93,19 @@ def __init__(self, stream: RESTStream) -> None: stream: A stream for a RESTful endpoint. """ self.tap_name: str = stream.tap_name - self._config: dict[str, t.Any] = dict(stream.config) + self._config = stream.config self._auth_headers: dict[str, t.Any] = {} self._auth_params: dict[str, t.Any] = {} self.logger: logging.Logger = stream.logger @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType: """Get stream or tap config. Returns: A frozen (read-only) config dictionary map. 
""" - return MappingProxyType(self._config) + return self._config @property def auth_headers(self) -> dict: diff --git a/singer_sdk/plugin_base.py b/singer_sdk/plugin_base.py index 1c2f46c97..40184e109 100644 --- a/singer_sdk/plugin_base.py +++ b/singer_sdk/plugin_base.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc +import json import logging import os import sys @@ -13,6 +14,7 @@ from types import MappingProxyType import click +from blinker import Signal from jsonschema import Draft7Validator from singer_sdk import about, metrics @@ -98,6 +100,9 @@ class PluginBase(metaclass=abc.ABCMeta): _config: dict + # Signals + config_updated = Signal() + @classproperty def logger(cls) -> logging.Logger: # noqa: N805 """Get logger. @@ -134,10 +139,43 @@ def __init__( it can be a predetermined config dict. parse_env_config: True to parse settings from env vars. validate_config: True to require validation of config settings. + """ + self._config, self._config_path = self._process_config( + config=config, + parse_env_config=parse_env_config, + ) + metrics._setup_logging(self.config) # noqa: SLF001 + self.metrics_logger = metrics.get_metrics_logger() + + self._validate_config(raise_errors=validate_config) + self._mapper: PluginMapper | None = None + + # Initialization timestamp + self.__initialized_at = int(time.time() * 1000) + + self.config_updated.connect(self.update_config) + + def _process_config( + self, + *, + config: dict | PurePath | str | list[PurePath | str] | None = None, + parse_env_config: bool = False, + ) -> tuple[dict[str, t.Any], PurePath | str | None]: + """Process the plugin configuration. + + Args: + config: May be one or more paths, either as str or PurePath objects, or + it can be a predetermined config dict. + parse_env_config: True to parse settings from env vars. + + Returns: + A tuple containing the config dictionary and the config write-back path. Raises: ValueError: If config is not a dict or path string. 
""" + config_path = None + if not config: config_dict = {} elif isinstance(config, (str, PurePath)): @@ -148,28 +186,29 @@ def __init__( # Read each config file sequentially. Settings from files later in the # list will override those of earlier ones. config_dict.update(read_json_file(config_path)) + + if len(config) == 1 and not parse_env_config: + config_path = config[0] + elif isinstance(config, dict): config_dict = config - else: + else: # pragma: no cover msg = f"Error parsing config of type '{type(config).__name__}'." raise ValueError(msg) + + # Parse env var settings if parse_env_config: self.logger.info("Parsing env var for settings config...") config_dict.update(self._env_var_config) else: self.logger.info("Skipping parse of env var settings...") + + # Handle sensitive settings for k, v in config_dict.items(): if self._is_secret_config(k): config_dict[k] = SecretString(v) - self._config = config_dict - metrics._setup_logging(self.config) # noqa: SLF001 - self.metrics_logger = metrics.get_metrics_logger() - self._validate_config(raise_errors=validate_config) - self._mapper: PluginMapper | None = None - - # Initialization timestamp - self.__initialized_at = int(time.time() * 1000) + return config_dict, config_path def setup_mapper(self) -> None: """Initialize the plugin mapper for this tap.""" @@ -336,13 +375,28 @@ def state(self) -> dict: # Core plugin config: @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType: """Get config. Returns: A frozen (read-only) config dictionary map. """ - return t.cast(dict, MappingProxyType(self._config)) + return MappingProxyType(self._config) + + def update_config(self, sender: t.Any, **settings: t.Any) -> None: # noqa: ANN401, ARG002 + """Update the config with new settings. + + This is a :external+blinker:std:doc:`Blinker ` signal receiver. + + Args: + sender: The sender of the signal. + **settings: New settings to update the config with. 
+ """ + self._config.update(**settings) + if self._config_path is not None: # pragma: no cover + self.logger.info("Updating config file: %s", self._config_path) + with Path(self._config_path).open("w") as f: + json.dump(self._config, f) @staticmethod def _is_secret_config(config_key: str) -> bool: diff --git a/singer_sdk/streams/core.py b/singer_sdk/streams/core.py index e57d93a62..b83d17e60 100644 --- a/singer_sdk/streams/core.py +++ b/singer_sdk/streams/core.py @@ -9,7 +9,6 @@ import typing as t from os import PathLike from pathlib import Path -from types import MappingProxyType import pendulum @@ -52,6 +51,7 @@ if t.TYPE_CHECKING: import logging + from types import MappingProxyType from singer_sdk.helpers._compat import Traversable from singer_sdk.tap_base import Tap @@ -127,7 +127,6 @@ def __init__( self.logger: logging.Logger = tap.logger.getChild(self.name) self.metrics_logger = tap.metrics_logger self.tap_name: str = tap.name - self._config: dict = dict(tap.config) self._tap = tap self._tap_state = tap.state self._tap_input_catalog: singer.Catalog | None = None @@ -592,13 +591,13 @@ def _singer_catalog(self) -> singer.Catalog: return singer.Catalog([(self.tap_stream_id, self._singer_catalog_entry)]) @property - def config(self) -> t.Mapping[str, t.Any]: + def config(self) -> MappingProxyType[str, t.Any]: """Get stream configuration. Returns: A frozen (read-only) config dictionary map. 
""" - return MappingProxyType(self._config) + return self._tap.config @property def tap_stream_id(self) -> str: diff --git a/tests/core/test_plugin_base.py b/tests/core/test_plugin_base.py index 04b6a1665..92cee623b 100644 --- a/tests/core/test_plugin_base.py +++ b/tests/core/test_plugin_base.py @@ -57,3 +57,11 @@ def test_mapper_not_initialized(): def test_supported_python_versions(): """Test that supported python versions are correctly parsed.""" assert PluginBase._get_supported_python_versions(SDK_PACKAGE_NAME) + + +def test_config_updated_signal(): + plugin = PluginTest(config={"prop1": "hello"}) + assert plugin.config == {"prop1": "hello"} + + PluginBase.config_updated.send(prop2="abc") + assert plugin.config == {"prop1": "hello", "prop2": "abc"}