**Describe the issue**:
It seems that the processes scheduler does not run tasks in parallel when the graph is built with `map_blocks`.
**Minimal Complete Verifiable Example**:
```python
from time import sleep

import dask.array as da
from dask.diagnostics import ProgressBar


def func(data):
    sleep(10)
    return data * 2


if __name__ == '__main__':
    # 4 chunks
    array = da.ones((100, 100), chunks=(25, -1))
    out = array.map_blocks(func, meta=array)

    print("using threads scheduler")
    with ProgressBar():
        out2 = out.compute(scheduler='threads')

    print("using processes scheduler")
    with ProgressBar():
        out3 = out.compute(scheduler='processes')
```
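To quantify the difference beyond the ProgressBar output, the two compute calls can also be timed explicitly with wall-clock timing; a minimal sketch (the `timed_compute` helper is illustrative, not part of the original report):

```python
from time import perf_counter


def timed_compute(arr, scheduler):
    # Wall-clock timing for a single compute call (illustrative helper).
    start = perf_counter()
    arr.compute(scheduler=scheduler)
    print(f"{scheduler}: {perf_counter() - start:.1f} s")
```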
The threads and processes schedulers take about 10s and 40s, respectively. However, in the following example (using `dask.delayed`), both take similar times: ~10-11s.
```python
from time import sleep

import dask
import dask.array as da
import numpy as np
from dask.diagnostics import ProgressBar


def func(data):
    sleep(10)
    return data * 2


if __name__ == '__main__':
    # 4 tasks
    delayed_out = [
        dask.delayed(func, pure=True)(data_)
        for data_ in [np.ones((100, 100))] * 4
    ]
    arrays = [
        da.from_delayed(delayed_out_, dtype=float, shape=(100, 100))
        for delayed_out_ in delayed_out
    ]
    out = da.stack(arrays, axis=0)

    print("using threads scheduler")
    with ProgressBar():
        out2 = out.compute(scheduler='threads')

    print("using processes scheduler")
    with ProgressBar():
        out3 = out.compute(scheduler='processes')
```
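As a quick way to confirm whether the `map_blocks` work is actually spread across worker processes, one can report the process ID from inside the chunk function; a diagnostic sketch, assuming the same 4-chunk array as above (the `func_with_pid` name is illustrative):

```python
import os
from time import sleep

import dask.array as da


def func_with_pid(data):
    # If the processes scheduler runs chunks in parallel, several distinct
    # PIDs should be printed; a single repeated PID would indicate that
    # all chunks execute in one worker.
    print("running in PID", os.getpid())
    sleep(10)
    return data * 2


if __name__ == '__main__':
    array = da.ones((100, 100), chunks=(25, -1))
    array.map_blocks(func_with_pid, meta=array).compute(scheduler='processes')
```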
**Anything else we need to know?**:
**Environment**:
- Dask version: 2025.1.0
- Python version: 3.12
- Operating System: Windows 10
- Install method (conda, pip, source): conda