Convert `Bag` graphs to TaskSpec graphs during optimization by fjetter · Pull Request #11569 · dask/dask · GitHub

Convert Bag graphs to TaskSpec graphs during optimization #11569

Merged
merged 6 commits into from
Dec 2, 2024

Conversation

fjetter
Member
@fjetter fjetter commented Nov 28, 2024

This optimization seems to be important. There are some interesting shenanigans going on with iterators inside the bag implementation, so not lazifying tasks can apparently break fusion.

I stumbled over this in #11568 because it looks like I'll have to touch the bag optimizers to get that PR over the finish line, similar to what we did with arrays.

This is a minimal step in that direction.

Edit: I had to follow through and add the conversion layer to the optimization step as well. I killed off one of the optimizations (inlining lists). Many of these types of tasks will vanish when moving to #11568, so I didn't bother with the migration. If this becomes an issue, we'll have to look at what those tasks look like afterwards, since that's not entirely clear to me at the moment.

From what I can tell, the most important test is `test_map_releases_element_references_as_soon_as_possible`, which checks that tasks are indeed properly pipelined and that intermediate memory is released as required. This is also what tripped me up in #11568.
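To illustrate why pipelining matters for memory, here is a minimal pure-Python sketch. It does not use dask; `Element` and `peak_live_elements` are hypothetical names introduced for this illustration, and the behaviour relies on CPython's reference counting. It compares a fully lazy two-step `map` chain against one that materializes the intermediate result into a list.

```python
import weakref


class Element:
    """Small object whose liveness can be tracked with weak references."""

    def __init__(self, value):
        self.value = value


def peak_live_elements(reify_between_steps, n=5):
    """Run a two-step map over n elements; return the peak number alive."""
    live = weakref.WeakSet()
    peak = 0

    def source():
        for i in range(n):
            e = Element(i)
            live.add(e)
            yield e

    def step(e):
        out = Element(e.value + 1)
        live.add(out)
        return out

    stage1 = map(step, source())
    if reify_between_steps:
        # Materialize the intermediate result, as an eager reify would.
        stage1 = list(stage1)
    for item in map(step, stage1):
        peak = max(peak, len(live))
        del item  # drop our reference before pulling the next element
    return peak


lazy_peak = peak_live_elements(reify_between_steps=False)
eager_peak = peak_live_elements(reify_between_steps=True)
```

Under these assumptions, the lazy chain keeps only a handful of elements alive at any time, while the materialized variant holds the whole intermediate list until the loop finishes.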

Contributor
github-actions bot commented Nov 28, 2024

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

     15 files  ± 0       15 suites  ±0   4h 17m 56s ⏱️ + 11m 23s
 13 255 tests  -  1   12 193 ✅  -  1   1 062 💤 ±0  0 ❌ ±0 
164 431 runs   - 15  141 514 ✅  - 13  22 917 💤  - 2  0 ❌ ±0 

Results for commit 166478a. ± Comparison against base commit 0f3e5ff.

This pull request removes 2 and adds 1 tests. Note that renamed tests count towards both.
dask.bag.tests.test_bag ‑ test_inline_singleton_lists
dask.bag.tests.test_bag ‑ test_rename_fused_keys_bag
dask.bag.tests.test_bag ‑ test_lazify_task_legacy

♻️ This comment has been updated with latest results.

@fjetter fjetter changed the title from "lazify_task for Task class" to "Convert bag graphs to TaskSpec graphs during optimization" Nov 28, 2024
@fjetter fjetter changed the title from "Convert bag graphs to TaskSpec graphs during optimization" to "Convert Bag graphs to TaskSpec graphs during optimization" Nov 28, 2024
dask/bag/core.py Outdated
Comment on lines 112 to 120
if task.func is _execute_subgraph:
    subgraph = task.args[0]
    outkey = task.args[1]
    # If there is a reify at the start of this we don't want to act
    final_task = lazify_task(subgraph[outkey], True)
    subgraph = {
        k: lazify_task(v, False) for k, v in subgraph.items() if k != outkey
    }
    subgraph[outkey] = final_task
Member Author

This optimization is a little odd. We have to ensure that there is at least one reify at the very end, because many bag operations use the `_MapChunk` iterator so that a task can again iterate over chunks internally. If we don't have a reify at the end, the computation just returns this iterator.

Now that I've understood this, the better approach would likely be to add a reify in this optimizer whenever we encounter such an iterator, instead of removing all reifies. I will not do this in this PR, but it may be something for later.
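The point about keeping one reify at the end can be sketched on legacy-style tuple tasks. This is a simplified stand-in written for illustration, not dask's implementation: `lazify_task_sketch`, `is_task`, `execute`, and this `reify` are assumptions that only mimic the idea of stripping inner materializations while leaving the outermost one in place.

```python
from collections.abc import Iterator


def reify(seq):
    """Materialize an iterator into a list; leave other values untouched."""
    if isinstance(seq, Iterator):
        return list(seq)
    return seq


def is_task(x):
    """Legacy-style task: a tuple whose first element is callable."""
    return isinstance(x, tuple) and bool(x) and callable(x[0])


def lazify_task_sketch(task, start=True):
    """Drop reify/list wrappers everywhere except at the outermost call."""
    if not is_task(task):
        return task
    head, args = task[0], task[1:]
    if not start and head in (list, reify):
        # An inner materialization: unwrap it so data keeps streaming.
        return lazify_task_sketch(args[0], start=False)
    return (head,) + tuple(lazify_task_sketch(a, start=False) for a in args)


def execute(task):
    """Tiny recursive evaluator for the tuple tasks above."""
    if not is_task(task):
        return task
    return task[0](*(execute(a) for a in task[1:]))


def inc(x):
    return x + 1


task = (reify, (map, inc, (reify, (map, inc, [1, 2, 3]))))
lazy = lazify_task_sketch(task)
result = execute(lazy)                  # outer reify yields a concrete list
without_final_reify = execute(lazy[1])  # no outer reify: a bare iterator
```

In this sketch the inner `reify` disappears and the two `map` calls stream into each other, while evaluating the task without its outer `reify` hands back an unconsumed iterator, which is the failure mode described above.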

Member

Thanks for the additional context!

@@ -915,3 +915,22 @@ def fuse_linear_task_spec(dsk, keys):
# Having the same prefixes can result in the same key, i.e. getitem-hash -> getitem-hash
result[top_key] = Alias(top_key, target=renamed_key)
return result


def cull(
Member Author

I copied this from distributed.scheduler
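For readers unfamiliar with culling, the general idea is to keep only the tasks that the requested keys transitively depend on. The sketch below is a generic illustration; `cull_sketch` and the `dependencies` mapping are hypothetical and do not reflect the signature or data structures of the function added in this PR.

```python
def cull_sketch(dependencies, keys):
    """Return the set of task keys reachable from `keys`.

    `dependencies` maps each key to the keys it reads from. This layout
    is an assumption made for the example.
    """
    needed = set()
    stack = list(keys)
    while stack:
        key = stack.pop()
        if key in needed:
            continue
        needed.add(key)
        stack.extend(dependencies.get(key, ()))
    return needed


deps = {"a": [], "b": ["a"], "c": ["b"], "unused": ["a"]}
kept = cull_sketch(deps, ["c"])
```

Here `unused` is dropped because nothing requested depends on it.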

def test_rename_fused_keys_bag():
    inp = {"b": (list, "a"), "c": (f, "b", 1)}

    outp = optimize(inp, ["c"], rename_fused_keys=False)
Member Author

The `rename_fused_keys` kwarg is no longer supported.

Comment on lines -601 to -604
def test_inline_singleton_lists():
    inp = {"b": (list, "a"), "c": (f, "b", 1)}
    out = {"c": (f, (list, "a"), 1)}
    assert inline_singleton_lists(inp, ["c"]) == out
Member Author
@fjetter fjetter Nov 29, 2024

This is the only test that is sensitive to the `inline_singleton_lists` optimization step. I chose not to migrate the optimization and to remove this test instead.
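As a reminder of what the removed step did on legacy tuple graphs, here is a simplified sketch that reproduces the input and output from the deleted test. The helpers `count_uses`, `substitute`, and `inline_singleton_lists_sketch` are hypothetical, assume string keys, and ignore chained inlining; they are not dask's implementation.

```python
def f(x, y):
    return (x, y)


def count_uses(task, key):
    """Count how often `key` appears in a (possibly nested) tuple task."""
    if isinstance(task, tuple):
        return sum(count_uses(arg, key) for arg in task)
    return int(isinstance(task, str) and task == key)


def substitute(task, key, replacement):
    """Replace occurrences of `key` in a tuple task with `replacement`."""
    if isinstance(task, tuple):
        return tuple(substitute(arg, key, replacement) for arg in task)
    if isinstance(task, str) and task == key:
        return replacement
    return task


def inline_singleton_lists_sketch(dsk, keys):
    """Inline `(list, x)` tasks that have exactly one dependent."""
    out = dict(dsk)
    for key, task in dsk.items():
        is_list_task = (
            isinstance(task, tuple) and len(task) == 2 and task[0] is list
        )
        if not is_list_task or key in keys:
            continue  # requested outputs must stay in the graph
        users = [k for k, t in dsk.items() if k != key and count_uses(t, key)]
        if len(users) == 1:
            out[users[0]] = substitute(out[users[0]], key, task)
            del out[key]
    return out


inp = {"b": (list, "a"), "c": (f, "b", 1)}
inlined = inline_singleton_lists_sketch(inp, ["c"])
```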

@hendrikmakait hendrikmakait self-requested a review December 2, 2024 12:03
assert d.__dask_layers__() != b2.__dask_layers__()
[d2] = b2.to_delayed(optimize_graph=False)
assert dict(d2.dask) == dict(b2.dask)
assert d2.__dask_layers__() == b2.__dask_layers__()
assert d.compute() == d2.compute()
assert calls == 1 + 3
Member

What's the significance of the 1 and 3?

Member Author

The reify calls from `d` (1) plus those from `d2` (3).

Member

Would it make sense to add an inline comment for this?

Member
@hendrikmakait hendrikmakait left a comment

Thanks, @fjetter! The code generally looks good to me. Two minor questions/comments for a better understanding.

calls = 0
from dask.bag.core import reify

def count_reify(*args, **kwargs):
Member

I like this!
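The counting-wrapper pattern praised here can be sketched without dask. In this illustration, `fake_module`, `run_pipeline`, and the `calls` dictionary are hypothetical stand-ins; the test in the PR patches the real `reify` instead, and its exact mechanics are not shown in this thread.

```python
import types
from unittest import mock

# Hypothetical stand-in for a module that exposes `reify`.
fake_module = types.SimpleNamespace(reify=list)


def run_pipeline(data):
    # `reify` is looked up at call time, so patching the attribute works.
    return fake_module.reify(x + 1 for x in data)


calls = {"n": 0}
original = fake_module.reify


def count_reify(*args, **kwargs):
    """Count the call, then delegate to the original function."""
    calls["n"] += 1
    return original(*args, **kwargs)


with mock.patch.object(fake_module, "reify", count_reify):
    patched_result = run_pipeline([1, 2, 3])
    run_pipeline([4])
```

Because the wrapper delegates to the original function, results stay unchanged while the counter records how often materialization happened, which is what an assertion like `calls == 1 + 3` relies on.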

Co-authored-by: Hendrik Makait <hendrik@makait.com>
@fjetter fjetter merged commit 307038b into dask:main Dec 2, 2024
26 checks passed
@fjetter fjetter deleted the lazify_task_bag branch December 2, 2024 15:19