
[Data] Fuse StreamingRepartition with MapBatches operators to scale collate #59108

Merged
raulchen merged 19 commits into ray-project:master from xinyuangui2:fuse-streaming-repartition
Dec 3, 2025

Conversation

@xinyuangui2 (Contributor) commented Dec 2, 2025

Why are these changes needed?

This PR adds operator fusion support for StreamingRepartition and MapBatches operators. When batch_size matches target_num_rows_per_block, these operators can be fused together to reduce scheduling overhead and improve performance.

What changes were included?

Fusion Rules:

  • MapBatches -> StreamingRepartition: Fuses when batch_size == target_num_rows_per_block

Both orders ensure that the map_batches function receives the correct number of rows.
For MapBatches -> StreamingRepartition, we also ensure each output block has batch_size rows.

Fusion Behavior:

  • Fused operators don't fuse further with surrounding map operators
  • Example: map -> s_r -> map -> map results in (map -> s_r) -> (map -> map)
  • Example: s_r -> map fuses into a single operator, which doesn't fuse further
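The rule and the no-further-fusion behavior above can be sketched as a small optimizer pass over simplified operator descriptors (`Op` and `try_fuse` are illustrative names, not Ray's actual API):

```python
from dataclasses import dataclass
from typing import List, Optional

# Hypothetical, simplified operator descriptors; Ray's real physical
# operators carry much more state than this.
@dataclass
class Op:
    kind: str                         # "map" or "s_r" (StreamingRepartition)
    batch_size: Optional[int] = None  # set on map ops
    target_rows: Optional[int] = None # set on s_r ops
    fused: bool = False               # fused ops don't fuse again

def try_fuse(ops: List[Op]) -> List[str]:
    """Fuse adjacent map/s_r pairs (in either order) when the map's
    batch_size equals the repartition's target_num_rows_per_block."""
    out: List[Op] = []
    for op in ops:
        prev = out[-1] if out else None
        if prev is not None and not prev.fused and not op.fused:
            if {prev.kind, op.kind} == {"map", "s_r"}:
                m = prev if prev.kind == "map" else op
                r = prev if prev.kind == "s_r" else op
                if m.batch_size == r.target_rows:
                    # Replace the pair with a fused op that refuses
                    # further fusion, matching the behavior above.
                    out[-1] = Op(kind=f"({prev.kind} -> {op.kind})", fused=True)
                    continue
        out.append(op)
    return [o.kind for o in out]
```

For example, `try_fuse([Op("map", batch_size=10), Op("s_r", target_rows=10), Op("map", batch_size=10), Op("map", batch_size=10)])` returns `["(map -> s_r)", "map", "map"]`; merging the remaining plain maps is left to the separate map-fusion rule.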

return (
    self._max_num_rows_per_block() is not None
    and block.num_rows()
    >= MAX_SAFE_ROWS_PER_BLOCK_FACTOR * self._max_num_rows_per_block()
)
@xinyuangui2 (Contributor Author) Dec 2, 2025:
For the case when batch_size == 10, blocks: [14], we want to get 2 result blocks: [10], [4] instead of [14].
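A minimal sketch of that splitting behavior (a standalone helper, not Ray's actual implementation): a 14-row block with target 10 should yield two blocks rather than pass through unchanged.

```python
def split_block(num_rows: int, target: int) -> list:
    """Split a block's row count into chunks of at most `target` rows,
    so a 14-row block with target 10 yields [10, 4] instead of [14]."""
    sizes = []
    while num_rows > 0:
        take = min(target, num_rows)
        sizes.append(take)
        num_rows -= take
    return sizes
```

For example, `split_block(14, 10)` returns `[10, 4]`, which is the behavior the check above is guarding.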

Contributor:

This makes sense, but why didn't the original streaming repartition PR catch this?

Contributor Author:

The unit tests didn't capture this edge case.

Signed-off-by: xgui <[email protected]>
@xinyuangui2 changed the title from "Fuse streaming repartition" to "[Data] Fuse StreamingRepartition with MapBatches operators to scale collate" Dec 2, 2025
@xinyuangui2 xinyuangui2 marked this pull request as ready for review December 2, 2025 18:50
@xinyuangui2 xinyuangui2 requested review from a team as code owners December 2, 2025 18:50
@xinyuangui2 xinyuangui2 requested a review from raulchen December 2, 2025 18:50
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Dec 2, 2025
@srinathk10 (Contributor) left a comment:

Minor comments. LGTM otherwise.

Signed-off-by: xgui <[email protected]>

def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
    self._op_map = plan.op_map.copy()
    # Firstly fuse StreamingRepartition with MapBatches.
    fused_dag = self._fuse_streaming_repartition_operators_in_dag(plan.dag)
Contributor:

We should fuse map ops first, so that multiple maps can potentially be fused with streaming repartition.

Contributor Author:

It's kinda tricky to do so, because map fusion returns a single MapOperator without a batch_size.

op = MapOperator.create(
    up_op.get_map_transformer().fuse(down_op.get_map_transformer()),
    input_op,
    up_op.data_context,
    target_max_block_size_override=target_max_block_size,
    name=name,
    compute_strategy=compute,
    min_rows_per_bundle=min_rows_per_bundled_input,
    map_task_kwargs=map_task_kwargs,
    ray_remote_args=ray_remote_args,
    ray_remote_args_fn=ray_remote_args_fn,
)

The batch_size information is hidden inside the transform_fn. That's why I put streaming_repartition fusion before map_fusion.
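The difficulty can be seen with a toy version of transform composition (hypothetical helpers, not Ray's actual `MapTransformer` API): once map fusion composes two transforms into one closure, the upstream batch_size lives only inside that closure, so a rule running afterwards has nothing to compare against target_num_rows_per_block.

```python
def make_map_transform(fn, batch_size):
    # batch_size is captured in this closure; nothing on the returned
    # callable exposes it to later optimizer rules.
    def transform(rows):
        out = []
        for i in range(0, len(rows), batch_size):
            out.extend(fn(rows[i:i + batch_size]))
        return out
    return transform

def fuse_transforms(upstream, downstream):
    # Map fusion: compose the two transforms into a single callable.
    def fused(rows):
        return downstream(upstream(rows))
    return fused

fused = fuse_transforms(
    make_map_transform(lambda batch: batch, 10),
    make_map_transform(lambda batch: batch, 10),
)
# The fused callable still works, but its batch_size is no longer
# inspectable, so checking batch_size == target_num_rows_per_block
# after map fusion is impossible.
```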

Contributor:

ok, can you add a comment explaining this?

Contributor:

nvm, saw that

Contributor:

@xinyuangui2 why don't we fuse this at logical operator level instead of physical?

# For now, we don't want to over-fuse StreamingRepartition with other map operators,
# so the result operator does not support further fusion.
supports_fusion=False,
)
Contributor:

Where do we handle the target number of rows per block?

Contributor Author:

This is handled by the transform_fn. As long as the streaming_repartition's transform_fn runs last, the target number of rows per block is guaranteed.
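That ordering guarantee can be sketched as follows (a simplified model with assumed names, not Ray's actual fused transform chain): because the repartition slicing step runs last, output block sizes stay bounded by the target no matter how many rows the map step emits.

```python
def apply_fused(rows, map_fn, target_rows):
    """Run the map step first, then re-slice its output into blocks of
    at most target_rows rows; the last step enforces the block size."""
    mapped = map_fn(rows)
    blocks = []
    for i in range(0, len(mapped), target_rows):
        blocks.append(mapped[i:i + target_rows])
    return blocks
```

For example, feeding 25 rows through an identity map with a target of 10 yields blocks of sizes 10, 10, and 5.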

@raulchen (Contributor) left a comment:

approved by mistake.

@xinyuangui2 xinyuangui2 requested a review from raulchen December 3, 2025 16:27
@raulchen raulchen enabled auto-merge (squash) December 3, 2025 22:19
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Dec 3, 2025
@raulchen raulchen merged commit 7df47e9 into ray-project:master Dec 3, 2025
8 checks passed
@xinyuangui2 (Contributor Author):

Resolved #58837

peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ollate (ray-project#59108)
