From aed64cd9a01cc3fb8236ce65d4dea82db5f92ee6 Mon Sep 17 00:00:00 2001 From: Ina Panova Date: Tue, 20 Aug 2024 12:32:25 +0200 Subject: [PATCH] Pulp-workers now also log domain name in which the task is executed. closes #5693 --- CHANGES/5693.feature | 1 + pulpcore/tasking/tasks.py | 18 ++++++----- pulpcore/tasking/worker.py | 61 ++++++++++++++++++++++++++++++-------- 3 files changed, 60 insertions(+), 20 deletions(-) create mode 100644 CHANGES/5693.feature diff --git a/CHANGES/5693.feature b/CHANGES/5693.feature new file mode 100644 index 0000000000..70ec60012d --- /dev/null +++ b/CHANGES/5693.feature @@ -0,0 +1 @@ +Pulp-workers now also log domain name in which the task is executed. diff --git a/pulpcore/tasking/tasks.py b/pulpcore/tasking/tasks.py index 896f4bf37d..cea508040f 100644 --- a/pulpcore/tasking/tasks.py +++ b/pulpcore/tasking/tasks.py @@ -63,8 +63,9 @@ def _execute_task(task): # Store the task id in the context for `Task.current()`. current_task.set(task) task.set_running() + domain = get_domain() try: - _logger.info(_("Starting task %s"), task.pk) + _logger.info(_("Starting task %s in domain: %s"), task.pk, domain.name) # Execute task module_name, function_name = task.name.rsplit(".", 1) @@ -81,12 +82,12 @@ def _execute_task(task): except Exception: exc_type, exc, tb = sys.exc_info() task.set_failed(exc, tb) - _logger.info(_("Task %s failed (%s)"), task.pk, exc) + _logger.info(_("Task %s failed (%s) in domain: %s"), task.pk, exc, domain.name) _logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb)))) _send_task_notification(task) else: task.set_completed() - _logger.info(_("Task completed %s"), task.pk) + _logger.info(_("Task completed %s in domain: %s"), task.pk, domain.name) _send_task_notification(task) @@ -249,18 +250,19 @@ def cancel_task(task_id): Raises: rest_framework.exceptions.NotFound: If a task with given task_id does not exist """ - task = Task.objects.get(pk=task_id) + task = Task.objects.select_related("pulp_domain").get(pk=task_id) if task.state in TASK_FINAL_STATES: # If the task is already done, just stop _logger.debug( - "Task [{task_id}] already in a final state: {state}".format( - task_id=task_id, state=task.state + "Task [{task_id}] in domain: {name} already in a final state: {state}".format( + task_id=task_id, name=task.pulp_domain.name, state=task.state ) ) return task - - _logger.info(_("Canceling task: {id}").format(id=task_id)) + _logger.info( + _("Canceling task: {id} in domain: {name}").format(id=task_id, name=task.pulp_domain.name) + ) # This is the only valid transition without holding the task lock task.set_canceling() diff --git a/pulpcore/tasking/worker.py b/pulpcore/tasking/worker.py index 9a69b4d99e..df7322f44d 100644 --- a/pulpcore/tasking/worker.py +++ b/pulpcore/tasking/worker.py @@ -27,7 +27,7 @@ ) from pulpcore.app.apps import pulp_plugin_configs from pulpcore.app.models import Worker, Task, ApiAppStatus, ContentAppStatus -from pulpcore.app.util import PGAdvisoryLock +from pulpcore.app.util import PGAdvisoryLock, get_domain from pulpcore.exceptions import AdvisoryLockError from pulpcore.tasking.storage import WorkerDirectory @@ -189,19 +189,26 @@ def cancel_abandoned_task(self, task, final_state, reason=None): Return ``True`` if the task was actually canceled, ``False`` otherwise. """ # A task is considered abandoned when in running state, but no worker holds its lock + domain = get_domain() try: task.set_canceling() except RuntimeError: return False if reason: _logger.info( - "Cleaning up task %s and marking as %s. Reason: %s", + "Cleaning up task %s in domain: %s and marking as %s. Reason: %s", task.pk, + domain.name, final_state, reason, ) else: - _logger.info(_("Cleaning up task %s and marking as %s."), task.pk, final_state) + _logger.info( + _("Cleaning up task %s in domain: %s and marking as %s."), + task.pk, + domain.name, + final_state, + ) delete_incomplete_resources(task) task.set_canceled(final_state=final_state, reason=reason) if task.reserved_resources_record: @@ -209,6 +216,7 @@ def cancel_abandoned_task(self, task, final_state, reason=None): return True def is_compatible(self, task): + domain = get_domain() unmatched_versions = [ f"task: {label}>={version} worker: {self.versions.get(label)}" for label, version in task.versions.items() @@ -217,8 +225,9 @@ def is_compatible(self, task): ] if unmatched_versions: _logger.info( - _("Incompatible versions to execute task %s by worker %s: %s"), + _("Incompatible versions to execute task %s in domain: %s by worker %s: %s"), task.pk, + domain.name, self.name, ",".join(unmatched_versions), ) @@ -232,7 +241,11 @@ def identify_unblocked_tasks(self): taken_exclusive_resources = set() taken_shared_resources = set() # When batching this query, be sure to use "pulp_created" as a cursor - for task in Task.objects.filter(state__in=TASK_INCOMPLETE_STATES).order_by("pulp_created"): + for task in ( + Task.objects.filter(state__in=TASK_INCOMPLETE_STATES) + .order_by("pulp_created") + .select_related("pulp_domain") + ): reserved_resources_record = task.reserved_resources_record or [] exclusive_resources = [ resource @@ -246,7 +259,11 @@ def identify_unblocked_tasks(self): ] if task.state == TASK_STATES.CANCELING: if task.unblocked_at is None: - _logger.debug("Marking canceling task %s unblocked.", task.pk) + _logger.debug( + "Marking canceling task %s in domain: %s unblocked.", + task.pk, + task.pulp_domain.name, + ) task.unblock() changed = True # Don't consider this task's resources as held. @@ -263,7 +280,11 @@ def identify_unblocked_tasks(self): # No shared resource exclusively taken? and not any(resource in taken_exclusive_resources for resource in shared_resources) ): - _logger.debug("Marking waiting task %s unblocked.", task.pk) + _logger.debug( + "Marking waiting task %s in domain: %s unblocked.", + task.pk, + task.pulp_domain.name, + ) task.unblock() changed = True @@ -340,6 +361,7 @@ def supervise_task(self, task): task.save(update_fields=["worker"]) cancel_state = None cancel_reason = None + domain = get_domain() with TemporaryDirectory(dir=".") as task_working_dir_rel_path: task_process = Process(target=perform_task, args=(task.pk, task_working_dir_rel_path)) task_process.start() @@ -349,7 +371,11 @@ def supervise_task(self, task): _logger.info("Wait for canceled task to abort.") else: self.task_grace_timeout = TASK_KILL_INTERVAL - _logger.info("Aborting current task %s due to cancelation.", task.pk) + _logger.info( + "Aborting current task %s in domain: %s due to cancelation.", + task.pk, + domain.name, + ) os.kill(task_process.pid, signal.SIGUSR1) r, w, x = select.select( @@ -362,7 +388,11 @@ def supervise_task(self, task): if connection.connection in r: connection.connection.execute("SELECT 1") if self.cancel_task: - _logger.info(_("Received signal to cancel current task %s."), task.pk) + _logger.info( + _("Received signal to cancel current task %s in domain: %s."), + task.pk, + domain.name, + ) cancel_state = TASK_STATES.CANCELED self.cancel_task = False if self.wakeup: @@ -378,11 +408,18 @@ def supervise_task(self, task): os.read(self.sentinel, 256) if self.shutdown_requested: if self.task_grace_timeout != 0: - _logger.info( - "Worker shutdown requested, waiting for task %s to finish.", task.pk + msg = ( + "Worker shutdown requested, waiting for task {pk} in domain: {name} " + "to finish.".format(pk=task.pk, name=domain.name) ) + + _logger.info(msg) else: - _logger.info("Aborting current task %s due to worker shutdown.", task.pk) + _logger.info( + "Aborting current task %s in domain: %s due to worker shutdown.", + task.pk, + domain.name, + ) cancel_state = TASK_STATES.FAILED cancel_reason = "Aborted during worker shutdown." task_process.join()