Description
Hi,
We're facing an issue with preserving divisions when writing pyarrow structs that contain multiple fields. This is best illustrated by the following example:
import pandas as pd
import pyarrow as pa
import dask.dataframe as dd

data = {
    'id': ["01", "02", "03", "04", "05", "06", "07", "08", "09", "10"],
    'A': [11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
}
df = pd.DataFrame(data)

# Create a Dask DataFrame
ddf = dd.from_pandas(df, npartitions=3)  # Set the number of partitions

# Sort the index
ddf = ddf.set_index('id', drop=True, sorted=True)
def add_nested_struct_multiple_fields(rows):
    example_dict = {
        "foo": "foo",
        "bar": "bar",
    }
    return [example_dict, example_dict]

def transform_multiple_fields(dataframe):
    dataframe["nested_struct"] = dataframe.apply(add_nested_struct_multiple_fields, axis=1)
    return dataframe
nested_struct_multiple_fields_schema = pa.list_(
    pa.struct(
        [
            pa.field("foo", pa.string()),
            pa.field("bar", pa.string()),
        ],
    )
)
def create_meta_dict(nested_struct_schema):
    meta_dict = {"id": pd.Series(dtype="object")}
    meta_dict["A"] = pd.Series(dtype=pd.ArrowDtype(pa.int32()))
    meta_dict["nested_struct"] = pd.Series(dtype=pd.ArrowDtype(nested_struct_schema))
    meta_df = pd.DataFrame(meta_dict).set_index("id")
    return meta_df
schema_nested_struct_multiple_fields = {
    "A": pa.int32(),
    "nested_struct": nested_struct_multiple_fields_schema,
}

# Build the meta DataFrame expected for the transformed output
nested_struct_multiple_fields_meta = create_meta_dict(nested_struct_multiple_fields_schema)

ddf_multiple_fields = ddf.map_partitions(
    transform_multiple_fields,
    meta=nested_struct_multiple_fields_meta,
)
ddf_multiple_fields.known_divisions
# -> True  # Divisions are still preserved before writing
dd.to_parquet(ddf_multiple_fields, "multiple_fields", schema=schema_nested_struct_multiple_fields)
dd.read_parquet(path="multiple_fields", index="id", calculate_divisions=True).known_divisions
# -> False  # Divisions are not known after reading the saved dataframe back
Here we are writing a struct with two fields ("foo", "bar"), and when reading the parquet file back the divisions are no longer known. Oddly enough, this does not seem to occur when only one field is added to the struct. See this notebook, where we compare both scenarios; a minimal sketch of the single-field variant is also included below.
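For reference, here is a sketch of the single-field comparison, reusing ddf and create_meta_dict from above. The names add_nested_struct_single_field, transform_single_field, and the "single_field" output path are illustrative, not taken from the notebook:

def add_nested_struct_single_field(rows):
    # Same shape as above, but the struct has only the "foo" field
    return [{"foo": "foo"}, {"foo": "foo"}]

def transform_single_field(dataframe):
    dataframe["nested_struct"] = dataframe.apply(add_nested_struct_single_field, axis=1)
    return dataframe

nested_struct_single_field_schema = pa.list_(pa.struct([pa.field("foo", pa.string())]))

ddf_single_field = ddf.map_partitions(
    transform_single_field,
    meta=create_meta_dict(nested_struct_single_field_schema),
)

dd.to_parquet(
    ddf_single_field,
    "single_field",
    schema={"A": pa.int32(), "nested_struct": nested_struct_single_field_schema},
)
dd.read_parquet(path="single_field", index="id", calculate_divisions=True).known_divisions
# -> True  # with a single-field struct, divisions survive the round trip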
Could you please provide clarity on the issue? Thanks in advance!
Environment:
- Dask version: 2024.2.1
- Python version: 3.11
- Operating System: Linux
- Install method (conda, pip, source): pip