Map_partitions again accepts delayed objects by fjetter · Pull Request #11907 · dask/dask · GitHub

Map_partitions again accepts delayed objects #11907

Merged on Apr 30, 2025 (6 commits)

fjetter (Member) commented Apr 24, 2025

With the move to dask-expr we apparently lost the feature that map_partitions accepts delayed objects. Other APIs may be affected as well, but I don't know of a good way to test this.

@fjetter fjetter requested a review from Copilot April 24, 2025 14:05
Copilot AI left a comment

Pull Request Overview

This PR re-enables support for delayed objects in map_partitions by introducing a new "nargs" parameter and adjusting operand slicing and keyword argument handling across several modules. Key changes include test updates to cover both future and delayed objects, modifications in merge_asof and groupby functions to account for the new argument count, and an enhancement to the Dict class to implement the Mapping interface.
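The operand slicing mentioned above can be illustrated with a small, dependency-free sketch. It assumes an expression object that keeps all of its inputs in one flat `operands` list, with the named parameters first, then `nargs` positional arguments, then any trailing entries. The class `ToyMapPartitions`, its parameter names, and the `trailing` helper are hypothetical stand-ins for illustration, not dask's actual classes; only the slice expression mirrors the one quoted in the review comments.

```python
class ToyMapPartitions:
    """Hypothetical stand-in for an expression that stores inputs in `operands`."""

    # Named parameters occupy the first slots of `operands` (illustrative names).
    _parameters = ["frame", "func", "meta", "nargs"]

    def __init__(self, frame, func, meta, nargs, *extra):
        # Everything after the named parameters is stored positionally.
        self.operands = [frame, func, meta, nargs, *extra]

    @property
    def frame(self):
        return self.operands[0]

    @property
    def nargs(self):
        return self.operands[3]

    def args(self):
        # Take exactly `nargs` positional arguments after the named parameters.
        start = len(self._parameters)
        return [self.frame] + self.operands[start : start + self.nargs]

    def trailing(self):
        # Assumption: whatever follows the positional arguments (for example
        # values destined for keyword arguments) stays out of `args()`.
        return self.operands[len(self._parameters) + self.nargs :]


expr = ToyMapPartitions("df", "f", None, 2, "x", "y", "delayed_kwarg")
print(expr.args())
print(expr.trailing())
```

Without an explicit count, a slice that runs to the end of `operands` could not tell positional arguments apart from anything appended after them; storing `nargs` makes that boundary explicit.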

Reviewed Changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated no comments.

| File | Description |
| --- | --- |
| dask/tests/test_task_spec.py | Updated tests to verify that Task containers can be converted to Mappings correctly. |
| dask/dataframe/dask_expr/tests/test_distributed.py | Extended test parameterization to cover both future and delayed objects in mapping. |
| dask/dataframe/dask_expr/_merge_asof.py | Added an extra argument (1) to function calls to propagate new defaults (nargs). |
| dask/dataframe/dask_expr/_groupby.py | Updated grouping logic to include the number of grouping keys via len(self.by)/len(by). |
| dask/dataframe/dask_expr/_expr.py | Introduced and propagated a new "nargs" parameter for correct operand slicing. |
| dask/dataframe/dask_expr/_collection.py | Modified map_partitions to wrap delayed arguments and use TaskRef and Dict for kwargs. |
| dask/_task_spec.py | Enhanced the Dict class with Mapping methods (iter, len, getitem). |

Comments suppressed due to low confidence (7)

dask/tests/test_task_spec.py:1002

  • Ensure that the test not only verifies the Mapping interface but also that the conversion to dict produces the exact expected tasks structure for both kwargs expansion and **kwargs usage.
assert isinstance(t, Mapping)

dask/dataframe/dask_expr/tests/test_distributed.py:353

  • Verify that the parametrized test covers both future and delayed cases correctly, and make sure that the expected DataFrame output is updated consistently with the new logic.
@pytest.mark.parametrize("kind", ["future", "delayed"])

dask/dataframe/dask_expr/_merge_asof.py:135

  • Review the purpose of injecting the literal '1' as an additional argument – confirm that it correctly represents the intended nargs parameter and doesn’t break downstream expectations.
self._kwargs,

dask/dataframe/dask_expr/_expr.py:660

  • Confirm that slicing the operands using the new 'nargs' parameter is correct and backward‐compatible; ensure that all downstream tasks receive the intended number of arguments.
return [self.frame] + self.operands[len(self._parameters) : len(self._parameters) + self.nargs]

dask/dataframe/dask_expr/_groupby.py:1394

  • Ensure that using 'len(self.by)' (and similarly 'len(by)') correctly captures the number of grouping keys; verify that this change synchronizes with the updated operand expectations.
len(self.by)

dask/dataframe/dask_expr/_collection.py:6171

  • Review the handling of delayed arguments: verify that wrapping with _DelayedExpr and appending delayed_kwargs, then constructing newkwargs with TaskRef, produces the expected behavior without side effects.
args = [_DelayedExpr(a) if isinstance(a, Delayed) else a for a in args]

dask/_task_spec.py:902

  • Ensure that the new Mapping interface methods (iter, len, getitem) for Dict correctly mirror the behavior of a standard Python dict without inadvertently modifying the original data structure.
class Dict(NestedContainer, Mapping):
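
As a rough illustration of what implementing the Mapping interface buys, the sketch below defines a container with only `__getitem__`, `__iter__`, and `__len__` and inherits from `collections.abc.Mapping`. The class `ToyDict` and the helper `collect_kwargs` are hypothetical and are not dask's `Dict`; the point is only that these three methods are enough for `dict(...)` conversion and `**` expansion to work.

```python
from collections.abc import Mapping


class ToyDict(Mapping):
    """Hypothetical read-only container; not dask's actual Dict class."""

    def __init__(self, **items):
        # Copy the input so the caller's data is never modified.
        self._items = dict(items)

    def __getitem__(self, key):
        return self._items[key]

    def __iter__(self):
        return iter(self._items)

    def __len__(self):
        return len(self._items)


def collect_kwargs(**kwargs):
    # Returns whatever keyword arguments it received.
    return kwargs


d = ToyDict(a=1, b=2)
expanded = collect_kwargs(**d)  # ** expansion uses keys() and __getitem__
as_dict = dict(d)               # conversion to a plain dict
```

`Mapping` supplies `keys`, `items`, `values`, `get`, and `__contains__` as mixin methods on top of the three abstract ones, which is why no further code is needed here.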

github-actions bot (Contributor) commented Apr 24, 2025

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

      9 files  ±0        9 suites  ±0   3h 20m 47s ⏱️ - 1m 36s
 18 020 tests +1   16 806 ✅ +1   1 214 💤 ±0  0 ❌ ±0 
161 254 runs  +9  149 141 ✅ +8  12 113 💤 +1  0 ❌ ±0 

Results for commit 9f2c52f. ± Comparison against base commit 02a9e85.

This pull request removes 1 and adds 2 tests. Note that renamed tests count towards both.
dask.dataframe.dask_expr.tests.test_distributed ‑ test_future_in_map_partitions
dask.dataframe.dask_expr.tests.test_distributed ‑ test_future_in_map_partitions[delayed]
dask.dataframe.dask_expr.tests.test_distributed ‑ test_future_in_map_partitions[future]

♻️ This comment has been updated with latest results.

@fjetter fjetter force-pushed the map_partitions_delayed branch from a559652 to f6fd266 Compare April 30, 2025 09:46
@fjetter fjetter merged commit f1595e9 into dask:main Apr 30, 2025
24 checks passed