In the demo of dask/distributed#7586, you can spot "I/O" time in the collected metrics. This was made possible by decorating I/O-heavy dask/dask functions:
```diff
--- a/dask/dataframe/io/parquet/core.py
+++ b/dask/dataframe/io/parquet/core.py
@@ -27,6 +27,8 @@ from dask.highlevelgraph import HighLevelGraph
 from dask.layers import DataFrameIOLayer
 from dask.utils import apply, import_required, natural_sort_key, parse_bytes
+from distributed.metrics import context_meter
+
 __all__ = ("read_parquet", "to_parquet")
 
 NONE_LABEL = "__null_dask_index__"
@@ -158,6 +160,7 @@ class ToParquetFunctionWrapper:
             self.kwargs_pass,
         )
 
+    @context_meter.meter("thread-I/O")
     def __call__(self, df, block_index: tuple[int]):
         # Get partition index from block index tuple
         part_i = block_index[0]
@@ -643,6 +646,7 @@ def check_multi_support(engine):
     return hasattr(engine, "multi_support") and engine.multi_support()
 
+@context_meter.meter("thread-I/O")
 def read_parquet_part(
     fs, engine, meta, part, columns, index, use_nullable_dtypes, kwargs
 ):
```
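The decorator form meters the whole function body. `context_meter.meter` can also be used as a context manager, which would let a function that mixes I/O with CPU-bound work meter just the read itself. A minimal sketch, with a hypothetical `load_partition` helper (not an existing dask function):

```python
from distributed.metrics import context_meter


def load_partition(path: str) -> bytes:
    # Hypothetical helper, for illustration only: meter just the raw
    # read as "thread-I/O", so that any CPU-bound decoding performed
    # afterwards is not misattributed to I/O time.
    with context_meter.meter("thread-I/O"):
        with open(path, "rb") as f:
            raw = f.read()
    # CPU-bound post-processing would happen here, outside the metered block.
    return raw
```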
Without the above, I/O-heavy tasks would be broadly classified as "thread-noncpu", which also includes GIL contention, higher load than there are physical CPUs, and context-switch overhead.
We should decorate all functions in dask/dask that perform I/O, as in the example above, with a trivial dummy context manager in case distributed is not installed.
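A minimal sketch of such a fallback, assuming it would live in some dask/dask utility module (`_DummyContextMeter` is an illustrative name, not an existing API, and the `meter` signature is kept deliberately minimal):

```python
import contextlib

try:
    from distributed.metrics import context_meter
except ImportError:

    class _DummyContextMeter:
        # Illustrative no-op stand-in for distributed.metrics.context_meter.
        @contextlib.contextmanager
        def meter(self, label, unit="seconds"):
            # Objects produced by @contextmanager inherit ContextDecorator,
            # so this no-op works both as `with context_meter.meter(...):`
            # and as a `@context_meter.meter(...)` decorator.
            yield

    context_meter = _DummyContextMeter()
```

dask/dask modules would then import `context_meter` from this helper rather than from distributed directly, so the decorators can be applied unconditionally.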