Deterministic hashing for DataFrameScan nodes in cudf-polars multi-partition executor by TomAugspurger · Pull Request #18351 · rapidsai/cudf · GitHub

Deterministic hashing for DataFrameScan nodes in cudf-polars multi-partition executor #18351


Merged
5 commits merged into rapidsai:branch-25.06 from tom/dataframe-hash on Apr 2, 2025

Conversation

TomAugspurger
Contributor
@TomAugspurger TomAugspurger commented Mar 21, 2025

Description

This updates how we hash DataFrameScan nodes to avoid hash collisions when creating, hashing, and destroying many DataFrameScan instances with the same schema but differing values. Rather than relying on the memory address of the polars PyDataFrame object, which might be reused, we include a random integer, which is cached for the lifetime of the instance.

There is a, hopefully non-breaking, behavior change. Previously, two cudf_polars.dsl.ir.DataFrameScan instances would hash equal when given the same Schema, PyDataFrame, projection, and ConfigOptions, where "same" for PyDataFrame means identity. Now, two instances will never hash the same.

This will fix the flaky tests seen in CI, where two instances were previously hashing the same if CPython happened to reuse the same piece of memory for two different instances.
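Concretely, the change amounts to something like the sketch below (illustrative only; the attribute name _hash_token and the simplified class are stand-ins, not the exact implementation):

import random


class DataFrameScan:  # simplified stand-in for cudf_polars.dsl.ir.DataFrameScan
    def __init__(self, schema, df, projection, config_options):
        self.schema = schema
        self.df = df
        self.projection = projection
        self.config_options = config_options
        # Cached for the lifetime of this instance: hash(self) stays stable,
        # but never collides with another instance, even one wrapping the
        # same PyDataFrame or a reused memory address.
        self._hash_token = random.getrandbits(64)

    def get_hashable(self):
        schema_hash = tuple(self.schema.items())
        return (type(self), schema_hash, self._hash_token, self.projection)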

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

This updates how we hash DataFrameScan nodes to be deterministic.
Rather than relying on the memory address of the polars PyDataFrame object,
which might be reused, we hash the actual data in the PyDataFrame.
copy-pr-bot bot commented Mar 21, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@TomAugspurger
Contributor Author
TomAugspurger commented Mar 21, 2025

We've seen some flaky tests in cudf-polars with the dask-experimental executor (https://github.com/rapidsai/cudf/actions/runs/13957246743/job/39072610696). The failures are inconsistent, but seem to be in the join and groupby tests, and only with a distributed Cluster active. I could somewhat reliably reproduce it with

counter=1
while true; do
    echo "Counter: $counter"
    python -m pytest "tests/experimental/" --executor dask-experimental --dask-cluster --count 5 -x --pdb; 
    ((counter++))
done

That would usually fail within 2-3 attempts; sometimes it took more, but it always failed eventually.

My best guess is that this is a bug in our "deterministic" hashing for our IR nodes, which ended up not being so deterministic. Specifically DataFrameScan.get_hashable. For the multi-partition executor, we build a task graph whose keys include tokenized representations of the inputs. The expectation is that those hashes are stable (at least for as long as the task graph is alive) and unique. I'm worried that by using id(self.df), which gives the memory address of the Python object, we were getting "hash collisions" where

  1. We compute the hash for some (slice of) a polars DataFrame
  2. For whatever reason, that DataFrame went out of scope
  3. We make a new polars DataFrame, and CPython places it at the same memory address
  4. We accidentally place the new dataframe in place of the old one, since it has the same hash.

I might be wrong about some of the details, but I'm confident enough to conclude that if we fix the hashing to be genuinely deterministic, the failures should go away.

This PR has a new implementation that's based around converting the polars DataFrame to arrow RecordBatches and hashing those. We should end up with something that's unique across the lifetime of the process.
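For reference, a rough sketch of what such a helper could look like (illustrative only: it operates on a pl.DataFrame and hashes its Arrow IPC bytes, whereas the actual helper in the diff works with the wrapped PyDataFrame):

import hashlib

import polars as pl
import pyarrow as pa


def hash_polars_dataframe(df: pl.DataFrame) -> int:
    # Serialize the Arrow record batches to an IPC stream and hash the bytes,
    # so the result depends on the data rather than the object's address.
    table = df.to_arrow()
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, table.schema) as writer:
        for batch in table.to_batches():
            writer.write_batch(batch)
    digest = hashlib.sha256(sink.getvalue().to_pybytes()).digest()
    return int.from_bytes(digest[:8], "little")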

Before moving this out of draft, I want to do a bit of performance checks and let the test suite run for a while.

@github-actions github-actions bot added labels: Python (Affects Python cuDF API), cudf-polars (Issues specific to cudf-polars) on Mar 21, 2025
@@ -746,7 +747,7 @@ def get_hashable(self) -> Hashable:
return (
type(self),
schema_hash,
id(self.df),
hash_polars_dataframe(self.df),
Contributor

While this will be correct, I am worried that it can be arbitrarily bad for performance.

Specifically, all the rewrites need a fast hash of the object. In this case, computing the hash scales with the size of this dataframe object.

Contributor

I wonder if we could instead use a different approach for generating the key names by using sequence numbers.

That is, we have one sequence number that is incremented each time we generate a new task graph (so that different executions of the same thing don't overlap). And then a sequence number keyed on the object we're generating the key for.

Sketch:

from weakref import WeakKeyDictionary
import itertools

def key_name(obj: ir.IR, graph_id: int, id_gen: itertools.count, seq_ids: WeakKeyDictionary) -> str:
    obj_id = seq_ids.get(obj)
    if obj_id is None:
        obj_id = seq_ids.setdefault(obj, next(id_gen))
    return f"task-graph-{graph_id}-{type(obj).__name__}-{obj_id}"

In that case, even if we get hash collisions, we'll never reuse a sequence id for something that hashes the same, because two DataFrameScan objects that collide in their hash still don't compare equal.
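For illustration, wiring up that key_name sketch with a stand-in node class (hypothetical usage; it assumes the key_name definition above is in scope, and real code would pass ir.IR nodes and a per-graph id):

import itertools
from weakref import WeakKeyDictionary


class Node:  # stand-in for an ir.IR node
    pass


seq_ids: WeakKeyDictionary = WeakKeyDictionary()
id_gen = itertools.count()

a, b = Node(), Node()
# Stable for a given object within a graph...
assert key_name(a, 0, id_gen, seq_ids) == key_name(a, 0, id_gen, seq_ids)
# ...but distinct objects never share a key, even if their hashes collide.
assert key_name(a, 0, id_gen, seq_ids) != key_name(b, 0, id_gen, seq_ids)
# A new graph id keeps keys from different executions from overlapping.
assert key_name(a, 1, id_gen, seq_ids) != key_name(a, 0, id_gen, seq_ids)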

Member

Why not just add a new element to DataFrameScan.get_hashable that includes this kind of sequencing logic (e.g. seq_ids.get(self.df))? I'm pretty sure this will solve the hashing problem without slowing things down.

We currently call key_name multiple times for the same ir node, and we expect to get the same value every time. I don't think we want to plumb these arguments through all of those key_name calls until we are sure it's necessary.
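A rough sketch of that variant, reusing the WeakKeyDictionary idea from the comment above (it assumes the wrapped dataframe object accepts weak references, which would need to be checked for the pyo3-backed PyDataFrame):

import itertools
from weakref import WeakKeyDictionary

_df_id_gen = itertools.count()
_df_seq_ids: WeakKeyDictionary = WeakKeyDictionary()


def _df_sequence_id(df) -> int:
    # Hand out one monotonically increasing id per live dataframe object.
    # Unlike id(df), these ids are never reused within the process, so a
    # recycled memory address cannot produce a collision.
    try:
        return _df_seq_ids[df]
    except KeyError:
        return _df_seq_ids.setdefault(df, next(_df_id_gen))

# DataFrameScan.get_hashable would then include _df_sequence_id(self.df)
# in its returned tuple in place of id(self.df).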

Contributor Author
@TomAugspurger TomAugspurger Mar 24, 2025

In this case, computing the hash scales with the size of this dataframe object.

This is correct, but I wonder how much it matters in practice? My hope was that a DataFrameScan object only shows up when there's an actual, in-memory DataFrame in the IR. I assumed that only happens when a user builds one manually (like we do in some tests), but not when data is loaded from parquet, which shows up as a Scan node.

I will try to get some concrete performance numbers though, for us to make an informed decision.

Member
@rjzamora rjzamora Mar 24, 2025

but I wonder how much it matters in practice?

It won't matter for medium/large data applications where people read from disk. However, it will probably show up often when people do queries on <10GB of in-memory data to compare CPU-vs-GPU performance.

Contributor Author
@TomAugspurger TomAugspurger Mar 24, 2025

We discussed some options for avoiding these hash collisions. The simplest would be to throw some randomness into DataFrameScan.get_hashable (either directly, or by storing some random value or a sequence counter in __init__). Because hash(node) is cached, and (currently) we only use get_hashable via hash, this will fix the flaky test.

But what do we lose? Most directly, two DataFrameScan instances that happen to refer to the same PyDataFrame will no longer hash the same, as demonstrated in this snippet:

import pylibcudf as plc
import polars as pl
from cudf_polars.dsl.ir import DataFrameScan
from cudf_polars.utils.config import ConfigOptions

df1 = pl.DataFrame({"a": [1, 2, 3, 4, 5]})
df2 = pl.DataFrame({"a": [0, 2, 3, 4, 5]})
schema = {
    "a": plc.types.DataType(plc.types.TypeId.INT64)
}


ir1 = DataFrameScan(
    schema=schema,
    df=df1._df,
    projection=None,
    config_options=ConfigOptions({}),
)
ir2 = DataFrameScan(
    schema=schema,
    df=df2._df,
    projection=None,
    config_options=ConfigOptions({}),
)

assert hash(ir1) == hash(ir1)  # stable, for a given instance
assert hash(ir1) != hash(ir2)  # unique
# this is the potential problem
ir1_clone = DataFrameScan(
    schema=schema,
    df=df1._df,
    projection=None,
    config_options=ConfigOptions({}),
)

assert hash(ir1) == hash(ir1_clone)  # Should this be the same?

On main, that last assert passes. Since ir1 and ir1_clone have the same type, schema, projection, and config options, and point to the same PyDataFrame, they have the same hash. With the proposal to include some randomness in get_hashable, no two DataFrameScan nodes would ever have the same hash (indeed, we could just return a random number from DataFrameScan.get_hashable), so that assertion will fail.

My question (which I'll start looking into now): what are the consequences of two instances of DataFrameScan, which happen to be the same (by pointing to the same PyDataFrame), having different hashes? Does this actually occur in practice? If it does occur, what do we lose by not hashing them to the same value?

Contributor Author

get_hashable is used (and cached) via hash, which is used via
get_key_name, which is used in:

  • PartitionInfo.keys
  • generate_ir_tasks[GroupBy] (node and children)
  • generate_ir_tasks[Join] (node, left, and right)
  • task_graph
  • generate_ir_tasks[IR]
  • generate_ir_tasks[Union]
  • generate_ir_tasks[Projection,Cache,Filter,HStack,Select]
  • generate_ir_tasks[Shuffle]

With the random hash, we'll have a behavior change if and only if we have
multiple DataFrameScan instances that refer to the same PyDataFrame (previously they'd have the same hash; now they'll have different hashes). So where do we create those nodes?

  • _translate_ir.register[pl_ir.DataFrameScan]
  • lower_ir_node[DataFrameScan]
    • Kind of... we make one per partition, each of which is a slice into the
      original PyDataFrame.

This doesn't really answer my question, but getting closer hopefully.

Contributor Author
@TomAugspurger TomAugspurger Mar 24, 2025

I added a print statement to the translate_ir for DataFrameScan, and made a simple example that I thought would trigger the behavior change: joining a DataFrame to itself. I expected polars to give us (at most) two IR nodes that refer to the same PyDataFrame. Instead, on branch-25.06 I see that

In [1]: from cudf_polars.dsl.translate import Translator
   ...: import polars as pl
   ...: 
   ...: df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
   ...: q = df.join(df, on="a")
   ...: 
   ...: # Convert to our IR
   ...: ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
   ...: 
   ...: 
id(node.df)=140413893011904
id(node.df)=140414147777824

In [2]: a = ir.children[0].children[0].children[0]

In [3]: b = ir.children[1].children[0].children[0]

In [4]: a  # the memory address of the builtins.PyDataFrame object is what matters
Out[4]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7fb5a2602b50>}, <builtins.PyDataFrame object at 0x7fb4a836e9c0>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7fb5a24dd3c0>)

In [5]: b
Out[5]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7fb5a2602b50>}, <builtins.PyDataFrame object at 0x7fb4b7665520>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7fb5a24dd3c0>)

Note that the hashes would already be different, because polars gives us two different PyDataFrame objects in the IR (this is the surprising bit).

On this branch at 516cd79, I see

In [1]: from cudf_polars.dsl.translate import Translator
   ...: import polars as pl
   ...: 
   ...: df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
   ...: q = df.join(df, on="a")
   ...: 
   ...: # Convert to our IR
   ...: ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()
   ...: 
   ...: 
id(node.df)=139673974155632
id(node.df)=139673974160352

In [2]: a = ir.children[0].children[0].children[0]

In [3]: b = ir.children[1].children[0].children[0]

In [4]: a  # the memory address of the builtins.PyDataFrame object is what matters
Out[4]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7f095bc9ab50>}, <builtins.PyDataFrame object at 0x7f08619d2970>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7f086a5af580>)

In [5]: b
Out[5]: DataFrameScan({'a': <pylibcudf.types.DataType object at 0x7f095bc9ab50>}, <builtins.PyDataFrame object at 0x7f08619d3be0>, ('a',), <cudf_polars.utils.config.ConfigOptions object at 0x7f086a5af580>)

So I'm tempted to say that, at least for this simple test, it doesn't really matter whether we actually hash the data, since polars isn't ever giving us multiple (polars) IR nodes that point to the same PyDataFrame (or it does, but we cache something, in which case it still doesn't matter?).

Contributor Author

Another simple test showing that polars seemingly isn't giving us the same PyDataFrame objects: concatenating a dataframe with itself.

df = pl.LazyFrame({"a": [1, 2, 3, 4, 5]})
q = pl.concat([df, df])
ir = Translator(q._ldf.visit(), pl.GPUEngine()).translate_ir()

If we override the translator to print out the pl_ir.DataFrameScan.df object (whose repr includes its memory address):

from __future__ import annotations

import polars as pl
from polars.polars import _ir_nodes as pl_ir

import pylibcudf as plc

from cudf_polars.dsl import ir
from cudf_polars.dsl.translate import Translator, _translate_ir


@_translate_ir.register
def _(
    node: pl_ir.DataFrameScan, translator: Translator, schema: dict[str, plc.DataType]
) -> ir.IR:
    print(f"{node.df=}")
    return ir.DataFrameScan(
        schema,
        node.df,
        node.projection,
        translator.config_options,
    )

it prints out

node.df=<builtins.PyDataFrame object at 0x7faac0df8b20>
node.df=<builtins.PyDataFrame object at 0x7faac0df9cf0>

@wence-
Contributor
wence- commented Mar 24, 2025

question: What does this --count 5 argument do? My pytest complains it is an unrecognised arg.

@rjzamora
Member

@wence- - I believe he has pytest-repeat installed.

@TomAugspurger TomAugspurger added labels: bug (Something isn't working), non-breaking (Non-breaking change) on Mar 25, 2025
@TomAugspurger TomAugspurger marked this pull request as ready for review March 26, 2025 21:38
@TomAugspurger TomAugspurger requested a review from a team as a code owner March 26, 2025 21:38
@TomAugspurger
Contributor Author

I've marked this as ready for review. The main outstanding question is whether it's a problem that

>>> hash(ir1) == hash(ir2)
False

now returns False when we have two DataFrameScan instances that are the same (same schema, projection, config options, and point to the same PyDataFrame). Previously that would have returned True, and it also returned True when CPython reused a memory address for a different PyDataFrame, which was the problem.

My limited testing in #18351 (comment) showed that this returning False might not be a problem. If anyone with a bit more knowledge of polars' IR and our translation is able to confirm that, then we're good to go.

@TomAugspurger
Contributor Author

I guess I have one more question on the implementation: if we include a random integer in get_hashable, why bother hashing anything else (like the schema or projection)? Thanks to the presence of the random integer, they aren't going to hash to the same value anyway. (My argument in #18351 (comment) is that it doesn't matter either way, because these nodes weren't hashing the same previously, since polars wasn't ever giving us the same PyDataFrame objects.)

@rjzamora
Member

@TomAugspurger - My overall impression is that this PR looks good as is. However, it seems like you have some remaining concerns/questions:

  1. The main outstanding question is whether it's a problem that hash(ir1) == hash(ir2) returns False when we have two DataFrameScan instances that are the same (same schema, projection, config options, and point to the same PyDataFrame).

I think this is exactly the goal. We want an initialized DataFrameScan object to be hashable (i.e. hash(ir1) == hash(ir1)), but we want that hash to be unique (even if the underlying data is the same).

  2. So I'm tempted to say that, at least for this simple test, it doesn't really matter whether we actually hash the data since polars isn't ever giving us multiple (polars) IR nodes that point to the same PyDataFrame (or it does but we cache something; in which case it still doesn't matter?).

If I understand correctly, you suspect that the current "fix" isn't actually changing anything at all, because we were already getting a different hash for the same data at translation time. Is that correct?

  3. if we include a random integer in get_hashable, why bother hashing anything (like the schema or projection)?

For a DataFrameScan object, we don't really need to. However, it probably makes sense to follow the same convention as we do for all other IR objects, since we may revisit this problem with a different "fix" in the future.

Member
@rjzamora rjzamora left a comment

I'm happy to continue discussing the comments/questions in #18351 (comment)

That said, I also think this change makes a lot of sense if it fixes the flaky tests.

@TomAugspurger
Contributor Author

If I understand correctly, you suspect that the current "fix" isn't actually changing anything at all, because we were already getting a different hash for the same data at translation time. Is that correct?

It's (almost) surely fixing the root cause of the flaky tests: us getting a hash collision for two DataFrameScan nodes that are actually different.

In addition, it's also changing the behavior of hash(a) == hash(b) for an a and b that are truly the "same":

node = pl_ir.DataFrameScan(...)
a = ir.DataFrameScan(
    schema,
    node.df,
    node.projection,
    config_options,
)
b = ir.DataFrameScan(
    schema,
    node.df,
    node.projection,
    config_options,
)
assert hash(a) == hash(b)  # previously true, now false.

But AFAICT, we never see polars giving us the same node twice, even for something like pl.concat([df, df]). So this behavior change shouldn't be observed (and even if it were observed, I think it'd be OK, but I haven't confirmed that).

@rjzamora
Member

So this behavior change shouldn't be observed (and even if it was observed I think it'd be OK, but I haven't confirmed that).

Oh, okay. I think I understand a bit better. My understanding is that it is absolutely fine for hash(a) to not equal hash(b) unless a and b are literally the same object.

@rjzamora
Member

cc @wence- (Since you requested changes on an earlier draft)

@TomAugspurger
Contributor Author

/merge

@rapids-bot rapids-bot bot merged commit 3d4e71b into rapidsai:branch-25.06 Apr 2, 2025
112 checks passed
@TomAugspurger TomAugspurger deleted the tom/dataframe-hash branch April 2, 2025 16:49
bdice pushed a commit to bdice/cudf that referenced this pull request Apr 2, 2025
…rtition executor (rapidsai#18351)

This updates how we hash DataFrameScan nodes to avoid hash collisions when creating, hashing, and destroying many `DataFrameScan` instances with the same schema but differing values. Rather than relying on the memory address of the polars PyDataFrame object, which might be reused, we include a random integer, which is cached for the lifetime of the instance.

There is a, hopefully non-breaking, behavior change. Previously, two `cudf_polars.dsl.ir.DataFrameScan` instances would hash equal when given the same `Schema`, `PyDataFrame`, `projection`, and `ConfigOptions`, where "same" for `PyDataFrame` means identity. Now, two instances will never hash the same.

This will fix the flaky tests seen in CI, where two instances were previously hashing the same if CPython happened to reuse the same piece of memory for two different instances.

Authors:
  - Tom Augspurger (https://github.com/TomAugspurger)

Approvers:
  - Richard (Rick) Zamora (https://github.com/rjzamora)
  - Lawrence Mitchell (https://github.com/wence-)

URL: rapidsai#18351
AyodeAwe pushed a commit that referenced this pull request Apr 2, 2025
…s multi-partition executor (#18351) (#18420)

## Description
This is a backport of #18351 to fix
flaky tests in the cudf.polars experimental module.

## Checklist
- [x] I am familiar with the [Contributing
Guidelines](https://github.com/rapidsai/cudf/blob/HEAD/CONTRIBUTING.md).
- [x] New or existing tests cover these changes.
- [x] The documentation is up to date with these changes.

Co-authored-by: Tom Augspurger <toaugspurger@nvidia.com>
Labels: bug (Something isn't working), cudf-polars (Issues specific to cudf-polars), non-breaking (Non-breaking change), Python (Affects Python cuDF API)