Description
Subsequent Blockwise layers are currently fused into a single layer. This reduces the number of tasks and the scheduling overhead and is generally a good thing to do. However, the fused output currently does not generate unique key names. This is a problem from a UX perspective, but it can also cause severe failures when executed on the distributed scheduler, since distributed assumes that a task key uniquely identifies the entire task. While the data output of the fused key and the non-fused key is identical, the runspec and the local topology are intentionally very different. For example, a fused task may not have any dependencies while the corresponding non-fused task does.
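To make "same key, different runspec" concrete, here is a minimal sketch using public dask APIs. The collection and operation are arbitrary and only illustrate the concept, not the exact code path that fails: optimization preserves the output key names but stores different task definitions under them.

```python
import dask

df = dask.datasets.timeseries(dtypes={"x": float, "y": float})
col = df.x + 1

unfused = dict(col.__dask_graph__())   # materialized graph before optimization
(opt_col,) = dask.optimize(col)        # blockwise fusion is applied here
fused = dict(opt_col.__dask_graph__())

# The output keys are preserved by optimization, but the task definition
# stored under such a key is very different in the two graphs.
key = col.__dask_keys__()[0]
print(key in unfused and key in fused)  # True: same key in both graphs
print(unfused[key] == fused[key])       # typically False: different runspec
```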
An example where this matters is the following (the async code is not strictly necessary, but the condition is a bit difficult to trigger and this helps; paste the code into a Jupyter notebook and run it a couple of times):
```python
import asyncio

import dask
import dask.dataframe as dd
from distributed import Client, Scheduler, Worker

async with (
    Scheduler() as s,
    Worker(s.address) as a,
    Worker(s.address) as b,
    Client(s.address, asynchronous=True) as c,
):
    df = dask.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-10",
        dtypes={"x": float, "y": float},
        freq="100 s",
    )
    out = dd.shuffle.shuffle(df, "x")
    out = out.persist()
    # Wait until the first worker has received tasks from the persisted graph
    while not a.tasks:
        await asyncio.sleep(0.05)
    del out
    # Build the "same" shuffle again and compute on top of it
    out = dd.shuffle.shuffle(df, "x")
    x, y = c.compute([df.x.size, out.x.size])
    x, y = await c.gather([x, y])
```
Note how the initial shuffle is persisted and a slightly different version of this graph is computed again afterwards. From what I can gather, the initial persist fuses the keys while the latter computation does not (it may be the other way round, I'm not sure; either way, that is a different issue).
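A quick way to check which of the two submissions is fused is to look at the layers of the optimized graph. Note that which optimizations actually run at persist/compute time can depend on client and config settings, so this is only an approximation:

```python
# Inspect the high-level layers dask would submit for the second shuffle.
# A fused blockwise chain typically shows up as a single combined layer.
(opt,) = dask.optimize(out)
print(list(opt.__dask_graph__().layers))
```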
This specific reproducer triggers (though not every time) a KeyError in a worker's data buffer while trying to read data:
```
2023-01-27 16:28:34,648 - distributed.worker - ERROR - Exception during execution of task ('size-chunk-0482e0de93343089cd64837d139d9a80-49c0404470df4695b3f5aa383f11c345', 3).
Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2366, in _prepare_args_for_execution
    data[k] = self.data[k]
  File "/Users/fjetter/workspace/distributed/distributed/spill.py", line 257, in __getitem__
    return super().__getitem__(key)
  File "/Users/fjetter/mambaforge/envs/dask-distributed-310/lib/python3.10/site-packages/zict/buffer.py", line 108, in __getitem__
    raise KeyError(key)
KeyError: "('simple-shuffle-539a650502e21de2dabd2021c9c9e684', 3)"

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2247, in execute
    args2, kwargs2 = self._prepare_args_for_execution(ts, args, kwargs)
  File "/Users/fjetter/workspace/distributed/distributed/worker.py", line 2370, in _prepare_args_for_execution
    data[k] = Actor(type(self.state.actors[k]), self.address, k, self)
KeyError: "('simple-shuffle-539a650502e21de2dabd2021c9c9e684', 3)"
```
This is caused by the dependency relations between tasks no longer being accurate on the scheduler: it considers a task "ready", i.e. all of its dependencies in memory, too soon, which causes the failure on the worker.
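As a toy illustration (a hypothetical data structure, not distributed's actual scheduler code) of why the uniqueness assumption matters: if the key is the identity, a resubmission of the "same" key with a different dependency set can be ignored, leaving the scheduler with stale dependency information.

```python
# Toy illustration only: scheduler-side bookkeeping keyed by task name.
known_dependencies: dict[str, set[str]] = {}

def update_graph(key: str, dependencies: set[str]) -> None:
    # The key is assumed to identify the task uniquely, so an already known
    # key is not re-examined even if its runspec/dependencies changed.
    known_dependencies.setdefault(key, dependencies)

update_graph("('f', 0)", set())           # first submission: fused task, no dependencies
update_graph("('f', 0)", {"('dep', 0)"})  # second submission: unfused task with a dependency
print(known_dependencies["('f', 0)"])     # set() -- the dependency relation is lost
```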
When `validate` is activated, the scheduler catches these cases earlier and raises appropriate AssertionErrors. This is not checked at runtime for performance reasons, and it is typically not necessary since we rely on the assumption that keys identify a task uniquely.
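For completeness, a sketch of how one can switch these checks on when reproducing locally, assuming the `distributed.scheduler.validate` / `distributed.worker.validate` config options (they are expensive and intended for debugging only):

```python
import dask

# Turn on distributed's internal consistency checks before creating the
# Scheduler/Worker, so inconsistent state fails fast with an AssertionError
# instead of a late KeyError on the worker.
dask.config.set({
    "distributed.scheduler.validate": True,
    "distributed.worker.validate": True,
})
```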
Apart from this artificial example, we also have internal reports about such a spurious KeyError in combination with an xgboost workload.