Shuffle Service #5976
Conversation
This manages memory more smoothly. We still have issues, though, in that we're still passing around slices of Arrow tables, which hold onto references to large underlying buffers.
This helps to reduce lots of extra unmanaged memory. This flows pretty well right now. I'm finding that it's useful to blend between the disk and comm buffer sizes. The abstractions in multi_file and multi_comm are getting a little worn down (it would be awkward to shift back to pandas), but maybe that's ok.
Isn't solid yet
This avoids a race
We don't need a lot of comm buffer, and we also don't want more connections than machines (too much data sitting in buffers). We also improve some printing.
To enable better diagnostics, it would be useful to allow worker extensions to piggy-back on the standard heartbeat. This adds an optional "heartbeat" method to extensions; if present, it is called on every heartbeat and its result is sent to the scheduler, where it is processed by an extension of the same name. This also starts to store the extensions on the worker in a named dictionary. Previously this was a list, but I'm not sure that it was actually used anywhere. This is a breaking change without deprecation, but in a space that I suspect no one will care about. I'm happy to provide a fallback if desired.
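A rough sketch of how such an extension heartbeat could look follows; the class names and the exact plumbing here are illustrative assumptions, not the PR's actual code.

```python
class ShuffleWorkerExtension:
    """Illustrative sketch only; names and payloads are assumptions."""

    def __init__(self, worker):
        self.worker = worker
        worker.extensions["shuffle"] = self  # extensions now live in a named dict

    def heartbeat(self) -> dict:
        # Whatever this returns is attached to the worker's regular heartbeat under
        # the extension's name and handed to the scheduler extension of the same name.
        return {"comm_buffer": 0, "disk_buffer": 0}


class ShuffleSchedulerExtension:
    """Scheduler-side counterpart (sketch): receives each worker's payload."""

    def heartbeat(self, worker: str, data: dict) -> None:
        ...  # record per-worker diagnostics
```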
Tests are failing. I can't reproduce locally. This is just blind praying that it fixes the problem. It should be innocuous.
I'm curious how intense you're looking for the multi_file and multi_comm tests to be. The recent commit does the very smallest amount of work that one can do, but doesn't actually verify much of the internals. As we've discussed before, I'm personally ok with this. I don't really view these systems as separate from the larger system.
I think it's particularly useful to test edge and error cases since they are much more difficult to reproduce in the larger system.
Below are a few suggestions of tests I would write (a sketch of one possibility follows the lists below). One could start a debate about whether or not things like a max_message_size are internals, but the overall behaviour of "put large stuff in and see if it works" is a reasonable black-box test. Same for concurrency and exception handling.
Tests I would like to see for MultiComm
- What happens if we put messages in that are larger than max_message_size?
- ... larger than memory_limit?
- We have more data/shards than allowed connections
- What happens if send raises an OSError / any other Error in any of the above situations?
- It appears that we're just inserting stuff sequentially. In reality we're inserting stuff concurrently. This is particularly interesting with exceptions.
For MultiFile similar things
- What happens if we put messages in that are larger than memory_limit?
- What happens if dump, load or sizeof raises an OSError / any other Error?
- Put concurrency, something like asyncio.gather(*(mf.put(b"foo") for _ in range(n))) with many tasks (i.e. more than the limit)
- concurrency, particularly with errors
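For concreteness, here is a minimal sketch of one of these tests (the MultiComm concurrency-plus-errors case). The MultiComm constructor arguments and the exact point where the error surfaces are assumptions; only the module path and gen_test come from the project.

```python
import asyncio

import pytest

from distributed.shuffle.multi_comm import MultiComm
from distributed.utils_test import gen_test


@gen_test()
async def test_concurrent_put_with_failing_send():
    async def send(address, shards):
        raise OSError("simulated network failure")

    mc = MultiComm(send=send)  # hypothetical constructor; the real signature may differ
    # More concurrent puts than the connection limit, while every send fails
    await asyncio.gather(
        *(mc.put({f"worker-{i % 4}": [b"x" * 1024]}) for i in range(20)),
        return_exceptions=True,
    )
    # Exactly where the error surfaces (a later put, or flush) is implementation-defined;
    # the point is that the original OSError must come back out somewhere.
    with pytest.raises(OSError):
        await mc.flush()
```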
- name: Install dask branch
  shell: bash -l {0}
  run: python -m pip install --no-deps git+https://github.com/mrocklin/dask@p2p-shuffle
Is this already in?
Not yet: dask/dask#8836
distributed/shuffle/arrow.py
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    import pyarrow as pa
This PR introduces pyarrow as an optional dependency. We typically deal with optional imports by using a pattern like:
try:
    import pyarrow as pa
except ImportError:
    pa = None  # later, raise a useful error explaining that pyarrow is required for this feature
if not self.shards:
    await asyncio.sleep(0.1)
This looks like it should rather use an asyncio.Event, shouldn't it?
Yeah, I used to use a condition/event. It was a little complex. I can bring it back though. In practice I don't think that this matters much. Polling is mildly less efficient, but far less tricky/error prone. I decided to sacrifice performance in favor of robust understanding.
I suspect that your aversion to polling comes from reviewing tests, where I fully agree it is the bane of all humanity. I think that in this context it's actually a better choice though.
There are a few reasons why I dislike this
- The sleep is very artificial. Why 0.1? Why are we "waking" the event loop if nothing needs to be done?
- It makes things slower in tests and introduces some artificial overhead. This overhead is very likely completely irrelevant in practice, but it is unnecessary. We're not introducing random sleeps in other places, are we? Events and Conditions were built for this.
I decided to sacrifice performance in favor of robust understanding.
That's interesting because I would consider this more difficult to understand and maintain. Apart from the lack of performance, my actual concern is that this polling obfuscates a connection between communicate and poll. If an Event is set, I know for a fact that something else is listening and I know this listener will do something on the next event tick. With the polling, neither is very clear.
Below is how I started to familiarize myself with the code. This is true, not a hypothetical scenario.
- I started reading rearrange_by_column_p2p to understand at a high level what's happening: transfer, barrier, collect. Great, easy.
- (browse through setup, unique IDs, register and initialize stuff, cool, move on)
- ...
- I look into shuffle_transfer, which I follow to Shuffle.add_partition, which calls MultiComm.put
- Reading MultiComm.put tells me we're buffering something. We're not sending anything yet. Not even if the buffer grows very large...
- I read on and look into barrier because that's the next logical step, isn't it?
- The barrier actually flushes the buffer. Is this where we are sending things now? What is flush actually doing?
- No, the flush actually just waits for the buffer to become empty (again a poll), but how is it supposed to get emptied if we're never sending anything?
- Let's read all the code and its individual methods and try to find out what's going on (it's not that much code, I don't want to exaggerate)
- ... aha, there is a background task that is polling... I should start from the top with this in mind
- ...
Of course, this is an individual journey and a subjective argument about simplicity.
In general, I try to avoid polling if possible. Apart from clarity, in my experience, debugging polling is a bit harder because if the poll never finishes, it's less clear what we're actually waiting on. If there is an Event, I can follow the Event.
Most of my concern is still about style and we do not have to agree on this. If you feel strongly about the benefits of polling, let's move on. I just wanted to provide a bit more context to this conversation because my concerns are not always about testing :)
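For concreteness, the Event-based shape being argued for here might look roughly like this; the attribute and method names are assumptions, not the PR's code.

```python
import asyncio


class MultiCommSketch:
    """Illustrative only: names and structure are assumptions."""

    def __init__(self, send):
        self.send = send
        self.shards: dict[str, list] = {}
        self._wake = asyncio.Event()  # set whenever new shards arrive

    async def put(self, data: dict[str, list]) -> None:
        for address, shards in data.items():
            self.shards.setdefault(address, []).extend(shards)
        self._wake.set()  # wake communicate(): there is work to do

    async def communicate(self) -> None:
        while True:
            await self._wake.wait()  # no polling: sleep until put() signals us
            self._wake.clear()
            while self.shards:
                address, shards = self.shards.popitem()
                await self.send(address, shards)
```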
        dump_batch, schema=pa.Schema.from_pandas(self.metadata.empty)
    ),
    load=load_arrow,
    directory=os.path.join(self.worker.local_directory, str(self.metadata.id)),
From an API perspective, I'm wondering if the MultiFile shouldn't be responsible for creating the directory. It is also responsible for cleaning it up.
It does create the directory. It currently doesn't specify where it should be though. I prefer it having the same name as the shuffle ID. Currently the MultiFile doesn't know its id, which I like.
def test_basic(client: Client):
    df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
    df["name"] = df["name"].astype("string[python]")
Suggested change:

@pytest.mark.parametrize("cast_string", [True, False])
def test_basic(client: Client, cast_string: bool):
    df = dd.demo.make_timeseries(freq="15D", partition_freq="30D")
    if cast_string:
        df["name"] = df["name"].astype("string[python]")
Yeah, this is a live issue. Arrow likes converting things to strings if they are actually strings. I don't mind this. I've been avoiding this for now, but it's good that you brought it up so that we can address it explicitly.
I also haven't tested non-string object types at all. I should add a larger test for this behavior.
async def send(address, shards):
    d[address].extend(shards)
What happens if send raises an OSError? What happens if it raises any error?
Why, things grind to a halt of course. I actually ran into this on Coiled when I ran out of disk space. Things just hung.
I'd prefer to address this by creating a simple end-to-end test that tries to mimic that behavior. Objections? Or would you prefer something within each of multi_file and multi_comm? All of the above?
This is just a performance optimization. We happily pass large messages through. I don't think that a test here is super relevant.
Same. These aren't strict. We also aren't promising that we'll handle giant things any better than the underlying system.
That's already tested, and is the common case.
This one I think is interesting
Rather than try to mock out a thing that looks like the shuffle service in terms of concurrency, I'd rather use the existing system. How we treat this system in terms of concurrency will likely change over the next few weeks. I don't think that a test here is worth the effort. I'd rather continue relying on and investing in a more serious shuffling test suite.
MultiFile:
Again, I don't actually care what happens here so much.
This one I like. I'm less concerned about how exactly this error gets raised with MultiFile, there are many valid behaviors here I think, and I don't care to prescribe one of them. What I actually care about is how this error gets back out to the user in a Dask context. That I definitely want to invest in.
Same as above. I can do this. I don't think it's worth the effort so much, but I'm happy to acquiesce on this one.
I've added a couple of checks for these failure modes. In both cases we verify that we get a sensible error message. These tests aren't around MultiFile/MultiComm specifically, but instead around the whole system. I personally greatly prefer this approach.
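A hedged sketch of what such a whole-system check could look like; gen_cluster and dask.datasets.timeseries are real, while the patched attribute path, the shuffle="p2p" keyword (from the companion dask branch), and the error text are assumptions.

```python
from unittest import mock

import pytest

import dask
from distributed.utils_test import gen_cluster


@gen_cluster(client=True)
async def test_comm_failure_surfaces_to_user(c, s, a, b):
    df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-10")
    out = df.shuffle("name", shuffle="p2p")  # p2p keyword assumed from the dask branch

    async def broken_put(self, data):
        raise OSError("No space left on device")  # mimic the out-of-disk failure

    # Inject a failure into the comm layer (patch target is hypothetical) and check
    # that it reaches the user as a real exception instead of a hang.
    with mock.patch("distributed.shuffle.multi_comm.MultiComm.put", broken_put):
        with pytest.raises(OSError, match="No space left on device"):
            await c.compute(out)
```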
Add SchedulerPlugin to dump state on cluster close. This also adds a new method to SchedulerPlugins that runs directly before closing time.
Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
This was flaky due to cleaning up resources. My experience is that making things async helps with this in general. I don't have strong confidence that this will fix the issue, but I do have mild confidence, and strong confidence that it won't hurt.
self.send = send
self.shards = defaultdict(list)
self.sizes = defaultdict(int)
self.total_size = 0
It's now a class attribute, but you initialize it here again to zero. Is this intended?
try:
    with self.time("read"):
        with open(
            self.directory / str(id), mode="rb", buffering=100_000_000
It depends on what you're doing with the file descriptor. For instance, if you just call fd.read() and read the entire file, this should not make a difference. If you are trying to perform a bunch of partial reads, it will matter, but I don't think we're doing this, are we? Our writer is performing many small writes and appends to the same file, so that makes sense. In the end, it depends on what Arrow is doing in pa.RecordBatchStreamReader.read_all. As I said, it shouldn't do any harm so we can keep it.
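For reference, the read path under discussion is roughly the following (path is a placeholder for the shard file); whether the Python-level buffer size matters comes down to Arrow doing a single bulk read in read_all():

```python
import pyarrow as pa

path = "/tmp/shuffle-shard"  # placeholder for self.directory / str(id)

# One buffered open, then a single bulk read of all record batches by Arrow
with open(path, mode="rb", buffering=100_000_000) as fd:
    table = pa.RecordBatchStreamReader(fd).read_all()
```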
from distributed.utils_test import gen_cluster


@gen_cluster(client=True, timeout=1000000)
We should remove these timeouts before merging
while self.shards:
    await asyncio.sleep(0.05)
I would've expected it to be sufficient to first await _communicate_future then. I tried, but this is not possible since communicate never completes if we do not set _done. However, if we set it too soon, we will not flush all shards. This feels a bit brittle and I was wondering if we should distinguish "done because we flushed" and "done because an exception was raised", with something like:
async def communicate(self):
    while not self._exception:
        if self._flushed and not self.shards:
            break
        if not self.shards:
            await asyncio.sleep(0.1)
I also receive a bunch of warnings when executing tests about pending tasks
Task was destroyed but it is pending!
task: <Task pending name='Task-3542' coro=<MultiComm.communicate() running at /Users/fjetter/workspace/distributed-main/distributed/shuffle/multi_comm.py:129> wait_for=<Future pending cb=[Task.task_wakeup()]>>
(same for the multi_file). The line points to the sleep/polling.
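One way to make the pending-task warning go away (a sketch only; attribute names such as _communicate_task are assumptions) is to explicitly stop and await the background task when the buffer is closed:

```python
import asyncio


class MultiCommCloseSketch:
    """Illustrative only: attribute names are assumptions."""

    async def close(self):
        self._done = True  # ask communicate() to exit at its next check
        self._communicate_task.cancel()  # or rely on _done alone and skip the cancel
        try:
            await self._communicate_task  # make sure the loop has really finished
        except asyncio.CancelledError:
            pass
```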
empty[c] = empty[c].astype(
    "string"
)  # TODO: we fail at non-string object dtypes
empty[column] = empty[column].astype("int64")  # TODO: this shouldn't be necessary
I actually think we should have both: a bug report to get the issue fixed and a test case that ensures that we actually hotfixed this bug.
If I remove the int cast, test_basic actually fails, but I am having a very hard time figuring out what is going on. If we remove the string conversion, nothing fails. How would we know when this is safe to be removed?
df = dask.datasets.timeseries(
    start="2000-01-01",
    end="2000-01-10",
    dtypes={"x": float, "y": float},
I would suggest removing this constraint and just using the ordinary timeseries. If we include the name column, this covers at least the case where we're casting the meta column to string.
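That suggestion amounts to something like the following; the default timeseries dtypes include the object-dtype name column, which exercises the string cast in the meta handling:

```python
import dask

# Default columns: id (int), name (object/string), x and y (float)
df = dask.datasets.timeseries(start="2000-01-01", end="2000-01-10")
```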
async def put(self, data: dict[str, list[object]]):
    """
    Writes many objects into the local buffers, blocks until ready for more

    Parameters
    ----------
    data: dict
        A dictionary mapping destinations to lists of objects that should
        be written to that destination
    """
    if self._exception:
        raise self._exception
Not crucial, but I see that flush sets done to False. IIUC, once we call flush and the buffer is empty, the communicate will stop. If after this another put happens, we'd just append to the buffer but would never know what happened to it.
happens, we'd just append to the buffer but would never know what happened to it.
How about raising an exception here if we're done to ensure this object is not accidentally reused? Not reusing this is probably a very important attribute in error cases as well so this would serve two purposes at once.
Alternatively, upon closing we could set the exception attribute with an appropriate exception which would serve a similar purpose, I believe
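A sketch of the guard suggested above; the _done and _exception attribute names are assumptions:

```python
class MultiCommGuardSketch:
    """Illustrative only."""

    _done = False
    _exception = None

    async def put(self, data):
        if self._exception:
            raise self._exception
        if self._done:  # set by flush()/close(); refuse accidental reuse
            raise RuntimeError("MultiComm already flushed; cannot accept new shards")
        ...  # normal buffering path
```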
This extends the shuffle service in #5520 with disk, relatively well-bounded memory, and performance. It does not yet handle issues like worker resilience, partial outputs, conflicting restrictions, or other topics, but I think that it's nearing a point where it could be useful.
This PR is large. There are a couple of things that can help with that:
There are a few more caveats that I'll mention in the diff below.