chore: Integrate service locator from Crawlee [WIP]
vdusek committed Dec 13, 2024
1 parent ef6d579 commit 89e28b6
Showing 8 changed files with 388 additions and 289 deletions.
431 changes: 221 additions & 210 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -45,7 +45,7 @@ keywords = [
python = "^3.9"
apify-client = ">=1.8.1"
apify-shared = ">=1.2.1"
crawlee = "~0.4.0"
crawlee = { git = "https://github.com/apify/crawlee-python.git", branch = "refactor-service-container" }
cryptography = ">=42.0.0"
# TODO: relax the upper bound once the issue is resolved:
# https://github.com/apify/apify-sdk-python/issues/348
49 changes: 28 additions & 21 deletions src/apify/_actor.py
@@ -12,8 +12,9 @@
from apify_client import ApifyClientAsync
from apify_shared.consts import ActorEnvVars, ActorExitCodes, ApifyEnvVars
from apify_shared.utils import ignore_docs, maybe_extract_enum_member_value
from crawlee import service_container
from crawlee import service_locator
from crawlee.events._types import Event, EventPersistStateData
from crawlee.memory_storage_client import MemoryStorageClient

from apify._configuration import Configuration
from apify._consts import EVENT_LISTENERS_TIMEOUT
@@ -69,17 +70,31 @@ def __init__(
self._configure_logging = configure_logging
self._apify_client = self.new_client()

self._event_manager: EventManager
if self._configuration.is_at_home:
self._event_manager = PlatformEventManager(
config=self._configuration,
persist_state_interval=self._configuration.persist_state_interval,
)
else:
self._event_manager = LocalEventManager(
system_info_interval=self._configuration.system_info_interval,
persist_state_interval=self._configuration.persist_state_interval,
)

# We need to keep both local & cloud storage clients because of the `force_cloud` option.
self._local_storage_client = MemoryStorageClient.from_config()
self._cloud_storage_client = ApifyStorageClient(configuration=self._configuration)

# Set the event manager based on whether the Actor is running on the platform or locally.
self._event_manager = (
PlatformEventManager(
config=self._configuration,
persist_state_interval=self._configuration.persist_state_interval,
)
if self.is_at_home()
else LocalEventManager(
system_info_interval=self._configuration.system_info_interval,
persist_state_interval=self._configuration.persist_state_interval,
)
)

# Register services in the service locator.
if self.is_at_home():
service_locator.set_storage_client(self._cloud_storage_client)
else:
service_locator.set_storage_client(self._local_storage_client)

service_locator.set_event_manager(self.event_manager)
service_locator.set_configuration(self.configuration)

self._is_initialized = False

@@ -93,7 +108,7 @@ async def __aenter__(self) -> Self:
executing the block code, the `Actor.fail` method is called.
"""
if self._configure_logging:
_configure_logging(self._configuration)
_configure_logging()

await self.init()
return self
@@ -172,16 +187,6 @@ async def init(self) -> None:
if self._is_initialized:
raise RuntimeError('The Actor was already initialized!')

if self._configuration.token:
service_container.set_cloud_storage_client(ApifyStorageClient(configuration=self._configuration))

if self._configuration.is_at_home:
service_container.set_default_storage_client_type('cloud')
else:
service_container.set_default_storage_client_type('local')

service_container.set_event_manager(self._event_manager)

self._is_exiting = False
self._was_final_persist_state_emitted = False

@@ -233,7 +238,6 @@ async def finalize() -> None:
await self._event_manager.wait_for_all_listeners_to_complete(timeout=event_listeners_timeout)

await self._event_manager.__aexit__(None, None, None)
cast(dict, service_container._services).clear() # noqa: SLF001

await asyncio.wait_for(finalize(), cleanup_timeout.total_seconds())
self._is_initialized = False
@@ -335,12 +339,13 @@ async def open_dataset(
An instance of the `Dataset` class for the given ID or name.
"""
self._raise_if_not_initialized()
storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await Dataset.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def open_key_value_store(
@@ -367,12 +372,13 @@ async def open_key_value_store(
An instance of the `KeyValueStore` class for the given ID or name.
"""
self._raise_if_not_initialized()
storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await KeyValueStore.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def open_request_queue(
@@ -401,12 +407,13 @@ async def open_request_queue(
An instance of the `RequestQueue` class for the given ID or name.
"""
self._raise_if_not_initialized()
storage_client = self._cloud_storage_client if force_cloud else service_locator.get_storage_client()

return await RequestQueue.open(
id=id,
name=name,
configuration=self._configuration,
storage_client=service_container.get_storage_client(client_type='cloud' if force_cloud else None),
storage_client=storage_client,
)

async def push_data(self, data: dict | list[dict]) -> None:
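
The three storage helpers above now share one selection rule: `force_cloud` short-circuits to the Actor's `ApifyStorageClient`, while the default path asks `service_locator.get_storage_client()` for whatever client was registered in `__init__` (locally, the `MemoryStorageClient`). A minimal usage sketch of that behaviour, not taken from this commit; the dataset name is illustrative, and the `force_cloud` call assumes an Apify token is configured:

import asyncio

from apify import Actor


async def main() -> None:
    async with Actor:
        # Resolved through service_locator.get_storage_client(); in a local run this is
        # the MemoryStorageClient registered by Actor.__init__.
        dataset = await Actor.open_dataset()
        await dataset.push_data({'run': 'local'})

        # force_cloud=True bypasses the service locator and uses the Actor's
        # ApifyStorageClient, so the data is stored on the Apify platform even locally.
        cloud_dataset = await Actor.open_dataset(name='my-results', force_cloud=True)
        await cloud_dataset.push_data({'run': 'cloud'})


if __name__ == '__main__':
    asyncio.run(main())
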
12 changes: 10 additions & 2 deletions src/apify/_configuration.py
@@ -1,6 +1,7 @@
from __future__ import annotations

from datetime import datetime, timedelta
from logging import getLogger
from typing import Annotated, Any

from pydantic import AliasChoices, BeforeValidator, Field
@@ -12,6 +13,8 @@

from apify._utils import docs_group

logger = getLogger(__name__)


def _transform_to_list(value: Any) -> list[str] | None:
if value is None:
@@ -353,6 +356,11 @@ class Configuration(CrawleeConfiguration):
),
] = None

@classmethod
def get_global_configuration(cls) -> Configuration:
"""Retrieve the global instance of the configuration.
Mostly for the backwards compatibility. It is recommended to use the `service_locator.get_configuration()`
instead.
"""
return cls()


# Monkey-patch the base class so that it works with the extended configuration
CrawleeConfiguration.get_global_configuration = Configuration.get_global_configuration # type: ignore[method-assign]
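
The re-applied monkey-patch keeps the old entry point working: code that still calls `get_global_configuration()` through the Crawlee base class gets the extended Apify `Configuration`, while new code is expected to go through the service locator instead. A rough sketch of the two paths (assuming the `CrawleeConfiguration` alias is imported from `crawlee.configuration`, as in this module):

from crawlee import service_locator
from crawlee.configuration import Configuration as CrawleeConfiguration

from apify import Configuration

# Backwards-compatible path: the monkey-patched classmethod resolves to the Apify
# subclass, so even a call through the base class returns an apify Configuration.
legacy_config = CrawleeConfiguration.get_global_configuration()
assert isinstance(legacy_config, Configuration)

# Preferred path: read the configuration registered in the service locator
# (the Actor registers its own during __init__).
current_config = service_locator.get_configuration()
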
12 changes: 4 additions & 8 deletions src/apify/log.py
@@ -1,14 +1,10 @@
from __future__ import annotations

import logging
from typing import TYPE_CHECKING

from apify_shared.utils import ignore_docs
from crawlee._log_config import CrawleeLogFormatter, configure_logger, get_configured_log_level

if TYPE_CHECKING:
from apify import Configuration

# Name of the logger used throughout the library (resolves to 'apify')
logger_name = __name__.split('.')[0]

@@ -21,11 +17,11 @@ class ActorLogFormatter(CrawleeLogFormatter): # noqa: D101 (Inherited from pare
pass


def _configure_logging(configuration: Configuration) -> None:
def _configure_logging() -> None:
apify_client_logger = logging.getLogger('apify_client')
configure_logger(apify_client_logger, configuration, remove_old_handlers=True)
configure_logger(apify_client_logger, remove_old_handlers=True)

level = get_configured_log_level(configuration)
level = get_configured_log_level()

# Keep apify_client logger quiet unless debug logging is requested
if level > logging.DEBUG:
@@ -42,4 +38,4 @@ def _configure_logging(configuration: Configuration) -> None:

# Use configured log level for apify logger
apify_logger = logging.getLogger('apify')
configure_logger(apify_logger, configuration, remove_old_handlers=True)
configure_logger(apify_logger, remove_old_handlers=True)
72 changes: 61 additions & 11 deletions tests/integration/conftest.py
@@ -7,13 +7,15 @@
import sys
import textwrap
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Protocol, cast
from typing import TYPE_CHECKING, Any, Callable, Protocol

import pytest
from filelock import FileLock

from apify_client import ApifyClientAsync
from apify_shared.consts import ActorJobStatus, ActorSourceType
from apify_shared.consts import ActorJobStatus, ActorSourceType, ApifyEnvVars
from crawlee import service_locator
from crawlee.storages import _creation_management

import apify._actor
from ._utils import generate_unique_resource_name
@@ -29,19 +31,67 @@
_SDK_ROOT_PATH = Path(__file__).parent.parent.parent.resolve()


@pytest.fixture(autouse=True)
def _reset_and_patch_default_instances() -> None:
"""Reset the used singletons and patch the default storage client with a temporary directory.
To isolate the tests, we need to reset the used singletons before each test case. We also patch the default
storage client with a tmp_path.
"""
from crawlee import service_container

cast(dict, service_container._services).clear()
delattr(apify._actor.Actor, '__wrapped__')


@pytest.fixture
def prepare_test_env(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> Callable[[], None]:
"""Prepare the testing environment by resetting the global state before each test.
This fixture ensures that the global state of the package is reset to a known baseline before each test runs.
It also configures a temporary storage directory for test isolation.
Args:
monkeypatch: Test utility provided by pytest for patching.
tmp_path: A unique temporary directory path provided by pytest for test isolation.
Returns:
A callable that prepares the test environment.
"""
def _prepare_test_env() -> None:
delattr(apify._actor.Actor, '__wrapped__')

# Set the environment variable for the local storage directory to the temporary path.
monkeypatch.setenv(ApifyEnvVars.LOCAL_STORAGE_DIR, str(tmp_path))

# Reset the flags in the service locator to indicate that no services are explicitly set. This ensures
# a clean state, as services might have been set during a previous test and not reset properly.
service_locator._configuration_was_set = False
service_locator._storage_client_was_set = False
service_locator._event_manager_was_set = False

# Reset the services in the service locator.
service_locator._configuration = None
service_locator._event_manager = None
service_locator._storage_client = None

# Clear creation-related caches to ensure no state is carried over between tests.
monkeypatch.setattr(_creation_management, '_cache_dataset_by_id', {})
monkeypatch.setattr(_creation_management, '_cache_dataset_by_name', {})
monkeypatch.setattr(_creation_management, '_cache_kvs_by_id', {})
monkeypatch.setattr(_creation_management, '_cache_kvs_by_name', {})
monkeypatch.setattr(_creation_management, '_cache_rq_by_id', {})
monkeypatch.setattr(_creation_management, '_cache_rq_by_name', {})

# Verify that the test environment was set up correctly.
assert os.environ.get(ApifyEnvVars.LOCAL_STORAGE_DIR) == str(tmp_path)
assert service_locator._configuration_was_set is False
assert service_locator._storage_client_was_set is False
assert service_locator._event_manager_was_set is False

return _prepare_test_env


@pytest.fixture(autouse=True)
def _isolate_test_environment(prepare_test_env: Callable[[], None]) -> None:
"""Isolate the testing environment by resetting global state before and after each test.
This fixture ensures that each test starts with a clean slate and that any modifications during the test
do not affect subsequent tests. It runs automatically for all tests.
Args:
prepare_test_env: Fixture to prepare the environment before each test.
"""

# TODO: StorageClientManager local storage client purge # noqa: TD003
prepare_test_env()
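
The autouse fixture above gives every test a clean baseline automatically; a test that needs to reset the global state again in the middle (for example to enter the Actor context twice) can request `prepare_test_env` itself and call it between the runs. An illustrative sketch of that pattern; the test name and bodies are hypothetical, not part of this commit:

from typing import Callable

from apify import Actor


async def test_actor_can_run_twice(prepare_test_env: Callable[[], None]) -> None:
    # First run: the autouse _isolate_test_environment fixture has already reset the state.
    async with Actor:
        await Actor.push_data({'run': 1})

    # Reset the service locator state, storage caches and the wrapped Actor before the second run.
    prepare_test_env()

    async with Actor:
        await Actor.push_data({'run': 2})
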


@pytest.fixture
3 changes: 3 additions & 0 deletions tests/unit/actor/test_actor_non_default_instance.py
@@ -1,8 +1,11 @@
from datetime import timedelta

import pytest

from apify import Actor, Configuration


@pytest.mark.only
async def test_actor_with_non_default_config() -> None:
config = Configuration(internal_timeout=timedelta(minutes=111))
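
Because `Actor.__init__` now registers its configuration in the service locator, a non-default `Configuration` passed to the Actor should also become the globally resolvable one. A hypothetical sketch of an assertion such a test could make; this is not the truncated test body above, and the positional `Actor(config)` call and the assertion are assumptions:

from datetime import timedelta

from crawlee import service_locator

from apify import Actor, Configuration


async def test_non_default_config_is_registered() -> None:
    config = Configuration(internal_timeout=timedelta(minutes=111))

    async with Actor(config):
        # Actor.__init__ calls service_locator.set_configuration(...), so the globally
        # resolved configuration should be the instance created above.
        assert service_locator.get_configuration().internal_timeout == timedelta(minutes=111)
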
