[Data] - Predicate Pushdown - Push predicate exprs past eligible operators#58555
Conversation
…ators Signed-off-by: Goutam <[email protected]>
There was a problem hiding this comment.
Code Review
This pull request introduces a significant and well-designed refactoring of the predicate pushdown logic. By moving to a trait-based system with PredicatePushable and PredicatePushdownBehavior, the logic becomes much more extensible and maintainable, which is a great improvement. The changes are consistently applied across various operators, and the new tests are comprehensive and well-structured.
I've found one important issue in the new _clone_op_with_new_inputs helper that could lead to an inconsistent operator graph. I've also left a comment on a potentially confusing implementation in the new PredicatePushable interface.
Overall, this is excellent work that improves the foundation of the query optimizer.
python/ray/data/_internal/logical/interfaces/logical_operator.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
python/ray/data/_internal/logical/interfaces/logical_operator.py
Outdated
Show resolved
Hide resolved
| return op | ||
|
|
||
| input_op = op.input_dependencies[0] | ||
| predicate_expr = op._predicate_expr |
There was a problem hiding this comment.
The annotation for op is LogicalOperator, but I don't think LogicalOperator is guaranteed to have a _predicate_expr attribute. Is there a risk of attribute errors?
There was a problem hiding this comment.
Oh we check if it's an expression based Filter in the if statement above. I can make the typing here explicit to make it easier.
| return input_op.apply_predicate(predicate_expr) | ||
|
|
||
| # Case 2: Check if operator allows predicates to pass through | ||
| if isinstance(input_op, PredicatePassThrough): |
There was a problem hiding this comment.
Nit: LogicalOperatorSupportsPredicatePushdown and PredicatePassThrough appear to follow very different naming convention. Any reason not to use similar naming patterns (e.g., PredicatePushdown/PredicatePassThrough or LogicalOperatorSupportsPredicatePushdown/LogicalOperatorSupportsPredicatePassThrough)?
There was a problem hiding this comment.
I don't have a strong opinion on this one. I can change this to LogicalOperatorSupportsPredicatePassThrough to be consistent.
| if behavior == PredicatePushdownBehavior.PASSTHROUGH: | ||
| # Push filter through and recursively try to push further | ||
| new_filter = Filter( | ||
| input_op.input_dependencies[0], |
There was a problem hiding this comment.
(Here and below) Are we always guaranteed to have one input dependency in this case? If this is an assumption we're making, add assertion?
There was a problem hiding this comment.
Yes added an assertion
| branch₁ ─> Filter(predicate) ─┐ | ||
| branch₂ ─> Filter(predicate) ─┤ Union | ||
| branch₃ ─> Filter(predicate) ─┘ | ||
| def _push_filter_through_conditionally( |
There was a problem hiding this comment.
This _push_filter_through_conditionally/PredicatePushdownBehavior .CONDITIONAL abstractions confused me because they sounds like a generic abstractions, but the implementation is specific to joins.
Don't really have a good alternative suggestion, but thought it was worth calling out.
There was a problem hiding this comment.
For now it's specific to joins. But this could apply to Intersect (not yet in RD), Window operator etc.
| ) | ||
|
|
||
|
|
||
| class TestPredicatePushdownIntoRead: |
There was a problem hiding this comment.
Are there tests for the join case anywhere?
There was a problem hiding this comment.
Yes in test_join
| # Verify plan: all filters pushed into Read, passthrough ops remain | ||
| plan = _get_optimized_plan(ds) | ||
| assert "Filter[Filter" not in plan, f"No Filter operators should remain: {plan}" |
There was a problem hiding this comment.
Here and elsewhere -- would prefer to not test against the string representations because that's historically been pretty brittle.
If it'd make sense to do so, one option is to add a utility function to LogicalPlan to see it structurally matches another.
There was a problem hiding this comment.
I added some helper functions in test utils to replace the string assertions with class assertions
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
Signed-off-by: Goutam <[email protected]>
| ) | ||
|
|
||
|
|
||
| class TestPredicatePushdownIntoRead: |
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]>
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]> Signed-off-by: Aydin Abiar <[email protected]>
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]> Signed-off-by: YK <[email protected]>
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]>
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]> Signed-off-by: Future-Outlier <[email protected]>
…ators (ray-project#58555) ## Description Push `Filter` past Join (depends on the join op), `Filter` into Union branches, `Filter` past projections (accounting for renames), past all shuffle ops. ## Related issues > Link related issues: "Fixes ray-project#1234", "Closes ray-project#1234", or "Related to ray-project#1234". ## Additional information > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Goutam <[email protected]> Signed-off-by: peterxcli <[email protected]>
Description
Push
Filterpast Join (depends on the join op),Filterinto Union branches,Filterpast projections (accounting for renames), past all shuffle ops.Related issues
Additional information