Convert Bag graphs to TaskSpec graphs during optimization #11569
Conversation
Unit Test Results. See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ±0, 15 suites ±0, 4h 17m 56s ⏱️ +11m 23s. Results for commit 166478a. ± Comparison against base commit 0f3e5ff. This pull request removes 2 and adds 1 tests. Note that renamed tests count towards both.
♻️ This comment has been updated with latest results.
dask/bag/core.py
Outdated
if task.func is _execute_subgraph:
    subgraph = task.args[0]
    outkey = task.args[1]
    # If there is a reify at the start of this we don't want to act
    final_task = lazify_task(subgraph[outkey], True)
    subgraph = {
        k: lazify_task(v, False) for k, v in subgraph.items() if k != outkey
    }
    subgraph[outkey] = final_task
This optimization is a little odd. We have to ensure that there is at least one reify at the very end, since many bag operations use the _MapChunk
iterator so that, inside a task, it is again possible to iterate over chunks. If we don't have a reify at the end, the computation just returns this iterator.
Now that I've understood this, the better approach would likely be to add a reify in this optimizer whenever we encounter such an iterator, instead of removing all reifies. I will not do this in this PR, but it may be something for later.
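To illustrate the point with a standalone sketch (stand-in definitions, not the bag internals): without a final reify, a task hands back a lazy iterator rather than the partition's data.

# Stand-in definitions for illustration; the real reify and _MapChunk live in
# dask.bag.core and are more involved than this.
def reify(seq):
    # Materialize an iterator into concrete data.
    return list(seq)

def map_chunk(func, chunk):
    # Bag-style lazy mapping over a partition: returns an iterator, not data.
    return (func(x) for x in chunk)

lazy = map_chunk(lambda x: x + 1, [1, 2, 3])
print(lazy)         # a generator object: what a task returns without a final reify
print(reify(lazy))  # [2, 3, 4]: the materialized partition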
Thanks for the additional context!
@@ -915,3 +915,22 @@ def fuse_linear_task_spec(dsk, keys):
    # Having the same prefixes can result in the same key, i.e. getitem-hash -> getitem-hash
    result[top_key] = Alias(top_key, target=renamed_key)
    return result


def cull(
I copied this from distributed.scheduler
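For readers without that context, here is a generic sketch of what culling does (my own simplified version, assuming dependency information is already available; not the code copied from distributed.scheduler):

# Simplified illustration of culling: starting from the requested output keys,
# walk the dependency graph and keep only the tasks that are actually needed.
def cull_sketch(dsk, keys, dependencies):
    # dsk: mapping key -> task; dependencies: mapping key -> iterable of keys
    needed = set()
    stack = list(keys)
    while stack:
        key = stack.pop()
        if key in needed:
            continue
        needed.add(key)
        stack.extend(dependencies[key])
    return {key: dsk[key] for key in needed}

graph = {"a": 1, "b": ("inc", "a"), "c": ("inc", "b"), "unused": 2}
deps = {"a": [], "b": ["a"], "c": ["b"], "unused": []}
print(cull_sketch(graph, ["c"], deps))  # "unused" is dropped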
def test_rename_fused_keys_bag():
    inp = {"b": (list, "a"), "c": (f, "b", 1)}

    outp = optimize(inp, ["c"], rename_fused_keys=False)
The rename_fused_keys kwarg is no longer supported.
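A minimal sketch of the call with that kwarg dropped (my adjustment, not necessarily the final test):

outp = optimize(inp, ["c"])  # rename_fused_keys is no longer accepted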
def test_inline_singleton_lists():
    inp = {"b": (list, "a"), "c": (f, "b", 1)}
    out = {"c": (f, (list, "a"), 1)}
    assert inline_singleton_lists(inp, ["c"]) == out
This is the only test that is sensitive to the inline_singleton_lists optimization step. I chose not to migrate it and removed this test instead.
dask/bag/tests/test_bag.py
Outdated
assert d.__dask_layers__() != b2.__dask_layers__()
[d2] = b2.to_delayed(optimize_graph=False)
assert dict(d2.dask) == dict(b2.dask)
assert d2.__dask_layers__() == b2.__dask_layers__()
assert d.compute() == d2.compute()
assert calls == 1 + 3
What's the significance of the 1 and 3?
d + d2: the 1 comes from computing d and the 3 from computing d2.
Would it make sense to add an inline comment for this?
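For example, a hedged sketch of such an inline comment, paraphrasing the answer above rather than quoting the committed code:

# one reify call comes from computing d, the other three from computing d2
assert calls == 1 + 3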
Thanks, @fjetter! The code generally looks good to me. Two minor questions/comments for a better understanding.
calls = 0
from dask.bag.core import reify

def count_reify(*args, **kwargs):
I like this!
Co-authored-by: Hendrik Makait <hendrik@makait.com>
Co-authored-by: Hendrik Makait <hendrik@makait.com>
This optimization seems to be important. There are some interesting shenanigans going on inside the bag implementation with iterators, so not lazifying this can, it seems, break fusion.
I stumbled over this in #11568 because it looks like I'll have to touch bag optimizers to get this over the finishing line, similar to what we did with arrays.
This is a minimal step in that direction. Edit: I had to follow through and add the conversion layer to the optimization step as well. I killed off one of the optimizations (inlining lists). Many of these types of tasks will vanish when moving to #11568, so I didn't bother with the migration. If this becomes an issue, we'll have to look at what those tasks look like afterwards, since it's not entirely clear to me at the moment.
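For a sense of what converting to TaskSpec means here, a minimal sketch, assuming the helper dask._task_spec.convert_legacy_graph is available under that name in the installed version:

# Hedged sketch: turn an old-style tuple graph into TaskSpec node objects.
# Assumes dask._task_spec.convert_legacy_graph exists; names may shift between versions.
from dask._task_spec import convert_legacy_graph

legacy = {
    "a": (range, 3),   # legacy task: (callable, *args)
    "b": (list, "a"),  # depends on "a" via a bare key reference
}
taskspec_graph = convert_legacy_graph(legacy)
for key, node in taskspec_graph.items():
    print(key, type(node).__name__, sorted(node.dependencies))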
From what I can tell, the most important test is test_map_releases_element_references_as_soon_as_possible, which tests that tasks are indeed properly pipelined and that intermediate memory is released as required. This is also what tripped me up in #11568.
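To make that property concrete, here is a self-contained sketch (not the dask test itself) showing how generator pipelining lets earlier elements be collected while later ones are still being produced; it relies on CPython's reference counting:

# Illustration only (not test_map_releases_element_references_as_soon_as_possible):
# with iterator pipelining, an element becomes collectable as soon as the
# downstream stage moves on, so the number of live elements stays small
# instead of growing with the partition size.
import weakref

class Element:
    """A payload object we can watch via weak references."""

def source(n):
    for _ in range(n):
        yield Element()

def track(iterable, refs):
    for item in iterable:
        refs.append(weakref.ref(item))
        yield item

refs = []
for _ in track(source(5), refs):
    alive = sum(r() is not None for r in refs)
    print("live elements:", alive)  # stays small under CPython refcounting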