
[Data] Handle FS serialization issue in get_parquet_dataset#57047

Merged
bveeramani merged 7 commits into master from srinathk10/parquet_datasource_fs
Nov 19, 2025
Conversation


@srinathk10 srinathk10 commented Sep 30, 2025

Why are these changes needed?

[Data] Handle FS serialization issue in get_parquet_dataset

Issue

  • In read_parquet, FS resolution is done by calling _resolve_paths_and_filesystem on the driver node.
  • However, on the worker nodes, get_parquet_dataset may not be able to deserialize the FS.
2025-09-24 14:02:35,151 ERROR worker.py:421 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadFiles() (pid=10215, ip=10.242.32.217)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 532, in __call__
    for data in iter:
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 327, in __call__
    yield from self._batch_fn(input, ctx)
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/planner/plan_read_files_op.py", line 79, in read_paths
    yield from reader.read_paths(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 137, in read_paths
    fragments = self._create_fragments(paths, filesystem=filesystem)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 193, in _create_fragments
    parquet_dataset = call_with_retry(
                      ^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1400, in call_with_retry
    raise e from None
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1386, in call_with_retry
    return f()
           ^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 194, in <lambda>
    lambda: get_parquet_dataset(paths, filesystem, self._dataset_kwargs),
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/datasource/parquet_datasource.py", line 628, in get_parquet_dataset
    dataset = pq.ParquetDataset(
              ^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 1793, in __new__
    return _ParquetDatasetV2(
           ^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 2498, in __init__
    filesystem=fragment.filesystem
               ^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_dataset.pyx", line 1901, in pyarrow._dataset.FileFragment.filesystem.__get__
  File "pyarrow/_fs.pyx", line 500, in pyarrow._fs.FileSystem.wrap
TypeError: Cannot wrap FileSystem pointer
2025-09-24 14:02:35,151 ERROR worker.py:421 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadFiles() (pid=10215, ip=1

Fix

Upon TypeError, invoke _resolve_paths_and_filesystem on the worker again.

Related issue number

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run pre-commit jobs to lint the changes in this PR. (pre-commit setup)
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Note

Adds a worker-side filesystem resolution fallback in get_parquet_dataset to handle PyArrow FS serialization errors, with a new test validating the behavior.

  • Parquet datasource:
    • get_parquet_dataset: On TypeError, re-resolves paths/filesystem via _resolve_paths_and_filesystem on the worker and wraps with RetryingPyFileSystem using DataContext.retried_io_errors; continues to handle OSError via _handle_read_os_error.
    • Import _resolve_paths_and_filesystem for local resolution.
  • Tests:
    • Add test_get_parquet_dataset_fs_serialization_fallback validating failure with a problematic fsspec-backed FS and success via the helper fallback (uses PyFileSystem(FSSpecHandler(...))).

Written by Cursor Bugbot for commit de68ddd. This will update automatically on new commits. Configure here.

@srinathk10 srinathk10 requested a review from a team as a code owner September 30, 2025 18:58

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request addresses a filesystem serialization issue in get_parquet_dataset that occurs on worker nodes. The fix introduces a fallback mechanism: when a TypeError indicates a serialization problem, it attempts to re-resolve the filesystem directly on the worker. This is a clean and effective solution to the problem described. The changes are well-tested with a new unit test that cleverly simulates the filesystem error condition to validate the fallback path. I have one minor suggestion to improve the test's assertion to make it more specific.

return get_parquet_dataset(paths, fs, kwargs)

ds = ray.get(call_helper.remote([str(local_file)], problematic_fs, {}))
assert ds is not None

medium

The test logic is great and effectively validates the fallback mechanism. To make the assertion slightly more robust, consider checking a property of the returned ParquetDataset object instead of just asserting it's not None. This would provide stronger confirmation that the dataset was created successfully. For example, you could check that it contains the expected number of file fragments.

Suggested change
assert ds is not None
assert ds is not None and len(ds.fragments) == 1

@srinathk10 srinathk10 added the go add ONLY when ready to merge, run all tests label Sep 30, 2025
@ray-gardener ray-gardener bot added the data Ray Data-related issues label Sep 30, 2025
cursor[bot]

This comment was marked as outdated.


@goutamvenkat-anyscale goutamvenkat-anyscale left a comment


ty

Signed-off-by: Srinath Krishnamachari <[email protected]>
Comment on lines +781 to +796
except TypeError:
    # Fallback: resolve filesystem locally in the worker
    try:
        resolved_paths, resolved_filesystem = _resolve_paths_and_filesystem(
            paths, filesystem=None
        )
        resolved_filesystem = RetryingPyFileSystem.wrap(
            resolved_filesystem,
            retryable_errors=DataContext.get_current().retried_io_errors,
        )
        dataset = pq.ParquetDataset(
            resolved_paths,
            **dataset_kwargs,
            filesystem=resolved_filesystem,
        )
    except OSError as os_e:

Ok, so after some research I think the reason it's failing is that PA 14.0 isn't handling PyFileSystems correctly, and your fix actually won't work, since you're rewrapping the filesystem before passing it to ParquetDataset.

This doesn't seem to be a problem in PA 21.0, though.

So I think the proper fix would be to unwrap our RetryingPyFileSystem when passing it to get_parquet_dataset for PA < 14.0.

Contributor Author


This is an intermittent failure though, and calling _resolve_paths_and_filesystem locally on the worker for TypeError handles the cases where the underlying PyArrow filesystem is a C++ pointer.

Signed-off-by: Srinath Krishnamachari <[email protected]>
cursor[bot]

This comment was marked as outdated.

@github-actions

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Oct 24, 2025
@github-actions

github-actions bot commented Nov 8, 2025

This pull request has been automatically closed because there has been no more activity in the 14 days
since being marked stale.

Please feel free to reopen or open a new pull request if you'd still like this to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for your contribution!

@github-actions github-actions bot closed this Nov 8, 2025
@srinathk10 srinathk10 reopened this Nov 18, 2025
@bveeramani bveeramani enabled auto-merge (squash) November 18, 2025 20:23
@github-actions github-actions bot disabled auto-merge November 18, 2025 21:59
@srinathk10 srinathk10 removed the stale The issue is stale. It will be closed within 7 days unless there are further conversation label Nov 18, 2025
@bveeramani bveeramani merged commit 6a9aba9 into master Nov 19, 2025
6 checks passed
@bveeramani bveeramani deleted the srinathk10/parquet_datasource_fs branch November 19, 2025 20:15
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…ect#57047)

<!-- Thank you for your contribution! Please review
https://github.com/ray-project/ray/blob/master/CONTRIBUTING.rst before
opening a pull request. -->

<!-- Please add a reviewer to the assignee section when you create a PR.
If you don't have the access to it, we will shortly find a reviewer and
assign them to your PR. -->

## Why are these changes needed?

<!-- Please give a short summary of the change and the problem this
solves. -->

### [Data] Handle FS serialization issue in get_parquet_dataset
### Issue

- In `read_parquet`, FS resolution is done by calling `_resolve_paths_and_filesystem` on the driver node.
- However, on the worker nodes, `get_parquet_dataset` may not be able to deserialize the FS.

```
2025-09-24 14:02:35,151 ERROR worker.py:421 -- Unhandled error (suppress with 'RAY_IGNORE_UNHANDLED_ERRORS=1'): ray::ReadFiles() (pid=10215, ip=10.242.32.217)
    for b_out in map_transformer.apply_transform(iter(blocks), ctx):
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 532, in __call__
    for data in iter:
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/execution/operators/map_transformer.py", line 327, in __call__
    yield from self._batch_fn(input, ctx)
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/planner/plan_read_files_op.py", line 79, in read_paths
    yield from reader.read_paths(
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 137, in read_paths
    fragments = self._create_fragments(paths, filesystem=filesystem)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 193, in _create_fragments
    parquet_dataset = call_with_retry(
                      ^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1400, in call_with_retry
    raise e from None
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/util.py", line 1386, in call_with_retry
    return f()
           ^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/anyscale/data/_internal/readers/parquet_reader.py", line 194, in <lambda>
    lambda: get_parquet_dataset(paths, filesystem, self._dataset_kwargs),
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_ray_cp311_cp311_manylinux_2_17_x86_64_d3f8d8c8/site-packages/ray/data/_internal/datasource/parquet_datasource.py", line 628, in get_parquet_dataset
    dataset = pq.ParquetDataset(
              ^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 1793, in __new__
    return _ParquetDatasetV2(
           ^^^^^^^^^^^^^^^^^^
  File "/models/ray_pipelines/container/container_bin.runfiles/rules_python~~pip~python_deps_311_pyarrow_cp311_cp311_manylinux_2_17_x86_64_06ff1264/site-packages/pyarrow/parquet/core.py", line 2498, in __init__
    filesystem=fragment.filesystem
               ^^^^^^^^^^^^^^^^^^^
  File "pyarrow/_dataset.pyx", line 1901, in pyarrow._dataset.FileFragment.filesystem.__get__
  File "pyarrow/_fs.pyx", line 500, in pyarrow._fs.FileSystem.wrap
TypeError: Cannot wrap FileSystem pointer

```

### Fix

On `TypeError`, invoke `_resolve_paths_and_filesystem` again on the worker to re-resolve the paths and filesystem locally.


---------

Signed-off-by: Srinath Krishnamachari <[email protected]>
Signed-off-by: peterxcli <[email protected]>
Labels: data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)