From 6474eec80210a7e787e4cf488a87d5b348ef456a Mon Sep 17 00:00:00 2001 From: Damien Garros Date: Sun, 19 Jan 2025 18:26:49 +0100 Subject: [PATCH 1/6] Upgrade Prefect to 3.1.13 --- .../infrahub/core/migrations/schema/tasks.py | 2 +- backend/infrahub/core/validators/tasks.py | 2 +- backend/infrahub/generators/tasks.py | 2 +- backend/infrahub/git/integrator.py | 28 +++---- backend/infrahub/git/tasks.py | 12 +-- .../operations/requests/proposed_change.py | 2 +- backend/infrahub/proposed_change/tasks.py | 2 +- backend/infrahub/task_manager/constants.py | 2 +- backend/infrahub/task_manager/task.py | 2 +- backend/infrahub/tasks/artifact.py | 2 +- backend/infrahub/tasks/telemetry.py | 10 +-- backend/infrahub/trigger/tasks.py | 2 +- backend/infrahub/workflows/initialization.py | 6 +- backend/tests/conftest.py | 14 ---- backend/tests/functional/conftest.py | 15 ++++ backend/tests/helpers/utils.py | 2 +- backend/tests/integration/conftest.py | 19 +++-- .../integration/git/test_git_repository.py | 6 +- backend/tests/unit/api/conftest.py | 7 -- backend/tests/unit/conftest.py | 14 ++++ .../tests/unit/graphql/queries/test_task.py | 14 ++-- development/docker-compose-deps-nats.yml | 2 +- development/docker-compose-deps.yml | 2 +- docker-compose.yml | 2 +- helm/values.yaml | 2 +- poetry.lock | 83 ++++++++++++------- pyproject.toml | 2 +- .../docker-compose.test.yml | 2 +- tasks/shared.py | 2 +- 29 files changed, 149 insertions(+), 113 deletions(-) create mode 100644 backend/tests/functional/conftest.py diff --git a/backend/infrahub/core/migrations/schema/tasks.py b/backend/infrahub/core/migrations/schema/tasks.py index 84c08e3f54..9057ec7b8a 100644 --- a/backend/infrahub/core/migrations/schema/tasks.py +++ b/backend/infrahub/core/migrations/schema/tasks.py @@ -64,7 +64,7 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData, service: In return error_messages -@task( +@task( # type: ignore[arg-type] name="schema-path-migrate", task_run_name="Migrate Schema Path {migration_name} on {branch.name}", description="Apply a given migration to the database", diff --git a/backend/infrahub/core/validators/tasks.py b/backend/infrahub/core/validators/tasks.py index f2bf2f0777..178b8de57f 100644 --- a/backend/infrahub/core/validators/tasks.py +++ b/backend/infrahub/core/validators/tasks.py @@ -50,7 +50,7 @@ async def schema_validate_migrations( return results -@task( +@task( # type: ignore[arg-type] name="schema-path-validate", task_run_name="Validate schema path {constraint_name} in {branch.name}", description="Validate if a given migration is compatible with the existing data", diff --git a/backend/infrahub/generators/tasks.py b/backend/infrahub/generators/tasks.py index 8e220976f6..e989b490f7 100644 --- a/backend/infrahub/generators/tasks.py +++ b/backend/infrahub/generators/tasks.py @@ -78,7 +78,7 @@ async def run_generator(model: RequestGeneratorRun, service: InfrahubServices) - await generator_instance.update(do_full_update=True) -@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE) +@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE) # type: ignore[arg-type] async def _define_instance(model: RequestGeneratorRun, service: InfrahubServices) -> CoreGeneratorInstance: if model.generator_instance: instance = await service.client.get( diff --git a/backend/infrahub/git/integrator.py b/backend/infrahub/git/integrator.py index 301438ce54..1e9c09cba3 100644 --- a/backend/infrahub/git/integrator.py +++ 
b/backend/infrahub/git/integrator.py @@ -180,7 +180,7 @@ async def import_objects_from_files( branch_name=infrahub_branch_name, commit=commit, config_file=config_file ) - await self.import_all_python_files( + await self.import_all_python_files( # type: ignore[call-overload] branch_name=infrahub_branch_name, commit=commit, config_file=config_file ) await self.import_jinja2_transforms( @@ -229,7 +229,7 @@ async def _update_sync_status(self, branch_name: str, status: RepositorySyncStat tracker="mutation-repository-update-admin-status", ) - @task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE) + @task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE) # type: ignore[arg-type] async def import_jinja2_transforms( self, branch_name: str, @@ -334,7 +334,7 @@ async def update_jinja2_transform( await existing_transform.save() - @task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE) + @task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE) # type: ignore[arg-type] async def import_artifact_definitions( self, branch_name: str, @@ -435,7 +435,7 @@ async def update_artifact_definition( await existing_artifact_definition.save() - @task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE) + @task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE) # type: ignore[arg-type] async def get_repository_config(self, branch_name: str, commit: str) -> Optional[InfrahubRepositoryConfig]: branch_wt = self.get_worktree(identifier=commit or branch_name) log = get_run_logger() @@ -464,7 +464,7 @@ async def get_repository_config(self, branch_name: str, commit: str) -> Optional log.error(f"Unable to load the configuration file {config_file_name}, the format is not valid : {exc}") return None - @task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE) + @task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE) # type: ignore[arg-type] async def import_schema_files(self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig) -> None: log = get_run_logger() branch_wt = self.get_worktree(identifier=commit or branch_name) @@ -536,7 +536,7 @@ async def import_schema_files(self, branch_name: str, commit: str, config_file: for schema_file in schemas_data: log.info(f"schema '{schema_file.identifier}' loaded successfully!") - @task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE) + @task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE) # type: ignore[arg-type] async def import_all_graphql_query( self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig ) -> None: @@ -594,7 +594,7 @@ async def create_graphql_query(self, branch_name: str, name: str, query_string: await obj.save() return obj - @task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE) + @task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE) # type: ignore[arg-type] async def import_python_check_definitions( self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig ) -> None: @@ -665,7 +665,7 @@ async def import_python_check_definitions( log.info(f"CheckDefinition '{check_name!r}' not found locally, deleting") 
await check_definition_in_graph[check_name].delete() - @task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE) + @task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE) # type: ignore[arg-type] async def import_generator_definitions( self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig ) -> None: @@ -755,7 +755,7 @@ async def _generator_requires_update( return True return False - @task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE) + @task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE) # type: ignore[arg-type] async def import_python_transforms( self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig ) -> None: @@ -826,7 +826,7 @@ async def import_python_transforms( log.info(f"TransformPython {transform_name!r} not found locally, deleting") await transform_definition_in_graph[transform_name].delete() - @task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE) + @task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE) # type: ignore[arg-type] async def get_check_definition( self, branch_name: str, @@ -866,7 +866,7 @@ async def get_check_definition( raise return checks - @task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE) + @task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE) # type: ignore[arg-type] async def get_python_transforms( self, branch_name: str, module: types.ModuleType, file_path: str, transform: InfrahubPythonTransformConfig ) -> list[TransformPythonInformation]: @@ -1067,7 +1067,7 @@ async def import_all_python_files( await self.import_python_transforms(branch_name=branch_name, commit=commit, config_file=config_file) await self.import_generator_definitions(branch_name=branch_name, commit=commit, config_file=config_file) - @task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE) + @task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE) # type: ignore[arg-type] async def render_jinja2_template(self, commit: str, location: str, data: dict) -> str: log = get_run_logger() commit_worktree = self.get_commit_worktree(commit=commit) @@ -1083,7 +1083,7 @@ async def render_jinja2_template(self, commit: str, location: str, data: dict) - log.error(str(exc), exc_info=True) raise TransformError(repository_name=self.name, commit=commit, location=location, message=str(exc)) from exc - @task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE) + @task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE) # type: ignore[arg-type] async def execute_python_check( self, branch_name: str, @@ -1142,7 +1142,7 @@ async def execute_python_check( repository_name=self.name, class_name=class_name, commit=commit, location=location, message=str(exc) ) from exc - @task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE) + @task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE) # type: ignore[arg-type] async def execute_python_transform( self, branch_name: str, commit: str, location: str, client: InfrahubClient, data: Optional[dict] = None ) -> Any: diff --git a/backend/infrahub/git/tasks.py 
b/backend/infrahub/git/tasks.py index 6edb56593d..618ab56ed9 100644 --- a/backend/infrahub/git/tasks.py +++ b/backend/infrahub/git/tasks.py @@ -48,7 +48,7 @@ async def add_git_repository(model: GitRepositoryAdd, service: InfrahubServices) default_branch_name=model.default_branch_name, service=service, ) - await repo.import_objects_from_files( + await repo.import_objects_from_files( # type: ignore[call-overload] infrahub_branch_name=model.infrahub_branch_name, git_branch_name=model.default_branch_name ) if model.internal_status == RepositoryInternalStatus.ACTIVE.value: @@ -84,7 +84,7 @@ async def add_git_repository_read_only(model: GitRepositoryAddReadOnly, service: infrahub_branch_name=model.infrahub_branch_name, service=service, ) - await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name) + await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name) # type: ignore[call-overload] if model.internal_status == RepositoryInternalStatus.ACTIVE.value: await repo.sync_from_remote() @@ -167,7 +167,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None: internal_status=active_internal_status, default_branch_name=repository_data.repository.default_branch.value, ) - await repo.import_objects_from_files( + await repo.import_objects_from_files( # type: ignore[call-overload] git_branch_name=registry.default_branch, infrahub_branch_name=infrahub_branch ) except RepositoryError as exc: @@ -191,7 +191,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None: log.info(exc.message) -@task( +@task( # type: ignore[arg-type] name="git-branch-create", task_run_name="Create branch '{branch}' in repository {repository_name}", cache_policy=NONE, @@ -384,7 +384,7 @@ async def pull_read_only(model: GitRepositoryPullReadOnly, service: InfrahubServ service=service, ) - await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) + await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload] await repo.sync_from_remote(commit=model.commit) # Tell workers to fetch to stay in sync @@ -454,7 +454,7 @@ async def import_objects_from_git_repository(model: GitRepositoryImportObjects, repository_kind=model.repository_kind, commit=model.commit, ) - await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) + await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload] @flow( diff --git a/backend/infrahub/message_bus/operations/requests/proposed_change.py b/backend/infrahub/message_bus/operations/requests/proposed_change.py index 7c7dfdb814..4d71d2700e 100644 --- a/backend/infrahub/message_bus/operations/requests/proposed_change.py +++ b/backend/infrahub/message_bus/operations/requests/proposed_change.py @@ -516,7 +516,7 @@ async def _get_proposed_change_repositories( return _parse_proposed_change_repositories(message=message, source=source_all, destination=destination_all) -@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository") +@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository") # type: ignore[arg-type] async def _validate_repository_merge_conflicts( repositories: list[ProposedChangeRepository], service: InfrahubServices ) -> bool: diff --git 
a/backend/infrahub/proposed_change/tasks.py b/backend/infrahub/proposed_change/tasks.py index 302981606b..9549aeb5c6 100644 --- a/backend/infrahub/proposed_change/tasks.py +++ b/backend/infrahub/proposed_change/tasks.py @@ -168,7 +168,7 @@ async def cancel_proposed_changes_branch(branch_name: str, service: InfrahubServ await cancel_proposed_change(proposed_change=proposed_change, service=service) -@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE) +@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE) # type: ignore[arg-type] async def cancel_proposed_change(proposed_change: CoreProposedChange, service: InfrahubServices) -> None: await add_tags(nodes=[proposed_change.id]) log = get_run_logger() diff --git a/backend/infrahub/task_manager/constants.py b/backend/infrahub/task_manager/constants.py index 17bcd2c19e..93bca38498 100644 --- a/backend/infrahub/task_manager/constants.py +++ b/backend/infrahub/task_manager/constants.py @@ -2,7 +2,7 @@ LOG_LEVEL_MAPPING = {10: "debug", 20: "info", 30: "warning", 40: "error", 50: "critical"} -CONCLUSION_STATE_MAPPING = { +CONCLUSION_STATE_MAPPING: dict[str, TaskConclusion] = { "Scheduled": TaskConclusion.UNKNOWN, "Pending": TaskConclusion.UNKNOWN, "Running": TaskConclusion.UNKNOWN, diff --git a/backend/infrahub/task_manager/task.py b/backend/infrahub/task_manager/task.py index b4a7389b44..ca79e25d58 100644 --- a/backend/infrahub/task_manager/task.py +++ b/backend/infrahub/task_manager/task.py @@ -250,7 +250,7 @@ async def query( "node": { "title": flow.name, "conclusion": CONCLUSION_STATE_MAPPING.get( - flow.state_name, TaskConclusion.UNKNOWN + str(flow.state_name), TaskConclusion.UNKNOWN ).value, "state": flow.state_type, "progress": progress_flow.data.get(flow.id, None), diff --git a/backend/infrahub/tasks/artifact.py b/backend/infrahub/tasks/artifact.py index f2ce1d1185..d6db0b4dff 100644 --- a/backend/infrahub/tasks/artifact.py +++ b/backend/infrahub/tasks/artifact.py @@ -11,7 +11,7 @@ from infrahub.services import InfrahubServices -@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE) +@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE) # type: ignore[arg-type] async def define_artifact( message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate], service: InfrahubServices ) -> InfrahubNode: diff --git a/backend/infrahub/tasks/telemetry.py b/backend/infrahub/tasks/telemetry.py index ff06f02bdf..2bf40b8a56 100644 --- a/backend/infrahub/tasks/telemetry.py +++ b/backend/infrahub/tasks/telemetry.py @@ -19,7 +19,7 @@ TELEMETRY_VERSION: str = "20240524" -@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE) +@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE) # type: ignore[arg-type] async def gather_database_information(service: InfrahubServices, branch: Branch) -> dict: async with service.database.start_session() as db: data: dict[str, Any] = { @@ -37,7 +37,7 @@ async def gather_database_information(service: InfrahubServices, branch: Branch) return data -@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE) +@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE) # type: ignore[arg-type] async def gather_schema_information(service: InfrahubServices, branch: Branch) -> dict: data: dict[str, Any] = {} main_schema = 
registry.schema.get_schema_branch(name=branch.name) @@ -48,7 +48,7 @@ async def gather_schema_information(service: InfrahubServices, branch: Branch) - return data -@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE) +@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE) # type: ignore[arg-type] async def gather_feature_information(service: InfrahubServices, branch: Branch) -> dict: async with service.database.start_session() as db: data = {} @@ -67,7 +67,7 @@ async def gather_feature_information(service: InfrahubServices, branch: Branch) return data -@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE) +@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE) # type: ignore[arg-type] async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict: start_time = time.time() @@ -97,7 +97,7 @@ async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict: return data -@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE) +@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE) # type: ignore[arg-type] async def post_telemetry_data(service: InfrahubServices, url: str, payload: dict[str, Any]) -> None: """Send the telemetry data to the specified URL, using HTTP POST.""" response = await service.http.post(url=url, json=payload) diff --git a/backend/infrahub/trigger/tasks.py b/backend/infrahub/trigger/tasks.py index f03c51450f..364a506ac1 100644 --- a/backend/infrahub/trigger/tasks.py +++ b/backend/infrahub/trigger/tasks.py @@ -14,7 +14,7 @@ from uuid import UUID -@task(name="trigger-setup", task_run_name="Setup triggers in task-manager") +@task(name="trigger-setup", task_run_name="Setup triggers in task-manager") # type: ignore[arg-type] async def setup_triggers(client: PrefectClient) -> None: log = get_run_logger() diff --git a/backend/infrahub/workflows/initialization.py b/backend/infrahub/workflows/initialization.py index 4eaa5f7dee..536514a11a 100644 --- a/backend/infrahub/workflows/initialization.py +++ b/backend/infrahub/workflows/initialization.py @@ -13,7 +13,7 @@ from .models import TASK_RESULT_STORAGE_NAME -@task(name="task-manager-setup-worker-pools", task_run_name="Setup Worker pools", cache_policy=NONE) +@task(name="task-manager-setup-worker-pools", task_run_name="Setup Worker pools", cache_policy=NONE) # type: ignore[arg-type] async def setup_worker_pools(client: PrefectClient) -> None: log = get_run_logger() for worker in worker_pools: @@ -29,7 +29,7 @@ async def setup_worker_pools(client: PrefectClient) -> None: log.warning(f"Work pool {worker.name} already present ") -@task(name="task-manager-setup-deployments", task_run_name="Setup Deployments", cache_policy=NONE) +@task(name="task-manager-setup-deployments", task_run_name="Setup Deployments", cache_policy=NONE) # type: ignore[arg-type] async def setup_deployments(client: PrefectClient) -> None: log = get_run_logger() for workflow in workflows: @@ -40,7 +40,7 @@ async def setup_deployments(client: PrefectClient) -> None: log.info(f"Flow {workflow.name}, created successfully ... 
") -@task(name="task-manager-setup-blocks", task_run_name="Setup Blocks", cache_policy=NONE) +@task(name="task-manager-setup-blocks", task_run_name="Setup Blocks", cache_policy=NONE) # type: ignore[arg-type] async def setup_blocks() -> None: log = get_run_logger() diff --git a/backend/tests/conftest.py b/backend/tests/conftest.py index d05e641ca3..74059fd89e 100644 --- a/backend/tests/conftest.py +++ b/backend/tests/conftest.py @@ -14,8 +14,6 @@ from neo4j import GraphDatabase from neo4j.exceptions import ServiceUnavailable from prefect import settings as prefect_settings -from prefect.logging.loggers import disable_run_logger -from prefect.testing.utilities import prefect_test_harness from testcontainers.core.container import DockerContainer from testcontainers.core.waiting_utils import wait_for_logs @@ -202,18 +200,6 @@ async def do_register_core_models_schema(branch: Branch) -> SchemaBranch: return schema_branch -@pytest.fixture(scope="session") -def prefect_test_fixture(): - with prefect_test_harness(): - yield - - -@pytest.fixture(scope="session") -def prefect_test(prefect_test_fixture): - with disable_run_logger(): - yield - - @pytest.fixture(scope="session") def neo4j(request: pytest.FixtureRequest, load_settings_before_session) -> Optional[dict[int, int]]: if not INFRAHUB_USE_TEST_CONTAINERS or config.SETTINGS.database.db_type == "memgraph": diff --git a/backend/tests/functional/conftest.py b/backend/tests/functional/conftest.py new file mode 100644 index 0000000000..4774fb4b70 --- /dev/null +++ b/backend/tests/functional/conftest.py @@ -0,0 +1,15 @@ +import pytest +from prefect.logging.loggers import disable_run_logger +from prefect.testing.utilities import prefect_test_harness + + +@pytest.fixture(scope="session", autouse=True) +def prefect_test_fixture(): + with prefect_test_harness(): + yield + + +@pytest.fixture(scope="session") +def prefect_test(prefect_test_fixture): + with disable_run_logger(): + yield diff --git a/backend/tests/helpers/utils.py b/backend/tests/helpers/utils.py index e7f4341619..576171aef5 100644 --- a/backend/tests/helpers/utils.py +++ b/backend/tests/helpers/utils.py @@ -39,7 +39,7 @@ def start_prefect_server_container( return None container = ( - DockerContainer(image="prefecthq/prefect:3.0.11-python3.12") + DockerContainer(image="prefecthq/prefect:3.1.13-python3.12") .with_command("prefect server start --host 0.0.0.0 --ui") .with_exposed_ports(PORT_PREFECT) ) diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py index d11f68717f..f64a39ce7b 100644 --- a/backend/tests/integration/conftest.py +++ b/backend/tests/integration/conftest.py @@ -6,6 +6,7 @@ import pytest import yaml from infrahub_sdk.uuidt import UUIDT +from prefect.logging.loggers import disable_run_logger from prefect.testing.utilities import prefect_test_harness from pytest import TempPathFactory @@ -27,12 +28,6 @@ def add_tracker(): os.environ["PYTEST_RUNNING"] = "true" -@pytest.fixture(autouse=True, scope="session") -def prefect_test_fixture(): - with prefect_test_harness(): - yield - - @pytest.fixture(scope="session") def event_loop(): """Overrides pytest default function scoped event loop""" @@ -128,3 +123,15 @@ def git_repo_car_dealership(git_sources_dir: Path) -> FileRepo: """Simple Git Repository used for testing.""" return FileRepo(name="car-dealership", sources_directory=git_sources_dir) + + +@pytest.fixture(scope="session", autouse=True) +def prefect_test_fixture(): + with prefect_test_harness(): + yield + + +@pytest.fixture(scope="session") 
+def prefect_test(prefect_test_fixture): + with disable_run_logger(): + yield diff --git a/backend/tests/integration/git/test_git_repository.py b/backend/tests/integration/git/test_git_repository.py index 38db6c3d98..65f021cb23 100644 --- a/backend/tests/integration/git/test_git_repository.py +++ b/backend/tests/integration/git/test_git_repository.py @@ -182,7 +182,7 @@ async def test_import_all_python_files( config_file = await repo.get_repository_config(branch_name="main", commit=commit) assert config_file - await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) + await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) # type: ignore[call-overload] check_definitions = await client.all(kind=CoreCheckDefinition) assert len(check_definitions) >= 1 @@ -192,7 +192,7 @@ async def test_import_all_python_files( # Validate if the function is idempotent, another import just after the first one shouldn't change anything nbr_relationships_before = await count_relationships(db=db) - await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) + await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) # type: ignore[call-overload] assert await count_relationships(db=db) == nbr_relationships_before # 1. Modify an object to validate if its being properly updated @@ -234,7 +234,7 @@ async def test_import_all_python_files( ) await obj2.save(db=db) - await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) + await repo.import_all_python_files(branch_name="main", commit=commit, config_file=config_file) # type: ignore[call-overload] modified_check0 = await client.get(kind=CoreCheckDefinition, id=check_definitions[0].id) assert modified_check0.timeout.value == check_timeout_value_before_change diff --git a/backend/tests/unit/api/conftest.py b/backend/tests/unit/api/conftest.py index 46ad9aec61..bd1af84de3 100644 --- a/backend/tests/unit/api/conftest.py +++ b/backend/tests/unit/api/conftest.py @@ -1,7 +1,6 @@ import pendulum import pytest from fastapi.testclient import TestClient -from prefect.testing.utilities import prefect_test_harness from infrahub import config from infrahub.core.constants import InfrahubKind @@ -31,12 +30,6 @@ def admin_headers(): return {"X-INFRAHUB-KEY": "admin-security"} -@pytest.fixture(autouse=True, scope="session") -def prefect_test_fixture(): - with prefect_test_harness(): - yield - - @pytest.fixture def rpc_bus(helper): original = config.OVERRIDE.message_bus diff --git a/backend/tests/unit/conftest.py b/backend/tests/unit/conftest.py index 8fe58da314..a64e210ee7 100644 --- a/backend/tests/unit/conftest.py +++ b/backend/tests/unit/conftest.py @@ -8,6 +8,8 @@ from infrahub_sdk import Config, InfrahubClient from infrahub_sdk.uuidt import UUIDT from neo4j._codec.hydration.v1 import HydrationHandler +from prefect.logging.loggers import disable_run_logger +from prefect.testing.utilities import prefect_test_harness from pytest_httpx import HTTPXMock from infrahub import config @@ -87,6 +89,18 @@ def neo4j_factory(): return hydration_scope._graph_hydrator +@pytest.fixture(scope="session", autouse=True) +def prefect_test_fixture(): + with prefect_test_harness(): + yield + + +@pytest.fixture(scope="session") +def prefect_test(prefect_test_fixture): + with disable_run_logger(): + yield + + @pytest.fixture def git_sources_dir(default_branch, tmp_path: Path) -> Path: source_dir = tmp_path / "sources" diff 
--git a/backend/tests/unit/graphql/queries/test_task.py b/backend/tests/unit/graphql/queries/test_task.py index 4fee3ebbee..f74c03336f 100644 --- a/backend/tests/unit/graphql/queries/test_task.py +++ b/backend/tests/unit/graphql/queries/test_task.py @@ -7,7 +7,6 @@ from prefect.artifacts import ArtifactRequest from prefect.client.orchestration import PrefectClient, get_client from prefect.states import State -from prefect.testing.utilities import prefect_test_harness from infrahub.core.branch import Branch from infrahub.core.constants import InfrahubKind @@ -86,12 +85,6 @@ """ -@pytest.fixture -def local_prefect_server(): - with prefect_test_harness(): - yield - - @pytest.fixture async def tag_blue(db: InfrahubDatabase, default_branch: Branch) -> Node: blue = await Node.init(db=db, schema=InfrahubKind.TAG, branch=default_branch) @@ -125,7 +118,7 @@ async def account_bill(db: InfrahubDatabase, default_branch: Branch) -> Node: @pytest.fixture -async def prefect_client(local_prefect_server): +async def prefect_client(prefect_test_fixture): async with get_client(sync_client=False) as client: yield client @@ -479,6 +472,7 @@ async def test_task_query_filter_node( tag_red, account_bob, account_bill, + delete_flow_runs, flow_runs_data, ): result = await run_query( @@ -585,6 +579,7 @@ async def test_task_query_both( register_core_models_schema: None, tag_blue, account_bob, + delete_flow_runs, flow_runs_data, ): result = await run_query( @@ -615,6 +610,7 @@ async def test_task_branch_status( register_core_models_schema: None, tag_blue, account_bob, + delete_flow_runs, flow_runs_data, ): QUERY = """ @@ -713,6 +709,7 @@ async def test_task_no_count( register_core_models_schema: None, tag_blue, account_bob, + delete_flow_runs, flow_runs_data, ): QUERY = """ @@ -755,6 +752,7 @@ async def test_task_only_count( register_core_models_schema: None, tag_blue, account_bob, + delete_flow_runs, flow_runs_data, ): QUERY = """ diff --git a/development/docker-compose-deps-nats.yml b/development/docker-compose-deps-nats.yml index 988d524124..eec2350ba3 100644 --- a/development/docker-compose-deps-nats.yml +++ b/development/docker-compose-deps-nats.yml @@ -24,7 +24,7 @@ services: start_period: 10s task-manager: profiles: [demo, dev] - image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.0.11-python3.12}" + image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.1.13-python3.12}" command: prefect server start --host 0.0.0.0 --ui environment: PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://postgres:postgres@task-manager-db:5432/prefect diff --git a/development/docker-compose-deps.yml b/development/docker-compose-deps.yml index 82fc383903..5dc4b97971 100644 --- a/development/docker-compose-deps.yml +++ b/development/docker-compose-deps.yml @@ -23,7 +23,7 @@ services: retries: 3 task-manager: profiles: [demo, dev] - image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.0.11-python3.12}" + image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.1.13-python3.12}" command: prefect server start --host 0.0.0.0 --ui environment: PREFECT_API_DATABASE_CONNECTION_URL: postgresql+asyncpg://postgres:postgres@task-manager-db:5432/prefect diff --git a/docker-compose.yml b/docker-compose.yml index ea2aa9e15b..dded32d0b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -174,7 +174,7 @@ services: - 6362:6362 task-manager: - image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.0.11-python3.12}" + image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.1.13-python3.12}" command: prefect server start 
--host 0.0.0.0 --ui restart: unless-stopped depends_on: diff --git a/helm/values.yaml b/helm/values.yaml index 7b85b04707..1701a7ad0a 100644 --- a/helm/values.yaml +++ b/helm/values.yaml @@ -173,7 +173,7 @@ prefect-server: enabled: true server: image: - prefectTag: 3.0.11-python3.12-kubernetes + prefectTag: 3.1.13-python3.12-kubernetes env: - name: PREFECT_UI_SERVE_BASE value: / diff --git a/poetry.lock b/poetry.lock index ded6ef56fd..8b03d61e40 100644 --- a/poetry.lock +++ b/poetry.lock @@ -3522,13 +3522,13 @@ virtualenv = ">=20.10.0" [[package]] name = "prefect" -version = "3.0.11" +version = "3.1.13" description = "Workflow orchestration and management." optional = false python-versions = ">=3.9" files = [ - {file = "prefect-3.0.11-py3-none-any.whl", hash = "sha256:09d27c94eb6f745767b5811e4900b9120212a75a44e53bb8186aea0bffa1120f"}, - {file = "prefect-3.0.11.tar.gz", hash = "sha256:9969cabe8d10080441c1f6e80ce3019ff0f998eafe2da26fd214cf76eaf86e96"}, + {file = "prefect-3.1.13-py3-none-any.whl", hash = "sha256:06b7cf6e6a0f2c8a0db03c45af080bbd44db63400a9722f6212696c8963d54fe"}, + {file = "prefect-3.1.13.tar.gz", hash = "sha256:3b5651d230dcd89cd1b45005394197f7d9f055b0707af1a6235466e44a1be1a9"}, ] [package.dependencies] @@ -3542,7 +3542,7 @@ cachetools = ">=5.3,<6.0" click = ">=8.0,<8.2" cloudpickle = ">=2.0,<4.0" coolname = ">=1.0.4,<3.0.0" -croniter = ">=1.0.12,<4.0.0" +croniter = ">=1.0.12,<7.0.0" cryptography = ">=36.0.1" dateparser = ">=1.1.1,<2.0.0" docker = ">=4.0,<8.0" @@ -3558,17 +3558,19 @@ jinja2 = ">=3.0.0,<4.0.0" jinja2-humanize-extension = ">=0.4.0" jsonpatch = ">=1.32,<2.0" jsonschema = ">=4.0.0,<5.0.0" +opentelemetry-api = ">=1.27.0,<2.0.0" orjson = ">=3.7,<4.0" packaging = ">=21.3,<24.3" pathspec = ">=0.8.0" pendulum = ">=3.0.0,<4" prometheus-client = ">=0.20.0" -pydantic = ">=2.7,<3.0.0" -pydantic-core = ">=2.12.0,<3.0.0" -pydantic-extra-types = ">=2.8.2,<3.0.0" -pydantic-settings = ">2.2.1" -python-dateutil = ">=2.8.2,<3.0.0" +pydantic = ">=2.9,<2.10.0 || >2.10.0,<3.0.0" +pydantic_core = ">=2.12.0,<3.0.0" +pydantic_extra_types = ">=2.8.2,<3.0.0" +pydantic_settings = ">2.2.1" +python_dateutil = ">=2.8.2,<3.0.0" python-slugify = ">=5.0,<9.0" +python-socks = {version = ">=2.5.3,<3.0", extras = ["asyncio"]} pytz = ">=2021.1,<2025" pyyaml = ">=5.4.1,<7.0.0" readchar = ">=4.0.0,<5.0.0" @@ -3578,32 +3580,33 @@ rich = ">=11.0,<14.0" sniffio = ">=1.3.0,<2.0.0" sqlalchemy = {version = ">=2.0,<3.0.0", extras = ["asyncio"]} toml = ">=0.10.0" -typer = ">=0.12.0,<0.12.2 || >0.12.2,<0.13.0" -typing-extensions = ">=4.5.0,<5.0.0" +typer = ">=0.12.0,<0.12.2 || >0.12.2,<0.14.0" +typing_extensions = ">=4.5.0,<5.0.0" ujson = ">=5.8.0,<6.0.0" uvicorn = ">=0.14.0,<0.29.0 || >0.29.0" websockets = ">=10.4,<14.0" [package.extras] -aws = ["prefect-aws (>=0.5.0rc1)"] -azure = ["prefect-azure (>=0.4.0rc1)"] -bitbucket = ["prefect-bitbucket (>=0.3.0rc1)"] -dask = ["prefect-dask (>=0.3.0rc1)"] -databricks = ["prefect-databricks (>=0.3.0rc1)"] -dbt = ["prefect-dbt (>=0.6.0rc1)"] -dev = ["cairosvg", "codespell (>=2.2.6)", "ipython", "jinja2", "mkdocs", "mkdocs-gen-files", "mkdocs-material", "mkdocstrings[python]", "moto (>=5)", "mypy (>=1.9.0)", "numpy", "pillow", "pluggy (>=1.4.0)", "pre-commit", "pytest (>=8.3)", "pytest-asyncio (>=0.24)", "pytest-benchmark", "pytest-codspeed", "pytest-cov", "pytest-env", "pytest-flakefinder", "pytest-timeout", "pytest-xdist (>=3.6.1)", "pyyaml", "redis (>=5.0.1)", "respx", "ruff", "setuptools", "types-PyYAML", "types-cachetools", "uv (>=0.4.5)", "vale", "vermin", 
"virtualenv", "watchfiles"] -docker = ["prefect-docker (>=0.6.0rc1)"] -email = ["prefect-email (>=0.4.0rc1)"] -gcp = ["prefect-gcp (>=0.6.0rc1)"] -github = ["prefect-github (>=0.3.0rc1)"] -gitlab = ["prefect-gitlab (>=0.3.0rc1)"] -kubernetes = ["prefect-kubernetes (>=0.4.0rc1)"] -ray = ["prefect-ray (>=0.4.0rc1)"] +aws = ["prefect-aws (>=0.5.0)"] +azure = ["prefect-azure (>=0.4.0)"] +bitbucket = ["prefect-bitbucket (>=0.3.0)"] +dask = ["prefect-dask (>=0.3.0)"] +databricks = ["prefect-databricks (>=0.3.0)"] +dbt = ["prefect-dbt (>=0.6.0)"] +dev = ["cairosvg", "codespell (>=2.2.6)", "ipython", "jinja2", "mkdocs", "mkdocs-gen-files", "mkdocs-material", "mkdocstrings[python]", "moto (>=5)", "mypy (>=1.9.0)", "numpy", "opentelemetry-distro (>=0.48b0,<1.0.0)", "opentelemetry-exporter-otlp (>=1.27.0,<2.0.0)", "opentelemetry-instrumentation (>=0.48b0,<1.0.0)", "opentelemetry-instrumentation-logging (>=0.48b0,<1.0.0)", "opentelemetry-test-utils (>=0.48b0,<1.0.0)", "pillow", "pluggy (>=1.4.0)", "pre-commit", "pytest (>=8.3)", "pytest-asyncio (>=0.24)", "pytest-benchmark", "pytest-cov", "pytest-env", "pytest-flakefinder", "pytest-mypy-plugins (>=3.2.0)", "pytest-timeout", "pytest-xdist (>=3.6.1)", "pyyaml", "redis (>=5.0.1)", "respx", "ruff", "setuptools", "types-PyYAML", "types-cachetools", "uv (>=0.4.5)", "vale", "vermin", "virtualenv", "watchfiles"] +docker = ["prefect-docker (>=0.6.0)"] +email = ["prefect-email (>=0.4.0)"] +gcp = ["prefect-gcp (>=0.6.0)"] +github = ["prefect-github (>=0.3.0)"] +gitlab = ["prefect-gitlab (>=0.3.0)"] +kubernetes = ["prefect-kubernetes (>=0.4.0)"] +otel = ["opentelemetry-distro (>=0.48b0,<1.0.0)", "opentelemetry-exporter-otlp (>=1.27.0,<2.0.0)", "opentelemetry-instrumentation (>=0.48b0,<1.0.0)", "opentelemetry-instrumentation-logging (>=0.48b0,<1.0.0)"] +ray = ["prefect-ray (>=0.4.0)"] redis = ["prefect-redis (>=0.2.0)"] -shell = ["prefect-shell (>=0.3.0rc1)"] -slack = ["prefect-slack (>=0.3.0rc1)"] -snowflake = ["prefect-snowflake (>=0.28.0rc1)"] -sqlalchemy = ["prefect-sqlalchemy (>=0.5.0rc1)"] +shell = ["prefect-shell (>=0.3.0)"] +slack = ["prefect-slack (>=0.3.0)"] +snowflake = ["prefect-snowflake (>=0.28.0)"] +sqlalchemy = ["prefect-sqlalchemy (>=0.5.0)"] [[package]] name = "prometheus-client" @@ -4227,6 +4230,26 @@ text-unidecode = ">=1.3" [package.extras] unidecode = ["Unidecode (>=1.1.1)"] +[[package]] +name = "python-socks" +version = "2.6.1" +description = "Proxy (SOCKS4, SOCKS5, HTTP CONNECT) client for Python" +optional = false +python-versions = ">=3.8.0" +files = [ + {file = "python_socks-2.6.1-py3-none-any.whl", hash = "sha256:7fe9324c1834bb9cdce46ddb6721cf42a05a00da5f1fb87f8507ac0c6f305e1d"}, + {file = "python_socks-2.6.1.tar.gz", hash = "sha256:9743929aab6ffe0bab640ecfbbee7130af92408ad86e4aa2984789f742f3ec9e"}, +] + +[package.dependencies] +async-timeout = {version = ">=4.0", optional = true, markers = "python_version < \"3.11\" and extra == \"asyncio\""} + +[package.extras] +anyio = ["anyio (>=3.3.4,<5.0.0)"] +asyncio = ["async-timeout (>=4.0)"] +curio = ["curio (>=1.4)"] +trio = ["trio (>=0.24)"] + [[package]] name = "pytz" version = "2024.2" @@ -5850,4 +5873,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.0" python-versions = "^3.10, < 3.13" -content-hash = "ac690383b6c2a01aba769b8b1958d8f979063f1c519f4261138030903a93ba47" +content-hash = "ccef4997d37e764d996288f129a8470f2318b8f294433fadebafc25fba632aa7" diff --git a/pyproject.toml b/pyproject.toml index dbad96b929..5947c95d9c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 
+35,7 @@ boto3 = "1.34.129" email-validator = "~2.1" redis = { version = "^5.0.0", extras = ["hiredis"] } typer = "0.12.5" -prefect = "3.0.11" +prefect = "3.1.13" ujson = "^5" Jinja2 = "^3" gitpython = "^3" diff --git a/python_testcontainers/infrahub_testcontainers/docker-compose.test.yml b/python_testcontainers/infrahub_testcontainers/docker-compose.test.yml index 0874123c44..5f4b8ad923 100644 --- a/python_testcontainers/infrahub_testcontainers/docker-compose.test.yml +++ b/python_testcontainers/infrahub_testcontainers/docker-compose.test.yml @@ -49,7 +49,7 @@ services: - ${INFRAHUB_TESTING_DATABASE_UI_PORT:-0}:7474 task-manager: - image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.0.11-python3.12}" + image: "${TASK_MANAGER_DOCKER_IMAGE:-prefecthq/prefect:3.1.13-python3.12}" command: prefect server start --host 0.0.0.0 --ui depends_on: task-manager-db: diff --git a/tasks/shared.py b/tasks/shared.py index 45ea18b7e9..4adaf3e172 100644 --- a/tasks/shared.py +++ b/tasks/shared.py @@ -51,7 +51,7 @@ class Namespace(str, Enum): "redis:7.2.4" if not INFRAHUB_USE_NATS else "nats:2.10.14-alpine", ) -TASK_MANAGER_DOCKER_IMAGE = os.getenv("TASK_MANAGER_DOCKER_IMAGE", "prefecthq/prefect:3.0.11-python3.12") +TASK_MANAGER_DOCKER_IMAGE = os.getenv("TASK_MANAGER_DOCKER_IMAGE", "prefecthq/prefect:3.1.13-python3.12") here = Path(__file__).parent.resolve() TOP_DIRECTORY_NAME = here.parent.name From 68a970e675220f6a02143b0ff4e91f5d77ae5207 Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Tue, 21 Jan 2025 21:39:59 +0100 Subject: [PATCH 2/6] fix(backend, tests): increase sleep time during rmq test In some cases, when the CI runner is overwhelmed, 0.1s may not be enough to successfully send a message to rabbitmq. Increase it to 1 second to be consistent with other tests. Signed-off-by: Fatih Acar --- .../services/adapters/message_bus/test_rabbitmq.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/tests/integration/services/adapters/message_bus/test_rabbitmq.py b/backend/tests/integration/services/adapters/message_bus/test_rabbitmq.py index 34f6832de8..11066efc9c 100644 --- a/backend/tests/integration/services/adapters/message_bus/test_rabbitmq.py +++ b/backend/tests/integration/services/adapters/message_bus/test_rabbitmq.py @@ -384,7 +384,7 @@ async def test_rabbitmq_callback(rabbitmq_api: RabbitMQManager, fake_log: FakeLo await queue.consume(bus.on_callback, no_ack=True) await service.message_bus.send(message=messages.SendEchoRequest(message="Hello there")) - await asyncio.sleep(delay=0.1) + await asyncio.sleep(delay=1) assert "Received message: Hello there" in fake_log.info_logs await service.shutdown() @@ -402,7 +402,7 @@ async def test_rabbitmq_callback_with_invalid_routing_key(rabbitmq_api: RabbitMQ await queue.consume(bus.on_callback, no_ack=True) await bus.exchange.publish(Message(body="Completely invalid".encode()), routing_key="event.branch.invalid") - await asyncio.sleep(delay=0.1) + await asyncio.sleep(delay=1) assert "Invalid message received" in fake_log.error_logs await service.shutdown() From fdf90d16eeddf9d0083eac5eb929772cf04358af Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Tue, 21 Jan 2025 21:42:10 +0100 Subject: [PATCH 3/6] fix(backend, tests): increase prefect startup timeout 30 second may not be enough when using Prefect 3.1 which seems to add a little overhead to 3.0. Prefect usually took 20s-25s to start and a small load spike on the CI runner can fail the tests. 
Signed-off-by: Fatih Acar --- backend/tests/functional/conftest.py | 2 +- backend/tests/integration/conftest.py | 2 +- backend/tests/unit/conftest.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/tests/functional/conftest.py b/backend/tests/functional/conftest.py index 4774fb4b70..c273d95595 100644 --- a/backend/tests/functional/conftest.py +++ b/backend/tests/functional/conftest.py @@ -5,7 +5,7 @@ @pytest.fixture(scope="session", autouse=True) def prefect_test_fixture(): - with prefect_test_harness(): + with prefect_test_harness(server_startup_timeout=60): yield diff --git a/backend/tests/integration/conftest.py b/backend/tests/integration/conftest.py index f64a39ce7b..b4f39e42c5 100644 --- a/backend/tests/integration/conftest.py +++ b/backend/tests/integration/conftest.py @@ -127,7 +127,7 @@ def git_repo_car_dealership(git_sources_dir: Path) -> FileRepo: @pytest.fixture(scope="session", autouse=True) def prefect_test_fixture(): - with prefect_test_harness(): + with prefect_test_harness(server_startup_timeout=60): yield diff --git a/backend/tests/unit/conftest.py b/backend/tests/unit/conftest.py index a64e210ee7..a97535037f 100644 --- a/backend/tests/unit/conftest.py +++ b/backend/tests/unit/conftest.py @@ -91,7 +91,7 @@ def neo4j_factory(): @pytest.fixture(scope="session", autouse=True) def prefect_test_fixture(): - with prefect_test_harness(): + with prefect_test_harness(server_startup_timeout=60): yield From 42c9eb0827500058659d06ff0de35f09ea37894a Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Tue, 21 Jan 2025 22:17:13 +0100 Subject: [PATCH 4/6] feat(backend): enable tracing for task workers This was previously enabled on git agents but got removed during the transition to task workers. Prefect 3.1 also adds instrumentation to workflow runs. Signed-off-by: Fatih Acar --- backend/infrahub/workers/infrahub_async.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/backend/infrahub/workers/infrahub_async.py b/backend/infrahub/workers/infrahub_async.py index 4d3a4e6f42..6c703205e2 100644 --- a/backend/infrahub/workers/infrahub_async.py +++ b/backend/infrahub/workers/infrahub_async.py @@ -32,6 +32,7 @@ from infrahub.services.adapters.workflow import InfrahubWorkflow from infrahub.services.adapters.workflow.local import WorkflowLocalExecution from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution +from infrahub.trace import configure_trace from infrahub.workers.utils import inject_service_parameter, load_flow_function from infrahub.workflows.models import TASK_RESULT_STORAGE_NAME @@ -90,6 +91,16 @@ async def setup( self._init_logger() + # Initialize trace + if config.SETTINGS.trace.enable: + configure_trace( + service="infrahub-task-worker", + version=infrahub_version, + exporter_type=config.SETTINGS.trace.exporter_type, + exporter_endpoint=config.SETTINGS.trace.exporter_endpoint, + exporter_protocol=config.SETTINGS.trace.exporter_protocol, + ) + # Start metric endpoint if metric_port is None or metric_port != 0: metric_port = metric_port or int(os.environ.get("INFRAHUB_METRICS_PORT", 8000)) From 3f9d2733b687789a94ffe052c38b9809775d8425 Mon Sep 17 00:00:00 2001 From: Fatih Acar Date: Wed, 22 Jan 2025 12:05:36 +0100 Subject: [PATCH 5/6] fix(ci): use old Prefect version when testing demo This is a temporary fix until we use the Infrahub image to run prefect server. 
Signed-off-by: Fatih Acar --- .github/workflows/ci.yml | 4 +++- .github/workflows/version-upgrade.yml | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 026f9c3e06..43ae2882e8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -929,7 +929,9 @@ jobs: run: echo JOB_NAME="$GITHUB_JOB" >> $GITHUB_ENV - name: "Set environment variables" - run: echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV + run: | + echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV + echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV - name: "Clear docker environment" run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local diff --git a/.github/workflows/version-upgrade.yml b/.github/workflows/version-upgrade.yml index e386f1103b..33a7d51c12 100644 --- a/.github/workflows/version-upgrade.yml +++ b/.github/workflows/version-upgrade.yml @@ -60,6 +60,7 @@ jobs: run: | echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV echo INFRAHUB_IMAGE_VER=${{ matrix.source_version }} >> $GITHUB_ENV + echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV - name: "Clear docker environment" run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local - name: "Store start time" @@ -80,7 +81,9 @@ jobs: # Build the local version and run the migrations - name: "Set environment variables" - run: echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV + run: | + echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV + echo TASK_MANAGER_DOCKER_IMAGE="" >> $GITHUB_ENV - name: Build Demo run: invoke dev.build From ec01f33ac1813bbea2f29e87537f2c7143fefbf6 Mon Sep 17 00:00:00 2001 From: Damien Garros Date: Thu, 23 Jan 2025 06:27:51 +0100 Subject: [PATCH 6/6] Set cache_policy for setup_triggers task --- backend/infrahub/trigger/tasks.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/infrahub/trigger/tasks.py b/backend/infrahub/trigger/tasks.py index 364a506ac1..fb808dcbbb 100644 --- a/backend/infrahub/trigger/tasks.py +++ b/backend/infrahub/trigger/tasks.py @@ -3,6 +3,7 @@ from infrahub_sdk.utils import compare_lists from prefect import get_run_logger, task from prefect.automations import AutomationCore +from prefect.cache_policies import NONE from prefect.client.orchestration import PrefectClient from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName @@ -14,7 +15,7 @@ from uuid import UUID -@task(name="trigger-setup", task_run_name="Setup triggers in task-manager") # type: ignore[arg-type] +@task(name="trigger-setup", task_run_name="Setup triggers in task-manager", cache_policy=NONE) # type: ignore[arg-type] async def setup_triggers(client: PrefectClient) -> None: log = get_run_logger()
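
For quick reference, the two patterns this series applies repeatedly are the mypy suppression on @task-decorated coroutines (usually paired with cache_policy=NONE) and the session-scoped Prefect test harness now duplicated in the unit, integration, and functional conftest.py files. The sketch below consolidates both against Prefect 3.1.13 as pinned above; the task name "example-task" and its placeholder body are illustrative only, everything else mirrors the hunks in patches 1 and 3.

import pytest
from prefect import task
from prefect.cache_policies import NONE
from prefect.logging.loggers import disable_run_logger
from prefect.testing.utilities import prefect_test_harness


@task(name="example-task", task_run_name="Example task", cache_policy=NONE)  # type: ignore[arg-type]
async def example_task() -> None:
    # Placeholder body; in the patches these are functions/methods taking
    # service or repository arguments, and cache_policy=NONE presumably opts
    # them out of input-based result caching.
    ...


@pytest.fixture(scope="session", autouse=True)
def prefect_test_fixture():
    # Ephemeral Prefect server for the whole test session; patch 3 raises the
    # startup timeout to 60s because Prefect 3.1 takes 20-25s to start and CI
    # load spikes were pushing it past the default.
    with prefect_test_harness(server_startup_timeout=60):
        yield


@pytest.fixture(scope="session")
def prefect_test(prefect_test_fixture):
    # Silence Prefect's run logger for tests that invoke tasks directly.
    with disable_run_logger():
        yield

Patch 6 applies the same cache_policy=NONE treatment to setup_triggers, which patch 1 had annotated with the type ignore but left on the default cache policy.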