Skip to content

Commit

Permalink
AirbyteLib: Python lint cleanup (airbytehq#34223)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronsteers authored Jan 17, 2024
1 parent b03d785 commit cbbbeb9
Show file tree
Hide file tree
Showing 23 changed files with 468 additions and 237 deletions.
2 changes: 2 additions & 0 deletions airbyte-lib/airbyte_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""AirbyteLib brings Airbyte ELT to every Python developer."""

from airbyte_lib._factories.cache_factories import get_default_cache, new_local_cache
from airbyte_lib._factories.connector_factories import get_connector
from airbyte_lib.datasets import CachedDataset
Expand Down
53 changes: 32 additions & 21 deletions airbyte-lib/airbyte_lib/_executor.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,22 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
from __future__ import annotations

import os
import subprocess
import sys
from abc import ABC, abstractmethod
from collections.abc import Generator, Iterable, Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import IO, Any, NoReturn
from typing import IO, TYPE_CHECKING, Any, NoReturn

from airbyte_lib.registry import ConnectorMetadata
from airbyte_lib.telemetry import SourceTelemetryInfo, SourceType


if TYPE_CHECKING:
from collections.abc import Generator, Iterable, Iterator

from airbyte_lib.registry import ConnectorMetadata


_LATEST_VERSION = "latest"


Expand Down Expand Up @@ -89,7 +93,7 @@ def _stream_from_file(file: IO[str]) -> Generator[str, Any, None]:
exit_code = process.wait()

# If the exit code is not 0 or -15 (SIGTERM), raise an exception
if exit_code != 0 and exit_code != -15:
if exit_code not in (0, -15):
raise Exception(f"Process exited with code {exit_code}")


Expand All @@ -98,8 +102,9 @@ def __init__(
self,
metadata: ConnectorMetadata,
target_version: str | None = None,
install_if_missing: bool = False,
pip_url: str | None = None,
*,
install_if_missing: bool = False,
) -> None:
super().__init__(metadata, target_version)
self.install_if_missing = install_if_missing
Expand All @@ -122,26 +127,28 @@ def _run_subprocess_and_raise_on_failure(self, args: list[str]) -> None:

def uninstall(self) -> None:
venv_name = self._get_venv_name()
if os.path.exists(venv_name):
if Path(venv_name).exists():
self._run_subprocess_and_raise_on_failure(["rm", "-rf", venv_name])

def install(self) -> None:
venv_name = self._get_venv_name()
self._run_subprocess_and_raise_on_failure([sys.executable, "-m", "venv", venv_name])

pip_path = os.path.join(venv_name, "bin", "pip")
pip_path = str(Path(venv_name) / "bin" / "pip")

self._run_subprocess_and_raise_on_failure([pip_path, "install", "-e", self.pip_url])

def _get_installed_version(self) -> str:
"""
In the venv, run the following: python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""Detect the version of the connector installed.
In the venv, we run the following:
> python -c "from importlib.metadata import version; print(version('<connector-name>'))"
"""
venv_name = self._get_venv_name()
connector_name = self.metadata.name
return subprocess.check_output(
[
os.path.join(venv_name, "bin", "python"),
Path(venv_name) / "bin" / "python",
"-c",
f"from importlib.metadata import version; print(version('{connector_name}'))",
],
Expand All @@ -151,8 +158,8 @@ def _get_installed_version(self) -> str:
def ensure_installation(
self,
) -> None:
"""
Ensure that the connector is installed in a virtual environment.
"""Ensure that the connector is installed in a virtual environment.
If not yet installed and if install_if_missing is True, then install.
Optionally, verify that the installed version matches the target version.
Expand All @@ -165,14 +172,16 @@ def ensure_installation(
if not venv_path.exists():
if not self.install_if_missing:
raise Exception(
f"Connector {self.metadata.name} is not available - venv {venv_name} does not exist"
f"Connector {self.metadata.name} is not available - "
f"venv {venv_name} does not exist"
)
self.install()

connector_path = self._get_connector_path()
if not connector_path.exists():
raise FileNotFoundError(
f"Could not find connector '{self.metadata.name}' in venv '{venv_name}' with connector path '{connector_path}'.",
f"Could not find connector '{self.metadata.name}' in venv '{venv_name}' with "
f"connector path '{connector_path}'.",
)

if self.enforce_version:
Expand All @@ -185,13 +194,14 @@ def ensure_installation(
version_after_install = self._get_installed_version()
if version_after_install != self.target_version:
raise Exception(
f"Failed to install connector {self.metadata.name} version {self.target_version}. Installed version is {version_after_install}",
f"Failed to install connector {self.metadata.name} version "
f"{self.target_version}. Installed version is {version_after_install}",
)

def execute(self, args: list[str]) -> Iterator[str]:
connector_path = self._get_connector_path()

with _stream_from_subprocess([str(connector_path)] + args) as stream:
with _stream_from_subprocess([str(connector_path), *args]) as stream:
yield from stream

def get_telemetry_info(self) -> SourceTelemetryInfo:
Expand All @@ -204,19 +214,20 @@ def ensure_installation(self) -> None:
self.execute(["spec"])
except Exception as e:
raise Exception(
f"Connector {self.metadata.name} is not available - executing it failed: {e}"
)
f"Connector {self.metadata.name} is not available - executing it failed"
) from e

def install(self) -> NoReturn:
raise Exception(f"Connector {self.metadata.name} is not available - cannot install it")

def uninstall(self) -> NoReturn:
raise Exception(
f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib - please remove it manually"
f"Connector {self.metadata.name} is installed manually and not managed by airbyte-lib -"
" please remove it manually"
)

def execute(self, args: list[str]) -> Iterator[str]:
with _stream_from_subprocess([self.metadata.name] + args) as stream:
with _stream_from_subprocess([self.metadata.name, *args]) as stream:
yield from stream

def get_telemetry_info(self) -> SourceTelemetryInfo:
Expand Down
5 changes: 3 additions & 2 deletions airbyte-lib/airbyte_lib/_factories/cache_factories.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from pathlib import Path

Expand All @@ -23,13 +23,14 @@ def get_default_cache() -> DuckDBCache:
def new_local_cache(
cache_name: str | None = None,
cache_dir: str | Path | None = None,
*,
cleanup: bool = True,
) -> DuckDBCache:
"""Get a local cache for storing data, using a name string to seed the path.
Args:
cache_name: Name to use for the cache. Defaults to None.
root_dir: Root directory to store the cache in. Defaults to None.
cache_dir: Root directory to store the cache in. Defaults to None.
cleanup: Whether to clean up temporary files. Defaults to True.
Cache files are stored in the `.cache` directory, relative to the current
Expand Down
27 changes: 18 additions & 9 deletions airbyte-lib/airbyte_lib/_factories/connector_factories.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

from __future__ import annotations

from typing import Any

Expand All @@ -13,17 +13,26 @@ def get_connector(
version: str | None = None,
pip_url: str | None = None,
config: dict[str, Any] | None = None,
*,
use_local_install: bool = False,
install_if_missing: bool = True,
) -> Source:
"""
Get a connector by name and version.
:param name: connector name
:param version: connector version - if not provided, the currently installed version will be used. If no version is installed, the latest available version will be used. The version can also be set to "latest" to force the use of the latest available version.
:param pip_url: connector pip URL - if not provided, the pip url will be inferred from the connector name.
:param config: connector config - if not provided, you need to set it later via the set_config method.
:param use_local_install: whether to use a virtual environment to run the connector. If True, the connector is expected to be available on the path (e.g. installed via pip). If False, the connector will be installed automatically in a virtual environment.
:param install_if_missing: whether to install the connector if it is not available locally. This parameter is ignored if use_local_install is True.
"""Get a connector by name and version.
Args:
name: connector name
version: connector version - if not provided, the currently installed version will be used.
If no version is installed, the latest available version will be used. The version can
also be set to "latest" to force the use of the latest available version.
pip_url: connector pip URL - if not provided, the pip url will be inferred from the
connector name.
config: connector config - if not provided, you need to set it later via the set_config
method.
use_local_install: whether to use a connector that is already installed locally instead of
a virtual environment. If True, the connector is expected to be available on the path
(e.g. installed via pip). If False, the connector will be installed automatically in a
virtual environment.
install_if_missing: whether to install the connector if it is not available locally. This
parameter is ignored if use_local_install is True.
"""
metadata = get_connector_metadata(name)
if use_local_install:
Expand Down
3 changes: 1 addition & 2 deletions airbyte-lib/airbyte_lib/_file_writers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ def _write_batch(
batch_id: str,
record_batch: pa.Table | pa.RecordBatch,
) -> FileWriterBatchHandle:
"""
Process a record batch.
"""Process a record batch.
Return a list of paths to one or more cache files.
"""
Expand Down
8 changes: 4 additions & 4 deletions airbyte-lib/airbyte_lib/_file_writers/parquet.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""A Parquet cache implementation."""
from __future__ import annotations

from pathlib import Path
from typing import cast
Expand All @@ -16,8 +17,7 @@
class ParquetWriterConfig(FileWriterConfigBase):
"""Configuration for the Parquet file writer."""

# Inherits from base class:
# cache_dir: str | Path
# Inherits `cache_dir` from base class


class ParquetWriter(FileWriterBase):
Expand All @@ -44,11 +44,11 @@ def _write_batch(
batch_id: str,
record_batch: pa.Table | pa.RecordBatch,
) -> FileWriterBatchHandle:
"""
Process a record batch.
"""Process a record batch.
Return the path to the cache file.
"""
_ = batch_id # unused
output_file_path = self.get_new_cache_file_path(stream_name)

with parquet.ParquetWriter(output_file_path, record_batch.schema) as writer:
Expand Down
8 changes: 3 additions & 5 deletions airbyte-lib/airbyte_lib/_processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ def process_stdin(
self,
max_batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
"""
Process the input stream from stdin.
"""Process the input stream from stdin.
Return a list of summaries for testing.
"""
Expand All @@ -126,8 +125,7 @@ def process_input_stream(
input_stream: io.TextIOBase,
max_batch_size: int = DEFAULT_BATCH_SIZE,
) -> None:
"""
Parse the input stream and process data in batches.
"""Parse the input stream and process data in batches.
Return a list of summaries for testing.
"""
Expand Down Expand Up @@ -229,7 +227,7 @@ def _cleanup_batch( # noqa: B027 # Intentionally empty, not abstract
For instance, file writers can override this method to delete the files created. Caches,
similarly, can override this method to delete any other temporary artifacts.
"""
pass # noqa: PIE790 # Intentional no-op
pass

def _new_batch_id(self) -> str:
"""Return a new batch ID."""
Expand Down
8 changes: 6 additions & 2 deletions airbyte-lib/airbyte_lib/_util/protocol_util.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.

"""Internal utility functions, especially for dealing with Airbyte Protocol."""
from __future__ import annotations

from collections.abc import Iterable, Iterator
from typing import Any, cast
from typing import TYPE_CHECKING, Any, cast

from airbyte_protocol.models import (
AirbyteMessage,
Expand All @@ -13,6 +13,10 @@
)


if TYPE_CHECKING:
from collections.abc import Iterable, Iterator


def airbyte_messages_to_record_dicts(
messages: Iterable[AirbyteMessage],
) -> Iterator[dict[str, Any]]:
Expand Down
Loading

0 comments on commit cbbbeb9

Please sign in to comment.