[Data] Fuse StreamingRepartition with MapBatches operators to scale collate#59108
Conversation
The GIL makes checking `self._serialize_cache is not None` atomic, so we don't need a lock. Signed-off-by: Xinyuan <[email protected]>
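A minimal sketch of the lock-free caching pattern the commit refers to (the class and cached payload here are hypothetical, not Ray's actual serializer): under CPython's GIL, reading a single attribute and comparing it to `None` cannot be interrupted mid-read, so the worst case is two threads computing the value redundantly, which is benign when both compute the same result.

```python
class SerializerCache:
    """Toy stand-in for the object discussed in the commit message."""

    def __init__(self):
        self._serialize_cache = None

    def get_serialized(self, value):
        # Under CPython's GIL, this attribute read-and-compare is atomic,
        # so no lock is needed for the fast path.
        if self._serialize_cache is not None:
            return self._serialize_cache
        # Worst case, two threads both compute and one assignment wins;
        # since both compute the same payload, the race is benign.
        self._serialize_cache = repr(value).encode()
        return self._serialize_cache


cache = SerializerCache()
first = cache.get_serialized({"a": 1})
second = cache.get_serialized({"a": 1})
assert first is second  # the second call hits the cache
```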
Signed-off-by: xgui <[email protected]>
    return (
        self._max_num_rows_per_block() is not None
        and block.num_rows()
        >= MAX_SAFE_ROWS_PER_BLOCK_FACTOR * self._max_num_rows_per_block()
    )
For the case when batch_size == 10 and the blocks are [14], we want two result blocks, [10] and [4], instead of [14].
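The desired splitting behavior can be sketched as follows; `split_block` is an illustrative helper, not Ray's implementation:

```python
def split_block(rows, target_num_rows_per_block):
    """Split an oversized block into chunks of at most `target` rows."""
    return [
        rows[i:i + target_num_rows_per_block]
        for i in range(0, len(rows), target_num_rows_per_block)
    ]


# A 14-row block with target 10 yields blocks of 10 and 4 rows,
# rather than passing the 14-row block through unchanged.
blocks = split_block(list(range(14)), 10)
assert [len(b) for b in blocks] == [10, 4]
```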
This makes sense, but why didn't the original streaming repartition PR catch this?
The unit tests didn't capture this edge case.
Signed-off-by: xgui <[email protected]>
srinathk10 left a comment
Minor comments. LGTM otherwise.
Signed-off-by: xgui <[email protected]>
    def apply(self, plan: PhysicalPlan) -> PhysicalPlan:
        self._op_map = plan.op_map.copy()
        # Firstly fuse StreamingRepartition with MapBatches.
        fused_dag = self._fuse_streaming_repartition_operators_in_dag(plan.dag)
We should fuse map ops first, so that multiple maps can potentially be fused with streaming repartition.
It's tricky to do so, because map fusion returns a single MapOperator without a batch_size:
ray/python/ray/data/_internal/logical/rules/operator_fusion.py
Lines 318 to 329 in 5328b32
The batch_size information is hidden inside the transform_fn. That's why I put streaming repartition fusion before map fusion.
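The ordering constraint described above can be sketched with a toy model (illustrative only, not Ray's operator classes): once two map ops are fused, their transform functions are composed into one closure, and any `batch_size` used internally is no longer visible on the fused operator, so there is nothing left to match against a StreamingRepartition's `target_num_rows_per_block`.

```python
def make_map_op(fn, batch_size=None):
    """Toy map operator: applies fn to rows, optionally in batches."""
    def transform_fn(rows):
        if batch_size is None:
            return fn(rows)
        out = []
        for i in range(0, len(rows), batch_size):
            out.extend(fn(rows[i:i + batch_size]))
        return out
    return {"transform_fn": transform_fn, "batch_size": batch_size}


def fuse(op1, op2):
    """Toy map fusion: composes the transforms, erasing batch_size."""
    f1, f2 = op1["transform_fn"], op2["transform_fn"]
    # The fused op exposes no batch_size; it is captured inside the
    # closure, which is why repartition fusion must run first.
    return {"transform_fn": lambda rows: f2(f1(rows)), "batch_size": None}


a = make_map_op(lambda rows: [r + 1 for r in rows], batch_size=10)
b = make_map_op(lambda rows: [r * 2 for r in rows])
fused = fuse(a, b)
assert fused["batch_size"] is None  # batch_size is no longer observable
assert fused["transform_fn"]([1, 2, 3]) == [4, 6, 8]
```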
ok, can you add a comment explaining this?
@xinyuangui2 why don't we fuse this at the logical operator level instead of the physical one?
    # For now, we don't want to over-fuse StreamingRepartition with other map operators,
    # so the result operator does not support further fusion.
    supports_fusion=False,
    )
When do we enforce the target block num rows?
This is handled by the transform_fn. As long as the streaming repartition's transform_fn is the last one in the chain, the target number of rows per block is guaranteed.
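The guarantee described above can be illustrated with a toy fused chain (hypothetical functions, not Ray's code): even if the upstream map changes the row count arbitrarily, applying the repartition transform last bounds every output block.

```python
def map_transform(rows):
    # Upstream map may change the number of rows arbitrarily;
    # here it doubles the row count.
    return [r for r in rows for _ in range(2)]


def repartition_transform(rows, target=10):
    # Re-slices its input into blocks of at most `target` rows.
    return [rows[i:i + target] for i in range(0, len(rows), target)]


def fused(rows):
    # Repartition is applied last, so its row-count contract holds
    # regardless of what the map emitted.
    return repartition_transform(map_transform(rows))


blocks = fused(list(range(7)))  # 14 rows in, split into blocks of 10 and 4
assert [len(b) for b in blocks] == [10, 4]
```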
Signed-off-by: xgui <[email protected]>
Resolved #58837
…ollate (ray-project#59108)

## Why are these changes needed?

This PR adds operator fusion support for the `StreamingRepartition` and `MapBatches` operators. When `batch_size` matches `target_num_rows_per_block`, these operators can be fused together to reduce scheduling overhead and improve performance.

## What changes were included?

**Fusion Rules:**

- `MapBatches -> StreamingRepartition`: fuses when `batch_size == target_num_rows_per_block`. Both orders ensure the map_batches function receives the correct number of rows. For `MapBatches -> StreamingRepartition`, we also ensure the number of output rows is `batch_size`.

**Fusion Behavior:**

- Fused operators don't fuse further with surrounding map operators.
- Example: `map -> s_r -> map -> map` results in `(map -> s_r) -> (map -> map)`.
- Example: in `s_r -> map -> (map -> s_r)`, fused operators don't fuse further.

---------

Signed-off-by: Xinyuan <[email protected]>
Signed-off-by: xgui <[email protected]>
Signed-off-by: peterxcli <[email protected]>
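The fusion rule and the no-further-fusion behavior described in the PR can be sketched as a toy planning pass (illustrative only, not Ray's actual rule classes; only the `map -> s_r` direction is shown):

```python
def can_fuse(map_batch_size, target_num_rows_per_block):
    """Fuse only when batch_size matches target_num_rows_per_block."""
    return (
        map_batch_size is not None
        and map_batch_size == target_num_rows_per_block
    )


def plan_fusion(ops):
    """Greedy pass over a list of ("map", batch) / ("s_r", target) ops.

    A fused ("map+s_r", n) operator is emitted as-is and never
    reconsidered, mirroring supports_fusion=False in the PR.
    """
    fused, i = [], 0
    while i < len(ops):
        if (
            i + 1 < len(ops)
            and ops[i][0] == "map"
            and ops[i + 1][0] == "s_r"
            and can_fuse(ops[i][1], ops[i + 1][1])
        ):
            fused.append(("map+s_r", ops[i][1]))  # does not fuse further
            i += 2
        else:
            fused.append(ops[i])
            i += 1
    return fused


# map -> s_r -> map -> map becomes (map -> s_r) -> map -> map; the two
# trailing maps are left for the ordinary map-fusion rule to combine.
plan = [("map", 10), ("s_r", 10), ("map", None), ("map", None)]
assert plan_fusion(plan) == [("map+s_r", 10), ("map", None), ("map", None)]
```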