Download job data in advance, upload results in background #93
This looks good so far and the tests are passing but I will need to think a bit more about it. I am afraid of additional edge cases and race conditions...
async def _reconfigure_sheep(self, sheep: BaseSheep, job_id: str, model: ModelModel) -> bool:
    """
    Slaughter a sheep and resurrect it with a different model.
Whoa a resurrection 👍
@blazekadam Sure, take your time - this is nice to have, but not essential by any means.
@@ -88,6 +94,8 @@ def __init__(self,
        self._listener = asyncio.create_task(self._listen())
        self._health_checker = asyncio.create_task(self._shepherd_health_check())
        self._job_status_update_queue = TaskQueue(worker_count=1)
        self._prepare_job_dir_queue = TaskQueue(worker_count=8)
What an arbitrary constant?
try:
    if not sheep.running:
        for job_id in sheep.in_progress:
            # clean-up the working directory
            shutil.rmtree(path.join(self._get_sheep(sheep_id).sheep_data_root, job_id))

            # if necessary, cancel input downloads
IMO the jobs should be canceled before the `rmtree`.
    :return: whether or not the reconfiguration succeeded
    """

    # we need to wait for the in-progress jobs which might already be in the socket
What a respectful way to slaughter a sheep :o
try:
    self._start_sheep(sheep.id, model.name, model.version)
    return True
except SheepConfigurationError as sce:
The `except` clauses look quite similar. Can we join them, or is there a good reason to keep them separate?
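One way to join them, as a hedged sketch: a single `except` with a tuple of exception types. Only `SheepConfigurationError` appears in the excerpt above; `OtherStartError` is a hypothetical placeholder for whatever the second clause catches, and `start_sheep_safely` is an illustrative wrapper, not code from this PR.

```python
import logging
from typing import Callable


class SheepConfigurationError(Exception):
    """Stand-in for the project's exception of the same name."""


class OtherStartError(Exception):
    """Hypothetical second exception; the excerpt does not show the real one."""


def start_sheep_safely(start: Callable[[], None]) -> bool:
    """Run `start` and report whether it succeeded, handling both errors in one clause."""
    try:
        start()
        return True
    except (SheepConfigurationError, OtherStartError) as error:
        # both failure modes share the same handling, so a single clause is enough
        logging.error("Sheep could not be started: %s", error)
        return False
```

This only works if the two handlers really do the same thing; if one of them needs extra clean-up, keeping them separate is the clearer choice.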
    await self._prepare_job_dir_futures[job_id]
    del self._prepare_job_dir_futures[job_id]
except KeyError:
    logging.critical(f"Task that should have pulled data for job `{job_id}` mysteriously disappeared")
This may happen any time a sheep is purged by `health_check`; I do not think it deserves the `critical` level...
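A possible shape for this, sketched under assumptions: the futures live in a plain dict keyed by job ID, and a missing entry is an expected outcome that gets a `warning`. The function name `await_prepared_job_dir` is hypothetical. Note that this version removes the entry before awaiting it, which differs slightly from the await-then-`del` order in the diff.

```python
import asyncio
import logging
from typing import Dict


async def await_prepared_job_dir(futures: Dict[str, asyncio.Future], job_id: str) -> bool:
    """Wait for the download future of `job_id`; return False if it is no longer tracked."""
    future = futures.pop(job_id, None)
    if future is None:
        # expected whenever the sheep was purged in the meantime, hence only a warning
        logging.warning("No download task found for job `%s`; the sheep may have been purged", job_id)
        return False
    await future
    return True
```

Using `pop` with a default also avoids catching a `KeyError` that could originate somewhere else inside the `try` block.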
    sheep.socket, InputMessage(dict(job_id=job_id, io_data_root=sheep.sheep_data_root))
)

# notify the queue that the task is done
The wording here is odd (line 303 as well): the job is certainly not done, it was just sent for processing.
# save the done/error file
if isinstance(message, DoneMessage):
    status = self._job_status.pop(job_id)
Same as line 356: can we put it before the `if-else`?
# save the done/error file
if isinstance(message, DoneMessage):
    status = self._job_status.pop(job_id)
    status.status = JobStatus.DONE
I do not like how `DoneMessage` is processed in place while `ErrorMessage` is mostly handled by `self._report_job_failed`. I.e., we do much the same thing in two different places and at two different depths...
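A rough sketch of a more symmetric shape, assuming both message types carry a job ID: the status is popped once before branching, and both outcomes are finalized in the same function. All classes below are simplified stand-ins; `JobStatus.FAILED`, `StatusRecord`, and `finish_job` are hypothetical names, not the project's API.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Dict, Optional, Union


class JobStatus(Enum):
    PROCESSING = "processing"
    DONE = "done"
    FAILED = "failed"  # hypothetical member name


@dataclass
class StatusRecord:
    status: JobStatus = JobStatus.PROCESSING
    error: Optional[str] = None


@dataclass
class DoneMessage:
    job_id: str


@dataclass
class ErrorMessage:
    job_id: str
    message: str


def finish_job(job_status: Dict[str, StatusRecord],
               message: Union[DoneMessage, ErrorMessage]) -> StatusRecord:
    """Finalize a job for either message type in one place."""
    # pop once, before branching on the message type
    record = job_status.pop(message.job_id)
    if isinstance(message, DoneMessage):
        record.status = JobStatus.DONE
    else:
        record.status = JobStatus.FAILED
        record.error = message.message
    return record
```

Whether the error path can really share this much code depends on what `self._report_job_failed` does beyond updating the status, which the excerpt does not show.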
Nothing really waits for this, right?
That's right
closes #43
closes #44
closes #59