Skip to content

Commit

Permalink
fix(targets)!: Default handling of ACTIVATE_VERSION messages to sof…
Browse files Browse the repository at this point in the history
…t deletes and add new `SQLConnector.delete_old_versions` method (meltano#2105)

* fix(targets): Default handling of `ACTIVATE_VERSION` messages to soft deletes

* Test SQL target capabilities info
  • Loading branch information
edgarrmondragon authored Jan 10, 2024
1 parent f914452 commit a79cff8
Show file tree
Hide file tree
Showing 7 changed files with 103 additions and 13 deletions.
25 changes: 25 additions & 0 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -1189,3 +1189,28 @@ def deserialize_json(self, json_str: str) -> object:
.. versionadded:: 0.31.0
"""
return json.loads(json_str, parse_float=decimal.Decimal)

def delete_old_versions(
self,
*,
full_table_name: str,
version_column_name: str,
current_version: int,
) -> None:
"""Hard-deletes any old version rows from the table.
This is used to clean up old versions when an ACTIVATE_VERSION message is
received.
Args:
full_table_name: The fully qualified table name.
version_column_name: The name of the version column.
current_version: The current ACTIVATE version of the table.
"""
with self._connect() as conn, conn.begin():
conn.execute(
sa.text(
f"DELETE FROM {full_table_name} " # noqa: S608
f"WHERE {version_column_name} <= {current_version}",
),
)
8 changes: 8 additions & 0 deletions singer_sdk/helpers/capabilities.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@
description="Add metadata to records.",
),
).to_dict()
TARGET_HARD_DELETE_CONFIG = PropertiesList(
Property(
"hard_delete",
BooleanType(),
description="Hard delete records.",
default=False,
),
).to_dict()


class TargetLoadMethods(str, Enum):
Expand Down
14 changes: 6 additions & 8 deletions singer_sdk/sinks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,14 +378,12 @@ def activate_version(self, new_version: int) -> None:
sql_type=sa.types.Integer(),
)

if self.config.get("hard_delete", True):
with self.connector._connect() as conn, conn.begin(): # noqa: SLF001
conn.execute(
sa.text(
f"DELETE FROM {self.full_table_name} " # noqa: S608
f"WHERE {self.version_column_name} <= {new_version}",
),
)
if self.config.get("hard_delete", False):
self.connector.delete_old_versions(
full_table_name=self.full_table_name,
version_column_name=self.version_column_name,
current_version=new_version,
)
return

if not self.connector.column_exists(
Expand Down
11 changes: 10 additions & 1 deletion singer_sdk/target_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from singer_sdk.helpers.capabilities import (
ADD_RECORD_METADATA_CONFIG,
BATCH_CONFIG,
TARGET_HARD_DELETE_CONFIG,
TARGET_LOAD_METHOD_CONFIG,
TARGET_SCHEMA_CONFIG,
CapabilitiesEnum,
Expand Down Expand Up @@ -636,7 +637,12 @@ def capabilities(self) -> list[CapabilitiesEnum]:
A list of capabilities supported by this target.
"""
sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities
sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA])
sql_target_capabilities.extend(
[
TargetCapabilities.TARGET_SCHEMA,
TargetCapabilities.HARD_DELETE,
]
)

return sql_target_capabilities

Expand Down Expand Up @@ -668,6 +674,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None:
if TargetCapabilities.TARGET_SCHEMA in capabilities:
_merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema)

if TargetCapabilities.HARD_DELETE in capabilities:
_merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema)

super().append_builtin_config(config_jsonschema)

@final
Expand Down
Empty file added tests/core/targets/__init__.py
Empty file.
50 changes: 50 additions & 0 deletions tests/core/targets/test_target_sql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from __future__ import annotations

import typing as t

import pytest

from singer_sdk.helpers.capabilities import CapabilitiesEnum, TargetCapabilities
from singer_sdk.target_base import SQLTarget


class SQLTargetMock(SQLTarget):
name = "sql-target-mock"

def __init_subclass__(
cls,
*,
capabilities: t.Iterable[CapabilitiesEnum],
**kwargs: t.Any,
):
super().__init_subclass__(**kwargs)
cls.capabilities = [*capabilities]
cls.config_jsonschema = {"properties": {}}


@pytest.mark.parametrize(
"capabilities,expected_settings",
[
pytest.param([], set(), id="no capabilities"),
pytest.param(
[TargetCapabilities.TARGET_SCHEMA],
{"default_target_schema"},
id="default schema",
),
pytest.param(
[TargetCapabilities.HARD_DELETE],
{"hard_delete"},
id="hard delete",
),
],
)
def test_target_about_info(
capabilities: list[CapabilitiesEnum],
expected_settings: set[str],
):
class MyTarget(SQLTargetMock, capabilities=capabilities):
pass

about = MyTarget._get_about_info()
default_settings = {"add_record_metadata", "load_method"}
assert set(about.settings["properties"]) == expected_settings | default_settings
8 changes: 4 additions & 4 deletions tests/samples/test_target_sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ def sqlite_sample_target(sqlite_target_test_config):


@pytest.fixture
def sqlite_sample_target_soft_delete(sqlite_target_test_config):
def sqlite_sample_target_hard_delete(sqlite_target_test_config):
"""Get a sample target object with hard_delete disabled."""
return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": False})
return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True})


@pytest.fixture
Expand Down Expand Up @@ -217,7 +217,7 @@ def test_sqlite_column_addition(sqlite_sample_target: SQLTarget):

def test_sqlite_activate_version(
sqlite_sample_target: SQLTarget,
sqlite_sample_target_soft_delete: SQLTarget,
sqlite_sample_target_hard_delete: SQLTarget,
):
"""Test handling the activate_version message for the SQLite target.
Expand Down Expand Up @@ -249,7 +249,7 @@ def test_sqlite_activate_version(

target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True)
target_sync_test(
sqlite_sample_target_soft_delete,
sqlite_sample_target_hard_delete,
input=StringIO(tap_output),
finalize=True,
)
Expand Down

0 comments on commit a79cff8

Please sign in to comment.