Merge pull request #5518 from opsmill/dga-20250119-prefect-3.1.13
Upgrade Prefect to 3.1.13
dgarros authored Jan 23, 2025
2 parents c668965 + ec01f33 commit 900d523
Showing 33 changed files with 170 additions and 117 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -929,7 +929,9 @@ jobs:
run: echo JOB_NAME="$GITHUB_JOB" >> $GITHUB_ENV

- name: "Set environment variables"
-run: echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
+run: |
+  echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
+  echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV
- name: "Clear docker environment"
run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local

5 changes: 4 additions & 1 deletion .github/workflows/version-upgrade.yml
@@ -60,6 +60,7 @@ jobs:
run: |
echo INFRAHUB_BUILD_NAME=infrahub-${{ runner.name }} >> $GITHUB_ENV
echo INFRAHUB_IMAGE_VER=${{ matrix.source_version }} >> $GITHUB_ENV
+echo TASK_MANAGER_DOCKER_IMAGE="prefecthq/prefect:3.0.11-python3.12" >> $GITHUB_ENV
- name: "Clear docker environment"
run: docker compose -p $INFRAHUB_BUILD_NAME down -v --remove-orphans --rmi local
- name: "Store start time"
@@ -80,7 +81,9 @@

# Build the local version and run the migrations
- name: "Set environment variables"
-run: echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV
+run: |
+  echo INFRAHUB_IMAGE_VER=local-${{ runner.name }}-${{ github.sha }} >> $GITHUB_ENV
+  echo TASK_MANAGER_DOCKER_IMAGE="" >> $GITHUB_ENV
- name: Build Demo
run: invoke dev.build
2 changes: 1 addition & 1 deletion backend/infrahub/core/migrations/schema/tasks.py
@@ -64,7 +64,7 @@ async def schema_apply_migrations(message: SchemaApplyMigrationData, service: In
return error_messages


-@task(
+@task( # type: ignore[arg-type]
name="schema-path-migrate",
task_run_name="Migrate Schema Path {migration_name} on {branch.name}",
description="Apply a given migration to the database",
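Most of the Python-side changes in this PR repeat a single pattern: after the Prefect upgrade, the stricter typing of the @task(...) decorator apparently triggers mypy arg-type errors when the decorator is applied with keyword options to these async functions and methods, so each call site gains a # type: ignore[arg-type]. A minimal sketch of the pattern; the dataclass and the task body below are illustrative stand-ins, not code from this repository:

from dataclasses import dataclass

from prefect import task


@dataclass
class Branch:
    # Stand-in so the task_run_name template "{branch.name}" can resolve.
    name: str


@task(  # type: ignore[arg-type]
    name="schema-path-migrate",
    task_run_name="Migrate Schema Path {migration_name} on {branch.name}",
    description="Apply a given migration to the database",
)
async def schema_path_migrate(migration_name: str, branch: Branch) -> None:
    # Placeholder body: the real task applies the given migration to the database.
    ...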
2 changes: 1 addition & 1 deletion backend/infrahub/core/validators/tasks.py
@@ -50,7 +50,7 @@ async def schema_validate_migrations(
return results


-@task(
+@task( # type: ignore[arg-type]
name="schema-path-validate",
task_run_name="Validate schema path {constraint_name} in {branch.name}",
description="Validate if a given migration is compatible with the existing data",
2 changes: 1 addition & 1 deletion backend/infrahub/generators/tasks.py
@@ -78,7 +78,7 @@ async def run_generator(model: RequestGeneratorRun, service: InfrahubServices) -
await generator_instance.update(do_full_update=True)


@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE)
@task(name="generator-define-instance", task_run_name="Define Instance", cache_policy=NONE) # type: ignore[arg-type]
async def _define_instance(model: RequestGeneratorRun, service: InfrahubServices) -> CoreGeneratorInstance:
if model.generator_instance:
instance = await service.client.get(
28 changes: 14 additions & 14 deletions backend/infrahub/git/integrator.py
@@ -180,7 +180,7 @@ async def import_objects_from_files(
branch_name=infrahub_branch_name, commit=commit, config_file=config_file
)

-await self.import_all_python_files(
+await self.import_all_python_files( # type: ignore[call-overload]
branch_name=infrahub_branch_name, commit=commit, config_file=config_file
)
await self.import_jinja2_transforms(
@@ -229,7 +229,7 @@ async def _update_sync_status(self, branch_name: str, status: RepositorySyncStat
tracker="mutation-repository-update-admin-status",
)

@task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE)
@task(name="import-jinja2-tansforms", task_run_name="Import Jinja2 transform", cache_policy=NONE) # type: ignore[arg-type]
async def import_jinja2_transforms(
self,
branch_name: str,
@@ -334,7 +334,7 @@ async def update_jinja2_transform(

await existing_transform.save()

@task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE)
@task(name="import-artifact-definitions", task_run_name="Import Artifact Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_artifact_definitions(
self,
branch_name: str,
@@ -435,7 +435,7 @@ async def update_artifact_definition(

await existing_artifact_definition.save()

@task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE)
@task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE) # type: ignore[arg-type]
async def get_repository_config(self, branch_name: str, commit: str) -> Optional[InfrahubRepositoryConfig]:
branch_wt = self.get_worktree(identifier=commit or branch_name)
log = get_run_logger()
@@ -464,7 +464,7 @@ async def get_repository_config(self, branch_name: str, commit: str) -> Optional
log.error(f"Unable to load the configuration file {config_file_name}, the format is not valid : {exc}")
return None

@task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE)
@task(name="import-schema-files", task_run_name="Import schema files", cache_policy=NONE) # type: ignore[arg-type]
async def import_schema_files(self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig) -> None:
log = get_run_logger()
branch_wt = self.get_worktree(identifier=commit or branch_name)
@@ -536,7 +536,7 @@ async def import_schema_files(self, branch_name: str, commit: str, config_file:
for schema_file in schemas_data:
log.info(f"schema '{schema_file.identifier}' loaded successfully!")

@task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE)
@task(name="import-graphql-queries", task_run_name="Import GraphQL Queries", cache_policy=NONE) # type: ignore[arg-type]
async def import_all_graphql_query(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
@@ -594,7 +594,7 @@ async def create_graphql_query(self, branch_name: str, name: str, query_string:
await obj.save()
return obj

@task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE)
@task(name="import-python-check-definitions", task_run_name="Import Python Check Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_python_check_definitions(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
@@ -665,7 +665,7 @@ async def import_python_check_definitions(
log.info(f"CheckDefinition '{check_name!r}' not found locally, deleting")
await check_definition_in_graph[check_name].delete()

@task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE)
@task(name="import-generator-definitions", task_run_name="Import Generator Definitions", cache_policy=NONE) # type: ignore[arg-type]
async def import_generator_definitions(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
@@ -755,7 +755,7 @@ async def _generator_requires_update(
return True
return False

@task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE)
@task(name="import-python-transforms", task_run_name="Import Python Transforms", cache_policy=NONE) # type: ignore[arg-type]
async def import_python_transforms(
self, branch_name: str, commit: str, config_file: InfrahubRepositoryConfig
) -> None:
@@ -826,7 +826,7 @@ async def import_python_transforms(
log.info(f"TransformPython {transform_name!r} not found locally, deleting")
await transform_definition_in_graph[transform_name].delete()

@task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE)
@task(name="check-definition-get", task_run_name="Get Check Definition", cache_policy=NONE) # type: ignore[arg-type]
async def get_check_definition(
self,
branch_name: str,
@@ -866,7 +866,7 @@ async def get_check_definition(
raise
return checks

@task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE)
@task(name="python-transform-get", task_run_name="Get Python Transform", cache_policy=NONE) # type: ignore[arg-type]
async def get_python_transforms(
self, branch_name: str, module: types.ModuleType, file_path: str, transform: InfrahubPythonTransformConfig
) -> list[TransformPythonInformation]:
@@ -1067,7 +1067,7 @@ async def import_all_python_files(
await self.import_python_transforms(branch_name=branch_name, commit=commit, config_file=config_file)
await self.import_generator_definitions(branch_name=branch_name, commit=commit, config_file=config_file)

@task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE)
@task(name="jinja2-template-render", task_run_name="Render Jinja2 template", cache_policy=NONE) # type: ignore[arg-type]
async def render_jinja2_template(self, commit: str, location: str, data: dict) -> str:
log = get_run_logger()
commit_worktree = self.get_commit_worktree(commit=commit)
@@ -1083,7 +1083,7 @@ async def render_jinja2_template(self, commit: str, location: str, data: dict) -
log.error(str(exc), exc_info=True)
raise TransformError(repository_name=self.name, commit=commit, location=location, message=str(exc)) from exc

@task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE)
@task(name="python-check-execute", task_run_name="Execute Python Check", cache_policy=NONE) # type: ignore[arg-type]
async def execute_python_check(
self,
branch_name: str,
@@ -1142,7 +1142,7 @@ async def execute_python_check(
repository_name=self.name, class_name=class_name, commit=commit, location=location, message=str(exc)
) from exc

@task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE)
@task(name="python-transform-execute", task_run_name="Execute Python Transform", cache_policy=NONE) # type: ignore[arg-type]
async def execute_python_transform(
self, branch_name: str, commit: str, location: str, client: InfrahubClient, data: Optional[dict] = None
) -> Any:
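Every task in the integrator keeps cache_policy=NONE. In Prefect 3 the default cache policy derives a cache key from task inputs, which does not work well for arguments such as self, clients, or service objects, so caching is disabled explicitly on these methods. A hedged sketch of the shape of these declarations, using a hypothetical class rather than the real integrator:

from prefect import task
from prefect.cache_policies import NONE


class RepositoryImporter:
    # Hypothetical stand-in for the git integrator class touched in this diff.
    def __init__(self, name: str) -> None:
        self.name = name

    @task(name="repository-get-config", task_run_name="get repository config", cache_policy=NONE)  # type: ignore[arg-type]
    async def get_repository_config(self, branch_name: str, commit: str) -> dict | None:
        # Placeholder body: the real method loads the repository configuration from the worktree.
        ...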
12 changes: 6 additions & 6 deletions backend/infrahub/git/tasks.py
@@ -48,7 +48,7 @@ async def add_git_repository(model: GitRepositoryAdd, service: InfrahubServices)
default_branch_name=model.default_branch_name,
service=service,
)
-await repo.import_objects_from_files(
+await repo.import_objects_from_files( # type: ignore[call-overload]
infrahub_branch_name=model.infrahub_branch_name, git_branch_name=model.default_branch_name
)
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
@@ -84,7 +84,7 @@ async def add_git_repository_read_only(model: GitRepositoryAddReadOnly, service:
infrahub_branch_name=model.infrahub_branch_name,
service=service,
)
-await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name)
+await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name) # type: ignore[call-overload]
if model.internal_status == RepositoryInternalStatus.ACTIVE.value:
await repo.sync_from_remote()

@@ -167,7 +167,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None:
internal_status=active_internal_status,
default_branch_name=repository_data.repository.default_branch.value,
)
-await repo.import_objects_from_files(
+await repo.import_objects_from_files( # type: ignore[call-overload]
git_branch_name=registry.default_branch, infrahub_branch_name=infrahub_branch
)
except RepositoryError as exc:
@@ -191,7 +191,7 @@ async def sync_remote_repositories(service: InfrahubServices) -> None:
log.info(exc.message)


-@task(
+@task( # type: ignore[arg-type]
name="git-branch-create",
task_run_name="Create branch '{branch}' in repository {repository_name}",
cache_policy=NONE,
@@ -384,7 +384,7 @@ async def pull_read_only(model: GitRepositoryPullReadOnly, service: InfrahubServ
service=service,
)

-await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
+await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload]
await repo.sync_from_remote(commit=model.commit)

# Tell workers to fetch to stay in sync
@@ -454,7 +454,7 @@ async def import_objects_from_git_repository(model: GitRepositoryImportObjects,
repository_kind=model.repository_kind,
commit=model.commit,
)
-await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit)
+await repo.import_objects_from_files(infrahub_branch_name=model.infrahub_branch_name, commit=model.commit) # type: ignore[call-overload]


@flow(
@@ -516,7 +516,7 @@ async def _get_proposed_change_repositories(
return _parse_proposed_change_repositories(message=message, source=source_all, destination=destination_all)


@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository")
@task(name="proposed-change-validate-repository-conflicts", task_run_name="Validate conflicts on repository") # type: ignore[arg-type]
async def _validate_repository_merge_conflicts(
repositories: list[ProposedChangeRepository], service: InfrahubServices
) -> bool:
2 changes: 1 addition & 1 deletion backend/infrahub/proposed_change/tasks.py
@@ -168,7 +168,7 @@ async def cancel_proposed_changes_branch(branch_name: str, service: InfrahubServ
await cancel_proposed_change(proposed_change=proposed_change, service=service)


@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE)
@task(name="Cancel a propose change", description="Cancel a propose change", cache_policy=NONE) # type: ignore[arg-type]
async def cancel_proposed_change(proposed_change: CoreProposedChange, service: InfrahubServices) -> None:
await add_tags(nodes=[proposed_change.id])
log = get_run_logger()
2 changes: 1 addition & 1 deletion backend/infrahub/task_manager/constants.py
@@ -2,7 +2,7 @@

LOG_LEVEL_MAPPING = {10: "debug", 20: "info", 30: "warning", 40: "error", 50: "critical"}

-CONCLUSION_STATE_MAPPING = {
+CONCLUSION_STATE_MAPPING: dict[str, TaskConclusion] = {
"Scheduled": TaskConclusion.UNKNOWN,
"Pending": TaskConclusion.UNKNOWN,
"Running": TaskConclusion.UNKNOWN,
2 changes: 1 addition & 1 deletion backend/infrahub/task_manager/task.py
@@ -250,7 +250,7 @@ async def query(
"node": {
"title": flow.name,
"conclusion": CONCLUSION_STATE_MAPPING.get(
-flow.state_name, TaskConclusion.UNKNOWN
+str(flow.state_name), TaskConclusion.UNKNOWN
).value,
"state": flow.state_type,
"progress": progress_flow.data.get(flow.id, None),
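The two task-manager changes go together: CONCLUSION_STATE_MAPPING is now annotated as dict[str, TaskConclusion], and the query code wraps flow.state_name in str() because the Prefect client can report the state name as None; with the new annotation, passing an Optional[str] key would trip the type checker. A small illustration of the lookup; the enum members other than UNKNOWN are assumptions for the example, not the repository's definition:

from enum import Enum


class TaskConclusion(str, Enum):
    # Only UNKNOWN appears in this diff; the other members are assumed.
    UNKNOWN = "unknown"
    SUCCESS = "success"
    FAILURE = "failure"


CONCLUSION_STATE_MAPPING: dict[str, TaskConclusion] = {
    "Running": TaskConclusion.UNKNOWN,
    "Completed": TaskConclusion.SUCCESS,
    "Failed": TaskConclusion.FAILURE,
}

state_name: str | None = None  # e.g. flow.state_name from the Prefect client
conclusion = CONCLUSION_STATE_MAPPING.get(str(state_name), TaskConclusion.UNKNOWN)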
2 changes: 1 addition & 1 deletion backend/infrahub/tasks/artifact.py
@@ -11,7 +11,7 @@
from infrahub.services import InfrahubServices


@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE)
@task(name="define-artifact", task_run_name="Define Artifact", cache_policy=NONE) # type: ignore[arg-type]
async def define_artifact(
message: Union[messages.CheckArtifactCreate, RequestArtifactGenerate], service: InfrahubServices
) -> InfrahubNode:
10 changes: 5 additions & 5 deletions backend/infrahub/tasks/telemetry.py
@@ -19,7 +19,7 @@
TELEMETRY_VERSION: str = "20240524"


@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE)
@task(name="telemetry-gather-db", task_run_name="Gather Database Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_database_information(service: InfrahubServices, branch: Branch) -> dict:
async with service.database.start_session() as db:
data: dict[str, Any] = {
@@ -37,7 +37,7 @@ async def gather_database_information(service: InfrahubServices, branch: Branch)
return data


@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE)
@task(name="telemetry-schema-information", task_run_name="Gather Schema Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_schema_information(service: InfrahubServices, branch: Branch) -> dict:
data: dict[str, Any] = {}
main_schema = registry.schema.get_schema_branch(name=branch.name)
@@ -48,7 +48,7 @@ async def gather_schema_information(service: InfrahubServices, branch: Branch) -
return data


@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE)
@task(name="telemetry-feature-information", task_run_name="Gather Feature Information", cache_policy=NONE) # type: ignore[arg-type]
async def gather_feature_information(service: InfrahubServices, branch: Branch) -> dict:
async with service.database.start_session() as db:
data = {}
@@ -67,7 +67,7 @@ async def gather_feature_information(service: InfrahubServices, branch: Branch)
return data


@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE)
@task(name="telemetry-gather-data", task_run_name="Gather Anonynous Data", cache_policy=NONE) # type: ignore[arg-type]
async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict:
start_time = time.time()

@@ -97,7 +97,7 @@ async def gather_anonymous_telemetry_data(service: InfrahubServices) -> dict:
return data


@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE)
@task(name="telemetry-post-data", task_run_name="Upload data", retries=5, cache_policy=NONE) # type: ignore[arg-type]
async def post_telemetry_data(service: InfrahubServices, url: str, payload: dict[str, Any]) -> None:
"""Send the telemetry data to the specified URL, using HTTP POST."""
response = await service.http.post(url=url, json=payload)
3 changes: 2 additions & 1 deletion backend/infrahub/trigger/tasks.py
@@ -3,6 +3,7 @@
from infrahub_sdk.utils import compare_lists
from prefect import get_run_logger, task
from prefect.automations import AutomationCore
+from prefect.cache_policies import NONE
from prefect.client.orchestration import PrefectClient
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterName

@@ -14,7 +15,7 @@
from uuid import UUID


@task(name="trigger-setup", task_run_name="Setup triggers in task-manager")
@task(name="trigger-setup", task_run_name="Setup triggers in task-manager", cache_policy=NONE) # type: ignore[arg-type]
async def setup_triggers(client: PrefectClient) -> None:
log = get_run_logger()

11 changes: 11 additions & 0 deletions backend/infrahub/workers/infrahub_async.py
@@ -32,6 +32,7 @@
from infrahub.services.adapters.workflow import InfrahubWorkflow
from infrahub.services.adapters.workflow.local import WorkflowLocalExecution
from infrahub.services.adapters.workflow.worker import WorkflowWorkerExecution
+from infrahub.trace import configure_trace
from infrahub.workers.utils import inject_service_parameter, load_flow_function
from infrahub.workflows.models import TASK_RESULT_STORAGE_NAME

@@ -90,6 +91,16 @@ async def setup(

self._init_logger()

+# Initialize trace
+if config.SETTINGS.trace.enable:
+    configure_trace(
+        service="infrahub-task-worker",
+        version=infrahub_version,
+        exporter_type=config.SETTINGS.trace.exporter_type,
+        exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
+        exporter_protocol=config.SETTINGS.trace.exporter_protocol,
+    )

# Start metric endpoint
if metric_port is None or metric_port != 0:
metric_port = metric_port or int(os.environ.get("INFRAHUB_METRICS_PORT", 8000))
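Separately from the Prefect bump, the task worker now wires up tracing during setup when it is enabled in the settings. A condensed sketch of the guarded call shown above; the version string is a placeholder and the config import path is assumed, not taken from this file:

from infrahub import config  # assumed import path for the settings module
from infrahub.trace import configure_trace

infrahub_version = "1.x.y"  # placeholder; the worker passes the installed Infrahub version

if config.SETTINGS.trace.enable:
    configure_trace(
        service="infrahub-task-worker",
        version=infrahub_version,
        exporter_type=config.SETTINGS.trace.exporter_type,
        exporter_endpoint=config.SETTINGS.trace.exporter_endpoint,
        exporter_protocol=config.SETTINGS.trace.exporter_protocol,
    )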
(Diffs for the remaining changed files are not shown here.)
