Context: dask/dask-expr#659 recently changed the default shuffle method used by Series.unique() from "tasks" to "p2p". I don't think there was anything wrong with that particular PR, but the change exposed the fact that .cat.as_known() does not preserve the original dtype of the categories when the "p2p" method is used:
```python
import dask
import dask.dataframe as dd
from distributed import Client, LocalCluster

with LocalCluster() as cluster:
    with Client(cluster) as client:
        df = dd.from_dict({"qid": [1, 2, 1, 0, 2]}, npartitions=3)
        with dask.config.set({"dataframe.shuffle.method": "tasks"}):
            known_tasks = df.qid.astype("category").cat.as_known()
        with dask.config.set({"dataframe.shuffle.method": "p2p"}):
            known_p2p = df.qid.astype("category").cat.as_known()
        dd.assert_eq(known_tasks, known_p2p)
```
```
...
AssertionError: Attributes of Series are different

Attribute "dtype" are different
[left]:  CategoricalDtype(categories=[2, 0, 1], ordered=False, categories_dtype=int64)
[right]: CategoricalDtype(categories=['__UNKNOWN_CATEGORIES__', 2, 0, 1], ordered=False, categories_dtype=object)
```
I don't think the problem is in "p2p" itself. Rather, the proper metadata seems to be lost before the data is shuffled. With "tasks", the correct metadata happens to be recovered by the intermediate compute step; we aren't as lucky with "p2p", because the data round-trips through PyArrow.
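For reference, here is a minimal sketch using only pandas and PyArrow (so it illustrates the claim rather than pinpointing the bug in dask's shuffle internals): a fully-known categorical survives the Arrow round trip with its int64 categories intact, whereas a dtype that still carries the `__UNKNOWN_CATEGORIES__` placeholder has object-dtype categories before any shuffling happens, so there is nothing for the round trip to recover.

```python
import pandas as pd
import pyarrow as pa

# A fully-known categorical survives a pandas -> Arrow -> pandas round trip
# with its int64 categories intact:
s = pd.Series([1, 2, 0], dtype="category")
back = pa.Table.from_pandas(s.to_frame("qid")).to_pandas()["qid"]
print(repr(back.dtype))
# CategoricalDtype(categories=[0, 1, 2], ordered=False, categories_dtype=int64)

# By contrast, a dtype that still carries the "unknown" placeholder has
# object-dtype categories to begin with:
unknown = pd.CategoricalDtype(["__UNKNOWN_CATEGORIES__"])
print(unknown.categories.dtype)
# object
```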