Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade Prefect to 3.1.13 #5518

Merged
merged 6 commits into from
Jan 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -929,7 +929,9 @@ jobs:
run: echo JOB_NAME="$GITHUB_JOB" >> $GITHUB_ENV

- name: "Set environment variables"
run: echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
run: |
echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV
- name: "Clear docker environment"
run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local

Expand Down
5 changes: 4 additions & 1 deletion .github/workflows/version-upgrade.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ jobs:
run: |
echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
echo INFRAHUB_IMAGE_VER=${{ matrix.source_version }} >> $GITHUB_ENV
echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV
- name: "Clear docker environment"
run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local
- name: "Store start time"
Expand All @@ -80,7 +81,9 @@ jobs:

# Build the local version and run the migrations
- name: "Set environment variables"
run: echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV
run: |
echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV
echo TASK_MANAGER_DOCKER_IMAGE="" >> $GITHUB_ENV

- name: Build Demo
run: invoke dev.build
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/core/migrations/schema/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData, service: In
return error_messages


@task(
@task( # type: ignore[arg-type]
name="schema-path-migrate",
task_run_name="Migrate Schema Path {migration_name} on {branch.name}",
description="Apply a given migration to the database",
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/core/validators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ async def schema_validate_migrations(
return results


@task(
@task( # type: ignore[arg-type]
name="schema-path-validate",
task_run_name="Validate schema path {constraint_name} in {branch.name}",
description="Validate if a given migration is compatible with the existing data",
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/generators/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async def run_generator(model: RequestGeneratorRun, service: InfrahubServices) -
await generator_instance.update(do_full_update=True)


@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE)
@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE) # type: ignore[arg-type]
async def _define_instance(model: RequestGeneratorRun, service: InfrahubServices) -> CoreGeneratorInstance:
if model.generator_instance:
instance = await service.client.get(
Expand Down
28 changes: 14 additions & 14 deletions backend/infrahub/git/integrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ async def import_objects_from_files(
branch_name=infrahub_branch_name, commit=commit, config_file=config_file
)

await self.import_all_python_files(
await self.import_all_python_files( # type: ignore[call-overload]
branch_name=infrahub_branch_name, commit=commit, config_file=config_file
)
await self.import_jinja2_transforms(
Expand Down Expand Up @@ -229,7 +229,7 @@ async def _update_sync_status(self, branch_name: str, status: RepositorySyncStat
tracker="mutation-repository-update-admin-status",
)

@task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE)
@task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE) # type: ignore[arg-type]
async def import_jinja2_transforms(
self,
branch_name: str,
Expand Down Expand Up @@ -334,7 +334,7 @@ async def update_jinja2_transform(

await existing_transform.save()

@task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE)
@task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_artifact_definitions(
self,
branch_name: str,
Expand Down Expand Up @@ -435,7 +435,7 @@ async def update_artifact_definition(

await existing_artifact_definition.save()

@task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE)
@task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE) # type: ignore[arg-type]
async def get_repository_config(self, branch_name: str, commit: str) -> Optional[InfrahubRepositoryConfig]:
branch_wt = self.get_worktree(identifier=commit or branch_name)
log = get_run_logger()
Expand Down Expand Up @@ -464,7 +464,7 @@ async def get_repository_config(self, branch_name: str, commit: str) -> Optional
log.error(f"Unable to load the configuration file {config_file_name}, the format is not valid : {exc}")
return None

@task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE)
@task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE) # type: ignore[arg-type]
async def import_schema_files(self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig) -> None:
log = get_run_logger()
branch_wt = self.get_worktree(identifier=commit or branch_name)
Expand Down Expand Up @@ -536,7 +536,7 @@ async def import_schema_files(self, branch_name: str, commit: str, config_file:
for schema_file in schemas_data:
log.info(f"schema '{schema_file.identifier}' loaded successfully!")

@task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE)
@task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE) # type: ignore[arg-type]
async def import_all_graphql_query(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
Expand Down Expand Up @@ -594,7 +594,7 @@ async def create_graphql_query(self, branch_name: str, name: str, query_string:
await obj.save()
return obj

@task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE)
@task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_python_check_definitions(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
Expand Down Expand Up @@ -665,7 +665,7 @@ async def import_python_check_definitions(
log.info(f"CheckDefinition '{check_name!r}' not found locally, deleting")
await check_definition_in_graph[check_name].delete()

@task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE)
@task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_generator_definitions(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
Expand Down Expand Up @@ -755,7 +755,7 @@ async def _generator_requires_update(
return True
return False

@task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE)
@task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE) # type: ignore[arg-type]
async def import_python_transforms(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
Expand Down Expand Up @@ -826,7 +826,7 @@ async def import_python_transforms(
log.info(f"TransformPython {transform_name!r} not found locally, deleting")
await transform_definition_in_graph[transform_name].delete()

@task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE)
@task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE) # type: ignore[arg-type]
async def get_check_definition(
self,
branch_name: str,
Expand Down Expand Up @@ -866,7 +866,7 @@ async def get_check_definition(
raise
return checks

@task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE)
@task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE) # type: ignore[arg-type]
async def get_python_transforms(
self, branch_name: str, module: types.ModuleType, file_path: str, transform: InfrahubPythonTransformConfig
) -> list[TransformPythonInformation]:
Expand Down Expand Up @@ -1067,7 +1067,7 @@ async def import_all_python_files(
await self.import_python_transforms(branch_name=branch_name, commit=commit, config_file=config_file)
await self.import_generator_definitions(branch_name=branch_name, commit=commit, config_file=config_file)

@task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE)
@task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE) # type: ignore[arg-type]
async def render_jinja2_template(self, commit: str, location: str, data: dict) -> str:
log = get_run_logger()
commit_worktree = self.get_commit_worktree(commit=commit)
Expand All @@ -1083,7 +1083,7 @@ async def render_jinja2_template(self, commit: str, location: str, data: dict) -
log.error(str(exc), exc_info=True)
raise TransformError(repository_name=self.name, commit=commit, location=location, message=str(exc)) from exc

@task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE)
@task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE) # type: ignore[arg-type]
async def execute_python_check(
self,
branch_name: str,
Expand Down Expand Up @@ -1142,7 +1142,7 @@ async def execute_python_check(
repository_name=self.name, class_name=class_name, commit=commit, location=location, message=str(exc)
) from exc

@task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE)
@task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE) # type: ignore[arg-type]
async def execute_python_transform(
self, branch_name: str, commit: str, location: str, client: InfrahubClient, data: Optional[dict] = None
) -> Any:
Expand Down
12 changes: 6 additions & 6 deletions backend/infrahub/git/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ async def add_git_repository(model: GitRepositoryAdd, service: InfrahubServices)
default_branch_name=model.default_branch_name,
service=service,
)
await repo.import_objects_from_files(
await repo.import_objects_from_files( # type: ignore[call-overload]
infrahub_branch_name=model.infrahub_branch_name, git_branch_name=model.default_branch_name
)
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
Expand Down Expand Up @@ -84,7 +84,7 @@ async def add_git_repository_read_only(model: GitRepositoryAddReadOnly, service:
infrahub_branch_name=model.infrahub_branch_name,
service=service,
)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name) # type: ignore[call-overload]
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
await repo.sync_from_remote()

Expand Down Expand Up @@ -167,7 +167,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None:
internal_status=active_internal_status,
default_branch_name=repository_data.repository.default_branch.value,
)
await repo.import_objects_from_files(
await repo.import_objects_from_files( # type: ignore[call-overload]
git_branch_name=registry.default_branch, infrahub_branch_name=infrahub_branch
)
except RepositoryError as exc:
Expand All @@ -191,7 +191,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None:
log.info(exc.message)


@task(
@task( # type: ignore[arg-type]
name="git-branch-create",
task_run_name="Create branch '{branch}' in repository {repository_name}",
cache_policy=NONE,
Expand Down Expand Up @@ -384,7 +384,7 @@ async def pull_read_only(model: GitRepositoryPullReadOnly, service: InfrahubServ
service=service,
)

await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload]
await repo.sync_from_remote(commit=model.commit)

# Tell workers to fetch to stay in sync
Expand Down Expand Up @@ -454,7 +454,7 @@ async def import_objects_from_git_repository(model: GitRepositoryImportObjects,
repository_kind=model.repository_kind,
commit=model.commit,
)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload]


@flow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ async def _get_proposed_change_repositories(
return _parse_proposed_change_repositories(message=message, source=source_all, destination=destination_all)


@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository")
@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository") # type: ignore[arg-type]
async def _validate_repository_merge_conflicts(
repositories: list[ProposedChangeRepository], service: InfrahubServices
) -> bool:
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/proposed_change/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ async def cancel_proposed_changes_branch(branch_name: str, service: InfrahubServ
await cancel_proposed_change(proposed_change=proposed_change, service=service)


@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE)
@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE) # type: ignore[arg-type]
async def cancel_proposed_change(proposed_change: CoreProposedChange, service: InfrahubServices) -> None:
await add_tags(nodes=[proposed_change.id])
log = get_run_logger()
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/task_manager/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

LOG_LEVEL_MAPPING = {10: "debug", 20: "info", 30: "warning", 40: "error", 50: "critical"}

CONCLUSION_STATE_MAPPING = {
CONCLUSION_STATE_MAPPING: dict[str, TaskConclusion] = {
"Scheduled": TaskConclusion.UNKNOWN,
"Pending": TaskConclusion.UNKNOWN,
"Running": TaskConclusion.UNKNOWN,
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/task_manager/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ async def query(
"node": {
"title": flow.name,
"conclusion": CONCLUSION_STATE_MAPPING.get(
flow.state_name, TaskConclusion.UNKNOWN
str(flow.state_name), TaskConclusion.UNKNOWN
).value,
"state": flow.state_type,
"progress": progress_flow.data.get(flow.id, None),
Expand Down
2 changes: 1 addition & 1 deletion backend/infrahub/tasks/artifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from infrahub.services import InfrahubServices


@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE)
@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE) # type: ignore[arg-type]
async def define_artifact(
message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate], service: InfrahubServices
) -> InfrahubNode:
Expand Down
10 changes: 5 additions & 5 deletions backend/infrahub/tasks/telemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
TELEMETRY_VERSION: str = "20240524"


@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE)
@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_database_information(service: InfrahubServices, branch: Branch) -> dict:
async with service.database.start_session() as db:
data: dict[str, Any] = {
Expand All @@ -37,7 +37,7 @@ async def gather_database_information(service: InfrahubServices, branch: Branch)
return data


@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE)
@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_schema_information(service: InfrahubServices, branch: Branch) -> dict:
data: dict[str, Any] = {}
main_schema = registry.schema.get_schema_branch(name=branch.name)
Expand All @@ -48,7 +48,7 @@ async def gather_schema_information(service: InfrahubServices, branch: Branch) -
return data


@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE)
@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_feature_information(service: InfrahubServices, branch: Branch) -> dict:
async with service.database.start_session() as db:
data = {}
Expand All @@ -67,7 +67,7 @@ async def gather_feature_information(service: InfrahubServices, branch: Branch)
return data


@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE)
@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE) # type: ignore[arg-type]
async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict:
start_time = time.time()

Expand Down Expand Up @@ -97,7 +97,7 @@ async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict:
return data


@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE)
@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE) # type: ignore[arg-type]
async def post_telemetry_data(service: InfrahubServices, url: str, payload: dict[str, Any]) -> None:
"""Send the telemetry data to the specified URL, using HTTP POST."""
response = await service.http.post(url=url, json=payload)
Expand Down
3 changes: 2 additions & 1 deletion backend/infrahub/trigger/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from infrahub_sdk.utils import compare_lists
from prefect import get_run_logger, task
from prefect.automations import AutomationCore
from prefect.cache_policies import NONE
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName

Expand All @@ -14,7 +15,7 @@
from uuid import UUID


@task(name="trigger-setup", task_run_name="Setup triggers in task-manager")
@task(name="trigger-setup", task_run_name="Setup triggers in task-manager", cache_policy=NONE) # type: ignore[arg-type]
async def setup_triggers(client: PrefectClient) -> None:
log = get_run_logger()

Expand Down
11 changes: 11 additions & 0 deletions backend/infrahub/workers/infrahub_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from infrahub.services.adapters.workflow import InfrahubWorkflow
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
from infrahub.trace import configure_trace
from infrahub.workers.utils import inject_service_parameter, load_flow_function
from infrahub.workflows.models import TASK_RESULT_STORAGE_NAME

Expand Down Expand Up @@ -90,6 +91,16 @@ async def setup(

self._init_logger()

# Initialize trace
if config.SETTINGS.trace.enable:
configure_trace(
service="infrahub-task-worker",
version=infrahub_version,
exporter_type=config.SETTINGS.trace.exporter_type,
exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
exporter_protocol=config.SETTINGS.trace.exporter_protocol,
)

# Start metric endpoint
if metric_port is None or metric_port != 0:
metric_port = metric_port or int(os.environ.get("INFRAHUB_METRICS_PORT", 8000))
Expand Down
Loading
Loading