From 9960989393ad5dd00458e1a7ccf4a23a5c3cc8d9 Mon Sep 17 00:00:00 2001 From: william Date: Mon, 19 Nov 2018 15:14:36 +0000 Subject: [PATCH 1/3] Extract the finished method for the Task Unfortunately the python3.5 version of `Task` doesn't have a means to determine if a task has finished vs done. Instead the internal `_state` method needs to be checked. To increase the maintainability, the method for doing this has been extracted into `finished`. This will make it easier to update in the future if need be --- src/commands/run.py | 2 +- src/models/job.py | 3 +++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/commands/run.py b/src/commands/run.py index 028da2e..94719fc 100644 --- a/src/commands/run.py +++ b/src/commands/run.py @@ -126,7 +126,7 @@ async def start_tasks(tasks): active_tasks = [] def remove_done_tasks(): for active_task in active_tasks: - if active_task._state == 'FINISHED': + if active_task.finished: active_tasks.remove(active_task) break for task in tasks: diff --git a/src/models/job.py b/src/models/job.py index 8cb5770..dd645c1 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -68,6 +68,9 @@ def __init__(self, job, thread_pool = None): self.add_job_callback(lambda job: job.set_ssh_results()) self.add_done_callback(type(self).report_results) + def finished(self): + return self._state == 'FINISHED' + def __getattr__(self, attr): return getattr(self.job, attr) From 96bb4aaecee5296fa5573b1a91c319d98d2fcf7d Mon Sep 17 00:00:00 2001 From: william Date: Mon, 19 Nov 2018 16:16:37 +0000 Subject: [PATCH 2/3] Protect the main task from cancellation Previously all tasks running in the event loop would be cancelled. However this makes the callback behaviour unpredictable as the main Future was also being cancelled. The main Future (aka `start_tasks`) still gets cancelled. This stops `add_tasks` from running. However the `await_finished` will continue running. This ensures the other Futures finish correctly The `CancelledError` catch has been moved to where the error is being generated --- src/commands/run.py | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/commands/run.py b/src/commands/run.py index 94719fc..035ee4b 100644 --- a/src/commands/run.py +++ b/src/commands/run.py @@ -129,18 +129,27 @@ def remove_done_tasks(): if active_task.finished: active_tasks.remove(active_task) break - for task in tasks: - while len(active_tasks) > max_ssh: + + async def add_tasks(): + for task in tasks: + while len(active_tasks) > max_ssh: + remove_done_tasks() + await asyncio.sleep(0.01) + asyncio.ensure_future(task, loop = loop) + active_tasks.append(task) + run_print('Starting Job: {}'.format(task.node)) + await(asyncio.sleep(start_delay)) + + async def await_finished(): + while len(active_tasks) > 0: remove_done_tasks() await asyncio.sleep(0.01) - asyncio.ensure_future(task, loop = loop) - active_tasks.append(task) - run_print('Starting Job: {}'.format(task.node)) - await(asyncio.sleep(start_delay)) - run_print('Waiting for jobs to finish...') - while len(active_tasks) > 0: - remove_done_tasks() - await asyncio.sleep(0.01) + + try: + await add_tasks() + run_print('Waiting for jobs to finish...') + except concurrent.futures.CancelledError: pass + finally: await await_finished() session = Session() try: @@ -153,7 +162,6 @@ def remove_done_tasks(): run_print('Executing: {}'.format(batch.name())) tasks = map(lambda j: j.task(thread_pool = pool), batch.jobs) loop.run_until_complete(start_tasks(tasks)) - except concurrent.futures.CancelledError: pass finally: run_print('Cleaning up...') pool.shutdown(wait = True) From 8741ede655edd2433d8a481fea3cf1bc19bfbd6a Mon Sep 17 00:00:00 2001 From: william Date: Mon, 19 Nov 2018 17:22:18 +0000 Subject: [PATCH 3/3] Catch errors when closing the ssh connection Closing the ssh connection prematurely can lead to some ssh errors. In an attempt to catch these, all errors from the close method are being caught. Also the redundant catch of `CancelledError` has been removed --- src/models/job.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/src/models/job.py b/src/models/job.py index dd645c1..d9b40d0 100644 --- a/src/models/job.py +++ b/src/models/job.py @@ -64,10 +64,14 @@ def __init__(self, job, thread_pool = None): self.thread_pool = thread_pool super().__init__(self.run_async()) self.job = job - self.add_job_callback(lambda job: job.connection().close()) + self.add_job_callback(type(self).close) self.add_job_callback(lambda job: job.set_ssh_results()) self.add_done_callback(type(self).report_results) + def close(self): + try: job.connection.close() + except: pass + def finished(self): return self._state == 'FINISHED' @@ -120,8 +124,7 @@ def catch_errors(func, *args): async def run_async(self): if self.check_command(): - try: await self._run_thread(self.connection().open) - except concurrent.futures.CancelledError as e: raise e + await self._run_thread(self.connection().open) if self.connection().is_connected: await self._run_thread(self.run, self.batch)