Skip to content

Commit

Permalink
Pulp-workers now also log the name of the domain in which the task is executed.
Browse files Browse the repository at this point in the history
closes #5693
  • Loading branch information
ipanova authored and mdellweg committed Aug 30, 2024
1 parent 66f4fc1 commit aed64cd
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGES/5693.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pulp-workers now also log the name of the domain in which the task is executed.
18 changes: 10 additions & 8 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ def _execute_task(task):
# Store the task id in the context for `Task.current()`.
current_task.set(task)
task.set_running()
domain = get_domain()
try:
_logger.info(_("Starting task %s"), task.pk)
_logger.info(_("Starting task %s in domain: %s"), task.pk, domain.name)

# Execute task
module_name, function_name = task.name.rsplit(".", 1)
Expand All @@ -81,12 +82,12 @@ def _execute_task(task):
except Exception:
exc_type, exc, tb = sys.exc_info()
task.set_failed(exc, tb)
_logger.info(_("Task %s failed (%s)"), task.pk, exc)
_logger.info(_("Task %s failed (%s) in domain: %s"), task.pk, exc, domain.name)
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
_send_task_notification(task)
else:
task.set_completed()
_logger.info(_("Task completed %s"), task.pk)
_logger.info(_("Task completed %s in domain: %s"), task.pk, domain.name)
_send_task_notification(task)


Expand Down Expand Up @@ -249,18 +250,19 @@ def cancel_task(task_id):
Raises:
rest_framework.exceptions.NotFound: If a task with given task_id does not exist
"""
task = Task.objects.get(pk=task_id)
task = Task.objects.select_related("pulp_domain").get(pk=task_id)

if task.state in TASK_FINAL_STATES:
# If the task is already done, just stop
_logger.debug(
"Task [{task_id}] already in a final state: {state}".format(
task_id=task_id, state=task.state
"Task [{task_id}] in domain: {name} already in a final state: {state}".format(
task_id=task_id, name=task.pulp_domain.name, state=task.state
)
)
return task

_logger.info(_("Canceling task: {id}").format(id=task_id))
_logger.info(
_("Canceling task: {id} in domain: {name}").format(id=task_id, name=task.pulp_domain.name)
)

# This is the only valid transition without holding the task lock
task.set_canceling()
Expand Down
61 changes: 49 additions & 12 deletions pulpcore/tasking/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
)
from pulpcore.app.apps import pulp_plugin_configs
from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus
from pulpcore.app.util import PGAdvisoryLock
from pulpcore.app.util import PGAdvisoryLock, get_domain
from pulpcore.exceptions import AdvisoryLockError

from pulpcore.tasking.storage import WorkerDirectory
Expand Down Expand Up @@ -189,26 +189,34 @@ def cancel_abandoned_task(self, task, final_state, reason=None):
Return ``True`` if the task was actually canceled, ``False`` otherwise.
"""
# A task is considered abandoned when in running state, but no worker holds its lock
domain = get_domain()
try:
task.set_canceling()
except RuntimeError:
return False
if reason:
_logger.info(
"Cleaning up task %s and marking as %s. Reason: %s",
"Cleaning up task %s in domain: %s and marking as %s. Reason: %s",
task.pk,
domain.name,
final_state,
reason,
)
else:
_logger.info(_("Cleaning up task %s and marking as %s."), task.pk, final_state)
_logger.info(
_("Cleaning up task %s in domain: %s and marking as %s."),
task.pk,
domain.name,
final_state,
)
delete_incomplete_resources(task)
task.set_canceled(final_state=final_state, reason=reason)
if task.reserved_resources_record:
self.notify_workers()
return True

def is_compatible(self, task):
domain = get_domain()
unmatched_versions = [
f"task: {label}>={version} worker: {self.versions.get(label)}"
for label, version in task.versions.items()
Expand All @@ -217,8 +225,9 @@ def is_compatible(self, task):
]
if unmatched_versions:
_logger.info(
_("Incompatible versions to execute task %s by worker %s: %s"),
_("Incompatible versions to execute task %s in domain: %s by worker %s: %s"),
task.pk,
domain.name,
self.name,
",".join(unmatched_versions),
)
Expand All @@ -232,7 +241,11 @@ def identify_unblocked_tasks(self):
taken_exclusive_resources = set()
taken_shared_resources = set()
# When batching this query, be sure to use "pulp_created" as a cursor
for task in Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by("pulp_created"):
for task in (
Task.objects.filter(state__in=TASK_INCOMPLETE_STATES)
.order_by("pulp_created")
.select_related("pulp_domain")
):
reserved_resources_record = task.reserved_resources_record or []
exclusive_resources = [
resource
Expand All @@ -246,7 +259,11 @@ def identify_unblocked_tasks(self):
]
if task.state == TASK_STATES.CANCELING:
if task.unblocked_at is None:
_logger.debug("Marking canceling task %s unblocked.", task.pk)
_logger.debug(
"Marking canceling task %s in domain: %s unblocked.",
task.pk,
task.pulp_domain.name,
)
task.unblock()
changed = True
# Don't consider this task's resources as held.
Expand All @@ -263,7 +280,11 @@ def identify_unblocked_tasks(self):
# No shared resource exclusively taken?
and not any(resource in taken_exclusive_resources for resource in shared_resources)
):
_logger.debug("Marking waiting task %s unblocked.", task.pk)
_logger.debug(
"Marking waiting task %s in domain: %s unblocked.",
task.pk,
task.pulp_domain.name,
)
task.unblock()
changed = True

Expand Down Expand Up @@ -340,6 +361,7 @@ def supervise_task(self, task):
task.save(update_fields=["worker"])
cancel_state = None
cancel_reason = None
domain = get_domain()
with TemporaryDirectory(dir=".") as task_working_dir_rel_path:
task_process = Process(target=perform_task, args=(task.pk, task_working_dir_rel_path))
task_process.start()
Expand All @@ -349,7 +371,11 @@ def supervise_task(self, task):
_logger.info("Wait for canceled task to abort.")
else:
self.task_grace_timeout = TASK_KILL_INTERVAL
_logger.info("Aborting current task %s due to cancelation.", task.pk)
_logger.info(
"Aborting current task %s in domain: %s due to cancelation.",
task.pk,
domain.name,
)
os.kill(task_process.pid, signal.SIGUSR1)

r, w, x = select.select(
Expand All @@ -362,7 +388,11 @@ def supervise_task(self, task):
if connection.connection in r:
connection.connection.execute("SELECT 1")
if self.cancel_task:
_logger.info(_("Received signal to cancel current task %s."), task.pk)
_logger.info(
_("Received signal to cancel current task %s in domain: %s."),
task.pk,
domain.name,
)
cancel_state = TASK_STATES.CANCELED
self.cancel_task = False
if self.wakeup:
Expand All @@ -378,11 +408,18 @@ def supervise_task(self, task):
os.read(self.sentinel, 256)
if self.shutdown_requested:
if self.task_grace_timeout != 0:
_logger.info(
"Worker shutdown requested, waiting for task %s to finish.", task.pk
msg = (
"Worker shutdown requested, waiting for task {pk} in domain: {name} "
"to finish.".format(pk=task.pk, name=domain.name)
)

_logger.info(msg)
else:
_logger.info("Aborting current task %s due to worker shutdown.", task.pk)
_logger.info(
"Aborting current task %s in domain: %s due to worker shutdown.",
task.pk,
domain.name,
)
cancel_state = TASK_STATES.FAILED
cancel_reason = "Aborted during worker shutdown."
task_process.join()
Expand Down

0 comments on commit aed64cd

Please sign in to comment.