Deterministic hashing for DataFrameScan nodes in cudf-polars multi-partition executor #18351
This updates how we hash DataFrameScan nodes to be deterministic. Rather than relying on the memory address of the polars PyDataFrame object, which might be reused, we hash the actual data in the PyDataFrame.
We've seen some flaky tests in cudf-polars with the dask-experimental executor (https://github.com/rapidsai/cudf/actions/runs/13957246743/job/39072610696). The failures are inconsistent, but seem to be in the join and groupby tests, and only with a distributed Cluster active. I could somewhat reliably reproduce it with
That would usually fail within 2-3 attempts. Sometimes more, but eventually. My best guess is that this is a bug in our "deterministic" hashing for our IR nodes, which ended up not being so deterministic, specifically `DataFrameScan.get_hashable`. For the multi-partition executor, we build a task graph whose keys include tokenized representations of the inputs. The expectation is that those hashes are stable (at least for as long as the task graph is alive) and unique. I'm worried that by using `id(self.df)` in the hash, we can get collisions when CPython reuses the memory address of a garbage-collected `PyDataFrame` for a new one.
I might be wrong about some of the details, but I'm probably confident enough to conclude that if we fix the hashing to be more deterministic then the failures should go away. This PR has a new implementation that's based around converting the polars DataFrame to arrow RecordBatches and hashing those. We should end up with something that's unique across the lifetime of the process. Before moving this out of draft, I want to do a bit of performance checking and let the test suite run for a while.
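For reference, a minimal sketch of that idea, assuming a public polars `DataFrame` rather than the internal `PyDataFrame`; the helper name matches the diff below, but the body here is illustrative, not the actual PR implementation:

```python
# Illustrative sketch only: hash a polars DataFrame through its Arrow buffers
# so the result depends on the data, not on a memory address.
import hashlib

import polars as pl


def hash_polars_dataframe(df: pl.DataFrame) -> int:
    hasher = hashlib.sha256()
    for batch in df.to_arrow().to_batches():
        for column in batch.columns:
            for buf in column.buffers():
                if buf is not None:
                    hasher.update(memoryview(buf))
    return int.from_bytes(hasher.digest()[:8], "little")
```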
```diff
@@ -746,7 +747,7 @@ def get_hashable(self) -> Hashable:
         return (
             type(self),
             schema_hash,
-            id(self.df),
+            hash_polars_dataframe(self.df),
```
While this will be correct, I am worried that it can be arbitrarily bad for performance.
Specifically, all the rewrites need a fast hash of the object. In this case, computing the hash scales with the size of this dataframe object.
I wonder if we could instead use a different approach for generating the key names by using sequence numbers.
That is, we have one sequence number that is incremented each time we generate a new task graph (so that different executions of the same thing don't overlap). And then a sequence number keyed on the object we're generating the key for.
Sketch:
```python
from weakref import WeakKeyDictionary
import itertools


def key_name(obj: ir.IR, graph_id: int, id_gen: itertools.count, seq_ids: WeakKeyDictionary) -> str:
    obj_id = seq_ids.get(obj)
    if obj_id is None:
        obj_id = seq_ids.setdefault(obj, next(id_gen))
    return f"task-graph-{graph_id}-{type(obj).__name__}-{obj_id}"
```
In that case, even if we get hash collisions, because two `DataFrameScan` objects that collide in their hash still don't compare equal, we'll never reuse a sequence id for something that hashes the same.
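A quick illustration of that point, using plain Python objects standing in for IR nodes (an assumption for brevity): `WeakKeyDictionary` distinguishes keys by equality, not just hash, so colliding hashes still get distinct sequence ids.

```python
# Two distinct objects that deliberately collide in hash still receive
# different sequence ids from the WeakKeyDictionary.
import itertools
from weakref import WeakKeyDictionary


class FakeNode:
    def __hash__(self):
        return 42  # force a hash collision
    # default object equality: two instances are never equal


seq_ids = WeakKeyDictionary()
id_gen = itertools.count()

a, b = FakeNode(), FakeNode()
for node in (a, b):
    if node not in seq_ids:
        seq_ids[node] = next(id_gen)

assert hash(a) == hash(b)        # they collide in hash...
assert seq_ids[a] != seq_ids[b]  # ...but still get distinct sequence ids
```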
Why not just add a new element to `DataFrameScan.get_hashable` that includes this kind of sequencing logic (e.g. `seq_ids.get(self.df)`)? I'm pretty sure this will solve the hashing problem without slowing things down.

We currently call `key_name` multiple times for the same IR node, and we expect to get the same value every time. I don't think we want to plumb these arguments through all of those `key_name` calls until we are sure it's necessary.
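A minimal sketch of that variant, assuming the underlying dataframe object can be weakly referenced; the names below are illustrative, not the actual cudf-polars code:

```python
# Illustrative: a process-wide sequence id keyed on the underlying dataframe
# object, folded into DataFrameScan.get_hashable instead of id(self.df).
import itertools
from weakref import WeakKeyDictionary

_df_seq_ids: WeakKeyDictionary = WeakKeyDictionary()
_df_id_gen = itertools.count()


def _df_sequence_id(df) -> int:
    # Stable for a given object, and never reused for a different object,
    # even if CPython later recycles the same memory address.
    if df not in _df_seq_ids:
        _df_seq_ids[df] = next(_df_id_gen)
    return _df_seq_ids[df]


# Inside DataFrameScan.get_hashable (sketch):
#     return (type(self), schema_hash, _df_sequence_id(self.df), self.projection)
```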
> In this case, computing the hash scales with the size of this dataframe object.

This is correct, but I wonder how much it matters in practice? My hope was that this `DataFrameScan` object is only included when there's an actual, in-memory DataFrame in the IR. I assumed that only happens when a user builds one manually (like we do in some tests), but not when data is loaded through parquet, which will show up as a `Scan` node.

I will try to get some concrete performance numbers though, for us to make an informed decision.
> but I wonder how much it matters in practice?

It won't matter for medium/large data applications where people read from disk. However, it will probably show up often when people run queries on <10 GB of in-memory data to compare CPU-vs-GPU performance.
We discussed some options for avoiding these hash collisions. The simplest would be to throw some randomness into `DataFrameScan.get_hashable` (either directly, or by storing some random value, or perhaps a sequence counter, in `__init__`). Because `hash(node)` is cached, and (currently) we only use `get_hashable` via `hash`, this will fix the flaky test.
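For concreteness, a minimal sketch of that option, assuming the salt is drawn in `__init__`; the class and attribute names here are illustrative, not the actual implementation:

```python
# Illustrative only: cache a random salt per instance so hash(self) is stable
# for the lifetime of the object but never collides across instances.
import random


class DataFrameScanSketch:
    def __init__(self, schema, df, projection, config_options):
        self.schema = schema
        self.df = df
        self.projection = projection
        self.config_options = config_options
        self._salt = random.getrandbits(64)  # fixed at construction time

    def get_hashable(self):
        # schema_hash elided; the point is the per-instance salt replacing id(self.df)
        return (type(self), self._salt, self.projection)
```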
But what do we lose? Most directly, two `DataFrameScan` instances that happen to refer to the same `PyDataFrame` will no longer hash the same, as demonstrated in this snippet:
```python
import pylibcudf as plc
import polars as pl

from cudf_polars.dsl.ir import DataFrameScan
from cudf_polars.utils.config import ConfigOptions

df1 = pl.DataFrame({"a": [1, 2, 3, 4, 5]})
df2 = pl.DataFrame({"a": [0, 2, 3, 4, 5]})

schema = {
    "a": plc.types.DataType(plc.types.TypeId.INT64)
}

ir1 = DataFrameScan(
    schema=schema,
    df=df1._df,
    projection=None,
    config_options=ConfigOptions({}),
)
ir2 = DataFrameScan(
    schema=schema,
    df=df2._df,
    projection=None,
    config_options=ConfigOptions({}),
)

assert hash(ir1) == hash(ir1)  # stable, for a given instance
assert hash(ir1) != hash(ir2)  # unique

# this is the potential problem
ir1_clone = DataFrameScan(
    schema=schema,
    df=df1._df,
    projection=None,
    config_options=ConfigOptions({}),
)
assert hash(ir1) == hash(ir1_clone)  # Should this be the same?
```
On `main`, that last assert is `True`. Since `ir1` and `ir1_clone` have the same type, schema, projection, config options, and point to the same `PyDataFrame`, they have the same hash. With the proposal to include some randomness in `get_hashable`, no two `DataFrameScan` nodes would ever have the same hash (indeed, we could just return a random number from `DataFrameScan.get_hashable`), so that assertion will be `False`.

My question (which I'll start looking into now): what are the consequences of two instances of `DataFrameScan`, which happen to be the same (by pointing to the same `PyDataFrame`), having different hashes? Does this actually occur in practice? If it does occur, what do we lose by not hashing them to the same value?
`get_hashable` is used (and cached) via `hash`, which is used via `get_key_name` (sketched below, after this list), which is used in:

- `PartitionInfo.keys`
- `generate_ir_tasks[GroupBy]` (node and children)
- `generate_ir_tasks[Join]` (node, left, and right)
- `task_graph`
- `generate_ir_tasks[IR]`
- `generate_ir_tasks[Union]`
- `generate_ir_tasks[Projection, Cache, Filter, HStack, Select]`
- `generate_ir_tasks[Shuffle]`
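A minimal illustration of that relationship, under the assumption that key names are derived from the node type and its (cached) hash; the real cudf-polars helper may differ in detail:

```python
# Illustrative only: how a task-graph key name could be derived from an IR
# node's hash. A hash collision between two distinct nodes would then
# produce colliding task keys.
def get_key_name(node) -> str:
    return f"{type(node).__name__.lower()}-{hash(node)}"
```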
With the random hash, we'll have a behavior change if and only if we have multiple `DataFrameScan` instances that refer to the same `PyDataFrame` (previously they'd have the same hash; now they'll have different hashes). So where do we create those nodes?
- `_translate_ir.register[pl_ir.DataFrameScan]`
- `lower_ir_node[DataFrameScan]`: kind of... we make one per partition, each of which is a slice into the original `PyDataFrame`.
This doesn't really answer my question, but getting closer hopefully.
I added a `print` statement to the `translate_ir` for `DataFrameScan`, and made a simple example that I thought would trigger the behavior change: joining a DataFrame to itself. I expected polars to give us (at most) two IR nodes that refer to the same PyDataFrame. Instead, on `branch-25.06` I see that
```
In [1]: from cudf_polars.dsl.translate import Translator
   ...: import polars as pl
   ...:
   ...: df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
   ...: q = df.join(df, on="a")
   ...:
   ...: # Convert to our IR
   ...: ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
id(node.df)=140413893011904
id(node.df)=140414147777824

In [2]: a = ir.children[0].children[0].children[0]

In [3]: b = ir.children[1].children[0].children[0]

In [4]: a  # the memory address of the builtins.PyDataFrame object is what matters
Out[4]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7fb5a2602b50>}, <builtins.PyDataFrame object at 0x7fb4a836e9c0>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7fb5a24dd3c0>)

In [5]: b
Out[5]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7fb5a2602b50>}, <builtins.PyDataFrame object at 0x7fb4b7665520>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7fb5a24dd3c0>)
```
Note that the hashes would be different, because polars gives us two different `PyDataFrame` objects in the IR (this is the surprising bit).
On this branch at 516cd79, I see
```
In [1]: from cudf_polars.dsl.translate import Translator
   ...: import polars as pl
   ...:
   ...: df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
   ...: q = df.join(df, on="a")
   ...:
   ...: # Convert to our IR
   ...: ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
id(node.df)=139673974155632
id(node.df)=139673974160352

In [2]: a = ir.children[0].children[0].children[0]

In [3]: b = ir.children[1].children[0].children[0]

In [4]: a  # the memory address of the builtins.PyDataFrame object is what matters
Out[4]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7f095bc9ab50>}, <builtins.PyDataFrame object at 0x7f08619d2970>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7f086a5af580>)

In [5]: b
Out[5]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7f095bc9ab50>}, <builtins.PyDataFrame object at 0x7f08619d3be0>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7f086a5af580>)
```
So I'm tempted to say that, at least for this simple test, it doesn't really matter whether we actually hash the data, since polars isn't ever giving us multiple (polars) IR nodes that point to the same `PyDataFrame` (or it does but we cache something, in which case it still doesn't matter?).
Another simple test that polars seemingly isn't giving us the same `PyDataFrame` objects: concatenating the same dataframe.
```python
df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
q = pl.concat([df, df])
ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
```
If we override the translator to print out the `id` of `pl_ir.DataFrameScan.df`:
```python
from __future__ import annotations

import polars as pl
from polars.polars import _ir_nodes as pl_ir

import pylibcudf as plc

from cudf_polars.dsl import ir
from cudf_polars.dsl.translate import Translator, _translate_ir


@_translate_ir.register
def _(
    node: pl_ir.DataFrameScan, translator: Translator, schema: dict[str, plc.DataType]
) -> ir.IR:
    print(f"{node.df=}")
    return ir.DataFrameScan(
        schema,
        node.df,
        node.projection,
        translator.config_options,
    )
```
it prints out
```
node.df=<builtins.PyDataFrame object at 0x7faac0df8b20>
node.df=<builtins.PyDataFrame object at 0x7faac0df9cf0>
```
I've marked this as ready for review. The main outstanding question is whether it's a problem that `hash(ir1) == hash(ir2)` now returns `False` for two `DataFrameScan` nodes constructed from the same `PyDataFrame`. My limited testing in #18351 (comment) showed that this returning `False` might not be a problem. If anyone with a bit more knowledge of polars' IR and our translation is able to confirm that, then we're good to go.
I guess I have one more question on the implementation: if we include a random integer in …
@TomAugspurger - My overall impression is that this PR looks good as is. However, it seems like you have some remaining concerns/questions:

I think this is exactly the goal. We want an initialized …

If I understand correctly, you are suspicious that the current "fix" isn't actually changing anything at all, because we were already getting a different hash for the same data at translation time. Is that correct?
I'm happy to continue discussing the comments/questions in #18351 (comment)
That said, I also think this change makes a lot of sense if it fixes the flaky tests.
It's (almost) surely fixing the root cause of the flaky tests: us getting a hash collision for two different `DataFrameScan` nodes when CPython happens to reuse the same memory address for two different `PyDataFrame` objects.

In addition, it's also changing the behavior of

```python
node = pl_ir.DataFrameScan(...)

a = ir.DataFrameScan(
    schema,
    node.df,
    node.projection,
    config_options,
)
b = ir.DataFrameScan(
    schema,
    node.df,
    node.projection,
    config_options,
)

assert hash(a) == hash(b)  # previously true, now false
```

But AFAICT, we never see polars giving us the same `PyDataFrame` in two different IR nodes.
Oh, okay. I think I understand a bit better. My understanding is that it is absolutely fine for two `DataFrameScan` instances that point to the same `PyDataFrame` to hash differently.
cc @wence- (since you requested changes on an earlier draft)
/merge |
Deterministic hashing for DataFrameScan nodes in cudf-polars multi-partition executor (rapidsai#18351)

This updates how we hash DataFrameScan nodes to avoid hash collisions when creating, hashing, and destroying many `DataFrameScan` instances with the same schema but differing values. Rather than relying on the memory address of the polars PyDataFrame object, which might be reused, we include a random integer, which is cached for the lifetime of the instance.

There is a, hopefully non-breaking, behavior change. Previously, two `cudf_polars.dsl.ir.DataFrameScan` instances would hash equal when given the same `Schema`, `PyDataFrame`, `projection`, and `ConfigOptions`, where "same" for `PyDataFrame` means identity. Now, two instances will never hash the same.

This will fix the flaky tests seen in CI, where two instances were previously hashing the same if CPython happened to reuse the same piece of memory for two different instances.

Authors:
- Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
- Richard (Rick) Zamora (https://github.com/rjzamora)
- Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#18351
Deterministic hashing for DataFrameScan nodes in cudf-polars multi-partition executor (#18351) (#18420)

## Description

This is a backport of #18351 to fix flaky tests in the cudf.polars experimental module.

## Checklist

- [x] I am familiar with the [Contributing Guidelines](https://github.com/rapidsai/cudf/blob/HEAD/CONTRIBUTING.md).
- [x] New or existing tests cover these changes.
- [x] The documentation is up to date with these changes.

Co-authored-by: Tom Augspurger <toaugspurger@nvidia.com>
## Description

This updates how we hash DataFrameScan nodes to avoid hash collisions when creating, hashing, and destroying many `DataFrameScan` instances with the same schema but differing values. Rather than relying on the memory address of the polars PyDataFrame object, which might be reused, we include a random integer, which is cached for the lifetime of the instance.

There is a, hopefully non-breaking, behavior change. Previously, two `cudf_polars.dsl.ir.DataFrameScan` instances would hash equal when given the same `Schema`, `PyDataFrame`, `projection`, and `ConfigOptions`, where "same" for `PyDataFrame` means identity. Now, two instances will never hash the same.

This will fix the flaky tests seen in CI, where two instances were previously hashing the same if CPython happened to reuse the same piece of memory for two different instances.

## Checklist