From cef8b80c6b4f26cdeea1f06dfcd5a14c62f8c8e7 Mon Sep 17 00:00:00 2001 From: Eugene Kulak Date: Tue, 22 Jun 2021 10:42:10 +0300 Subject: [PATCH] SAT: DX improvements, better error handling and more (#4260) small fixes for SAT for better DX: - better stack trace in case of error inside the connector, print only relevant information with proper formatting (multiline stack trace instead of single string) - better logging - print message about image pulling only when it actually happens, stop tests if the image is not found - use the discovery command for json_schema: when configured_catalog is loaded we populate `json_schema` from the schema that we get from the discovery command; the result is cached for the whole session. - better record comparison, takes care of lists inside dicts - because lists are unordered we would have false negatives when comparing serialized records. - copied pytest config to airbyte root folder, so when pytest runs tests locally it can find it, this will affect all local executions of pytest - add IPython as a standard debugger Co-authored-by: Eugene Kulak --- .../bases/source-acceptance-test/Dockerfile | 2 +- .../bases/source-acceptance-test/setup.py | 4 +- .../source_acceptance_test/conftest.py | 37 ++++++++++++++++--- .../source_acceptance_test/tests/test_core.py | 11 ++---- .../tests/test_full_refresh.py | 6 +-- .../source_acceptance_test/utils/__init__.py | 3 +- .../source_acceptance_test/utils/compare.py | 12 +++++- .../utils/connector_runner.py | 16 ++++++-- pytest.ini | 3 ++ 9 files changed, 67 insertions(+), 27 deletions(-) create mode 100644 pytest.ini diff --git a/airbyte-integrations/bases/source-acceptance-test/Dockerfile b/airbyte-integrations/bases/source-acceptance-test/Dockerfile index ec852b92c4796..9d12f0bed5f8f 100644 --- a/airbyte-integrations/bases/source-acceptance-test/Dockerfile +++ b/airbyte-integrations/bases/source-acceptance-test/Dockerfile @@ -8,7 +8,7 @@ COPY setup.py ./ COPY pytest.ini ./ RUN pip install . 
-LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-acceptance-test ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"] diff --git a/airbyte-integrations/bases/source-acceptance-test/setup.py b/airbyte-integrations/bases/source-acceptance-test/setup.py index d9c45b1f5e495..e4a806ad4cde1 100644 --- a/airbyte-integrations/bases/source-acceptance-test/setup.py +++ b/airbyte-integrations/bases/source-acceptance-test/setup.py @@ -29,9 +29,9 @@ "airbyte-cdk~=0.1", "docker~=4.4", "PyYAML~=5.4", - "inflection~=0.5", "icdiff~=1.9", - "pendulum~=1.2", + "inflection~=0.5", + "pdbpp~=0.10", "pydantic~=1.6", "pytest~=6.1", "pytest-sugar~=0.9", diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py index c482c6fd8b480..54279ac6dcd79 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/conftest.py @@ -29,7 +29,8 @@ from typing import Any, List, MutableMapping, Optional import pytest -from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification +from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type +from docker import errors from source_acceptance_test.config import Config from source_acceptance_test.utils import ConnectorRunner, SecretDict, load_config @@ -75,9 +76,12 @@ def configured_catalog_path_fixture(inputs, base_path) -> Optional[str]: @pytest.fixture(name="configured_catalog") -def configured_catalog_fixture(configured_catalog_path) -> Optional[ConfiguredAirbyteCatalog]: +def configured_catalog_fixture(configured_catalog_path, catalog_schemas) -> Optional[ConfiguredAirbyteCatalog]: if configured_catalog_path: - return 
ConfiguredAirbyteCatalog.parse_file(configured_catalog_path) + catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path) + for configured_stream in catalog.streams: + configured_stream.stream.json_schema = catalog_schemas.get(configured_stream.stream.name, {}) + return catalog return None @@ -128,9 +132,12 @@ def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner: @pytest.fixture(scope="session", autouse=True) def pull_docker_image(acceptance_test_config) -> None: """Startup fixture to pull docker image""" - print("Pulling docker image", acceptance_test_config.connector_image) - ConnectorRunner(image_name=acceptance_test_config.connector_image, volume=Path(".")) - print("Pulling completed") + image_name = acceptance_test_config.connector_image + config_filename = "acceptance-test-config.yml" + try: + ConnectorRunner(image_name=image_name, volume=Path(".")) + except errors.ImageNotFound: + pytest.exit(f"Docker image `{image_name}` not found, please check your {config_filename} file", returncode=1) @pytest.fixture(name="expected_records") @@ -141,3 +148,21 @@ def expected_records_fixture(inputs, base_path) -> List[AirbyteRecordMessage]: with open(str(base_path / getattr(expect_records, "path"))) as f: return [AirbyteRecordMessage.parse_raw(line) for line in f] + + +@pytest.fixture(name="cached_schemas", scope="session") +def cached_schemas_fixture() -> MutableMapping[str, Any]: + """Simple cache for discovered catalog: stream_name -> json_schema""" + return {} + + +@pytest.fixture(name="catalog_schemas") +def catalog_schemas_fixture(connector_config, docker_runner: ConnectorRunner, cached_schemas) -> MutableMapping[str, Any]: + """JSON schemas for each stream""" + if not cached_schemas: + output = docker_runner.call_discover(config=connector_config) + catalogs = [message.catalog for message in output if message.type == Type.CATALOG] + for stream in catalogs[-1].streams: + cached_schemas[stream.name] = stream.json_schema + + return 
cached_schemas diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py index 285660308d76c..e377d68d04313 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_core.py @@ -23,7 +23,6 @@ # -import json from collections import Counter, defaultdict from typing import Any, List, Mapping, MutableMapping @@ -32,7 +31,7 @@ from docker.errors import ContainerError from source_acceptance_test.base import BaseTest from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig -from source_acceptance_test.utils import ConnectorRunner +from source_acceptance_test.utils import ConnectorRunner, serialize @pytest.mark.timeout(10) @@ -152,8 +151,8 @@ def compare_records(stream_name, actual, expected, extra_fields, exact_order, ex r2 = TestBasicRead.remove_extra_fields(r2, r1) assert r1 == r2, f"Stream {stream_name}: Mismatch of record order or values" else: - expected = set(map(TestBasicRead.serialize_record_for_comparison, expected)) - actual = set(map(TestBasicRead.serialize_record_for_comparison, actual)) + expected = set(map(serialize, expected)) + actual = set(map(serialize, actual)) missing_expected = set(expected) - set(actual) assert not missing_expected, f"Stream {stream_name}: All expected records must be produced" @@ -170,7 +169,3 @@ def group_by_stream(records) -> MutableMapping[str, List[MutableMapping]]: result[record.stream].append(record.data) return result - - @staticmethod - def serialize_record_for_comparison(record: Mapping) -> str: - return json.dumps(record, sort_keys=True) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_full_refresh.py 
b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_full_refresh.py index 01c3ad0f0fa32..3c1a414771f61 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_full_refresh.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/tests/test_full_refresh.py @@ -23,13 +23,10 @@ # -import json -from functools import partial - import pytest from airbyte_cdk.models import Type from source_acceptance_test.base import BaseTest -from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog +from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog, serialize @pytest.mark.timeout(20 * 60) @@ -41,7 +38,6 @@ def test_sequential_reads(self, connector_config, configured_catalog, docker_run output = docker_runner.call_read(connector_config, configured_catalog) records_2 = [message.record.data for message in output if message.type == Type.RECORD] - serialize = partial(json.dumps, sort_keys=True) assert not ( set(map(serialize, records_1)) - set(map(serialize, records_2)) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/__init__.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/__init__.py index 2ffd4168ba760..899a08d6ff84d 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/__init__.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/__init__.py @@ -1,5 +1,5 @@ from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config -from .compare import diff_dicts +from .compare import diff_dicts, serialize from .connector_runner import ConnectorRunner from .json_schema_helper import JsonSchemaHelper @@ -12,4 +12,5 @@ "SecretDict", "ConnectorRunner", "diff_dicts", + "serialize", ] diff --git 
a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/compare.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/compare.py index 699da81ddac78..ead1da41370d4 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/compare.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/compare.py @@ -23,7 +23,8 @@ # -from typing import List, Optional +import json +from typing import List, Mapping, Optional import icdiff import py @@ -67,3 +68,12 @@ def diff_dicts(left, right, use_markup) -> Optional[List[str]]: icdiff_lines = list(differ.make_table(pretty_left, pretty_right, context=True)) return ["equals failed"] + [color_off + line for line in icdiff_lines] + + +def serialize(value) -> str: + """Simplify comparison of nested dicts/lists""" + if isinstance(value, Mapping): + return json.dumps({k: serialize(v) for k, v in value.items()}, sort_keys=True) + if isinstance(value, List): + return sorted([serialize(v) for v in value]) + return str(value) diff --git a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py index 580adeb52f083..4748ed733a7bb 100644 --- a/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py +++ b/airbyte-integrations/bases/source-acceptance-test/source_acceptance_test/utils/connector_runner.py @@ -30,6 +30,7 @@ import docker from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog +from docker.errors import ContainerError from pydantic import ValidationError @@ -39,7 +40,9 @@ def __init__(self, image_name: str, volume: Path): try: self._image = self._client.images.get(image_name) except docker.errors.ImageNotFound: + print("Pulling docker image", image_name) self._image = self._client.images.pull(image_name) + 
print("Pulling completed") self._runs = 0 self._volume_base = volume @@ -107,10 +110,17 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[AirbyteMessage]: self._runs += 1 volumes = self._prepare_volumes(config, state, catalog) - logs = self._client.containers.run( - image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs - ) logging.info("Docker run: \n%s\ninput: %s\noutput: %s", cmd, self.input_folder, self.output_folder) + try: + logs = self._client.containers.run( + image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs + ) + except ContainerError as err: + # beautify error from container + patched_error = ContainerError( + container=err.container, exit_status=err.exit_status, command=err.command, image=err.image, stderr=err.stderr.decode() + ) + raise patched_error from None # get rid of any previous exception stack with open(str(self.output_folder / "raw"), "wb+") as f: f.write(logs) diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000000000..f56af89089341 --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] + +addopts = -r a --capture=no -vv --log-level=INFO --color=yes