[Data] Handle FS serialization issue in get_parquet_dataset (#57047)

bveeramani merged 7 commits into `master`
Conversation
Signed-off-by: Srinath Krishnamachari <[email protected]>
Code Review
This pull request addresses a filesystem serialization issue in get_parquet_dataset that occurs on worker nodes. The fix introduces a fallback mechanism: when a TypeError indicates a serialization problem, it attempts to re-resolve the filesystem directly on the worker. This is a clean and effective solution to the problem described. The changes are well-tested with a new unit test that cleverly simulates the filesystem error condition to validate the fallback path. I have one minor suggestion to improve the test's assertion to make it more specific.
```python
return get_parquet_dataset(paths, fs, kwargs)
```

```python
ds = ray.get(call_helper.remote([str(local_file)], problematic_fs, {}))
assert ds is not None
```
The test logic is great and effectively validates the fallback mechanism. To make the assertion slightly more robust, consider checking a property of the returned ParquetDataset object instead of just asserting it's not None. This would provide stronger confirmation that the dataset was created successfully. For example, you could check that it contains the expected number of file fragments.
```diff
- assert ds is not None
+ assert ds is not None and len(ds.fragments) == 1
```
Signed-off-by: Srinath Krishnamachari <[email protected]>
```python
    except TypeError:
        # Fallback: resolve filesystem locally in the worker
        try:
            resolved_paths, resolved_filesystem = _resolve_paths_and_filesystem(
                paths, filesystem=None
            )
            resolved_filesystem = RetryingPyFileSystem.wrap(
                resolved_filesystem,
                retryable_errors=DataContext.get_current().retried_io_errors,
            )
            dataset = pq.ParquetDataset(
                resolved_paths,
                **dataset_kwargs,
                filesystem=resolved_filesystem,
            )
        except OSError as os_e:
```
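The control flow of this fallback can be illustrated with a self-contained sketch. `BrokenFS`, `LocalFS`, and `get_dataset_with_fallback` are hypothetical stand-ins for the deserialized PyArrow filesystem, the worker-side re-resolved filesystem, and `get_parquet_dataset`; no Ray or PyArrow is required to run it:

```python
class BrokenFS:
    """Mimics a filesystem whose underlying C++ pointer can't be re-wrapped
    after crossing a process boundary (the 'Cannot wrap FileSystem pointer'
    failure mode)."""

    def create_dataset(self, paths):
        raise TypeError("Cannot wrap FileSystem pointer")


class LocalFS:
    """Mimics a filesystem re-resolved locally on the worker."""

    def create_dataset(self, paths):
        return {"paths": list(paths)}


def get_dataset_with_fallback(paths, fs, resolve_locally):
    """First try the filesystem shipped from the driver; on TypeError,
    re-resolve the filesystem on the worker and retry once."""
    try:
        return fs.create_dataset(paths)
    except TypeError:
        return resolve_locally().create_dataset(paths)


ds = get_dataset_with_fallback(["a.parquet"], BrokenFS(), LocalFS)
```

The key design point is that the fallback only fires on `TypeError`, so genuinely broken inputs (missing files, bad kwargs) still surface through the existing `OSError` handling.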
Ok, so after some research I think the reason it's failing is that PA 14.0 isn't handling PyFileSystems correctly, and your fix actually won't work since you're re-wrapping it before passing it to ParquetDataset.
This doesn't seem to be a problem in PA 21.0, though.
So I think the proper fix would be to unwrap our RetryingPyFileSystem when passing it to get_parquet_dataset for PA < 14.0.
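The version-gated unwrap the reviewer proposes could be sketched as follows. The helper names (`needs_unwrap`, `maybe_unwrap`, the `unwrap` accessor) are hypothetical, and the `< 14.0` cutoff is taken from the comment above, not verified against the Ray codebase:

```python
def needs_unwrap(pyarrow_version: str) -> bool:
    """True for PyArrow releases affected by the PyFileSystem wrapping issue,
    using the < 14.0 cutoff suggested in the review comment."""
    major = int(pyarrow_version.split(".")[0])
    return major < 14


def maybe_unwrap(fs, pyarrow_version: str):
    """On affected PyArrow versions, pass the underlying filesystem to
    ParquetDataset instead of the retrying wrapper. 'unwrap' here is a
    hypothetical accessor for the wrapped filesystem."""
    if needs_unwrap(pyarrow_version) and hasattr(fs, "unwrap"):
        return fs.unwrap()
    return fs
```

This keeps retry semantics on newer PyArrow while sidestepping the re-wrap failure on older releases.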
This is an intermittent failure, though, and calling _resolve_paths_and_filesystem locally on the worker upon TypeError handles the cases where the underlying PA filesystem is a C++ pointer.
Signed-off-by: Srinath Krishnamachari <[email protected]>
This pull request has been automatically marked as stale because it has not had recent activity. You can always ask for help on our discussion forum or Ray's public Slack channel. If you'd like to keep this open, just leave any comment, and the stale label will be removed.

This pull request has been automatically closed because there has been no more activity in the last 14 days. Please feel free to reopen or open a new pull request if you'd still like this to be addressed. Again, you can always ask for help on our discussion forum or Ray's public Slack channel. Thanks again for your contribution!
…ect#57047)

## Why are these changes needed?

### [Data] Handle FS serialization issue in get_parquet_dataset

### Issue

- In `read_parquet`, FS resolution is done by calling `_resolve_paths_and_filesystem` on the driver node.
- However, on the worker nodes, `get_parquet_dataset` may not be able to deserialize the FS.

```
2025-09-24 14:02:35,151 ERROR worker.py:421 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadFiles() (pid=10215, ip=10.242.32.217)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 532, in __call__
    for data in iter:
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 327, in __call__
    yield from self._batch_fn(input, ctx)
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/planner/plan_read_files_op.py", line 79, in read_paths
    yield from reader.read_paths(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 137, in read_paths
    fragments = self._create_fragments(paths, filesystem=filesystem)
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 193, in _create_fragments
    parquet_dataset = call_with_retry(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1400, in call_with_retry
    raise e from None
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1386, in call_with_retry
    return f()
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 194, in <lambda>
    lambda: get_parquet_dataset(paths, filesystem, self._dataset_kwargs),
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/datasource/parquet_datasource.py", line 628, in get_parquet_dataset
    dataset = pq.ParquetDataset(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 1793, in __new__
    return _ParquetDatasetV2(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 2498, in __init__
    filesystem=fragment.filesystem
  File "pyarrow/_dataset.pyx", line 1901, in pyarrow._dataset.FileFragment.filesystem.__get__
  File "pyarrow/_fs.pyx", line 500, in pyarrow._fs.FileSystem.wrap
TypeError: Cannot wrap FileSystem pointer
```

### Fix

Upon `TypeError`, invoke `_resolve_paths_and_filesystem` on the worker again.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run pre-commit jobs to lint the changes in this PR. ([pre-commit setup](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html#lint-and-formatting))
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---

> [!NOTE]
> Adds a worker-side filesystem resolution fallback in `get_parquet_dataset` to handle PyArrow FS serialization errors, with a new test validating the behavior.
>
> - **Parquet datasource**:
>   - `get_parquet_dataset`: On `TypeError`, re-resolves `paths`/filesystem via `_resolve_paths_and_filesystem` on the worker and wraps with `RetryingPyFileSystem` using `DataContext.retried_io_errors`; continues to handle `OSError` via `_handle_read_os_error`.
>   - Import `_resolve_paths_and_filesystem` for local resolution.
> - **Tests**:
>   - Add `test_get_parquet_dataset_fs_serialization_fallback` validating failure with a problematic fsspec-backed FS and success via the helper fallback (uses `PyFileSystem(FSSpecHandler(...))`).
>
> <sup>Written by [Cursor Bugbot](https://cursor.com/dashboard?tab=bugbot) for commit de68ddd.</sup>

Signed-off-by: Srinath Krishnamachari <[email protected]>