
Download job data in advance, upload results in background #93

Closed
wants to merge 5 commits

Conversation

janbuchar (Contributor) commented on Aug 7, 2019

closes #43
closes #44
closes #59

janbuchar requested a review from blazekadam on August 7, 2019 at 12:56
janbuchar changed the title from "Download job data in advance" to "Download job data in advance, upload results inthe background" on Aug 7, 2019
janbuchar changed the title from "Download job data in advance, upload results inthe background" to "Download job data in advance, upload results in background" on Aug 7, 2019
blazekadam (Contributor) left a comment

This looks good so far and the tests are passing, but I will need to think a bit more about it. I am afraid of additional edge cases and race conditions...


async def _reconfigure_sheep(self, sheep: BaseSheep, job_id: str, model: ModelModel) -> bool:
    """
    Slaughter a sheep and resurrect it with a different model.

Whoa a resurrection 👍

janbuchar (Contributor, Author)

@blazekadam Sure, take your time - this is nice to have, but not essential by any means.

@@ -88,6 +94,8 @@ def __init__(self,
        self._listener = asyncio.create_task(self._listen())
        self._health_checker = asyncio.create_task(self._shepherd_health_check())
        self._job_status_update_queue = TaskQueue(worker_count=1)
        self._prepare_job_dir_queue = TaskQueue(worker_count=8)

Why this arbitrary constant?
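
A minimal sketch of naming the constant so the intent is documented; `_PREPARE_JOB_DIR_WORKERS` is a hypothetical name and the value could equally come from configuration, neither is part of this PR:

# hypothetical follow-up sketch, not part of the diff
_PREPARE_JOB_DIR_WORKERS = 8  # bounds how many job inputs are downloaded from storage concurrently

self._prepare_job_dir_queue = TaskQueue(worker_count=_PREPARE_JOB_DIR_WORKERS)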

try:
    if not sheep.running:
        for job_id in sheep.in_progress:
            # clean-up the working directory
            shutil.rmtree(path.join(self._get_sheep(sheep_id).sheep_data_root, job_id))

            # if necessary, cancel input downloads

IMO the jobs should be canceled before the rmtree.
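
A rough sketch of the suggested ordering, based only on the hunks visible here: cancel the pending input download first so it cannot recreate files inside the directory that is being removed.

for job_id in sheep.in_progress:
    # cancel the input download first, if one is still pending
    future = self._prepare_job_dir_futures.pop(job_id, None)
    if future is not None and not future.done():
        future.cancel()
    # only then clean up the working directory
    shutil.rmtree(path.join(self._get_sheep(sheep_id).sheep_data_root, job_id))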

    :return: whether or not the reconfiguration succeeded
    """

    # we need to wait for the in-progress jobs which might already be in the socket

What a respectful way to slaughter a sheep :o

try:
    self._start_sheep(sheep.id, model.name, model.version)
    return True
except SheepConfigurationError as sce:

The except clauses look quite similar; can we join them, or is there a good reason to keep them separate?
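
A minimal sketch of the joined form, assuming both clauses only log the failure and return False; `SheepError` stands in for whatever the second clause catches, which is not visible in this excerpt:

try:
    self._start_sheep(sheep.id, model.name, model.version)
    return True
except (SheepConfigurationError, SheepError) as e:
    # one clause handles both failure modes the same way
    logging.error("Failed to start sheep `%s` with model `%s:%s`: %s", sheep.id, model.name, model.version, e)
    return False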

    await self._prepare_job_dir_futures[job_id]
    del self._prepare_job_dir_futures[job_id]
except KeyError:
    logging.critical(f"Task that should have pulled data for job `{job_id}` mysteriously disappeared")

This may happen any time a sheep is purged by the health check; I do not think it deserves the critical level...
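
For illustration only, the same handler downgraded to a warning; the message text is just a suggestion:

except KeyError:
    # expected whenever the health check purged the sheep together with its pending downloads
    logging.warning(f"No pending data download found for job `{job_id}`; the sheep was probably purged")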

sheep.socket, InputMessage(dict(job_id=job_id, io_data_root=sheep.sheep_data_root))
)

# notify the queue that the task is done

This is weird wording (line 303 as well); the job is certainly not done, it was just sent for processing.


# save the done/error file
if isinstance(message, DoneMessage):
    status = self._job_status.pop(job_id)

Same as line 356; can we put it before the if-else?

# save the done/error file
if isinstance(message, DoneMessage):
    status = self._job_status.pop(job_id)
    status.status = JobStatus.DONE

I do not like how DoneMessage is processed in place while ErrorMessage is mostly handled with self._report_job_failed, i.e. we do roughly the same thing in two different places and at two different depths...
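
A sketch of one possible shape that also folds in the pop-before-if-else suggestion above; `_save_status_file` is a hypothetical helper and the arguments passed to `_report_job_failed` are assumed, not taken from the PR:

# pop the status once, before the branch
status = self._job_status.pop(job_id)
if isinstance(message, DoneMessage):
    status.status = JobStatus.DONE
    self._save_status_file(job_id, status)  # hypothetical helper that writes the done file
else:  # ErrorMessage
    self._report_job_failed(job_id, status, message)  # arguments assumed for illustration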

blazekadam (Contributor)

Nothing really waits for this, right?

janbuchar (Contributor, Author)

That's right

janbuchar closed this on Oct 30, 2024