
[Data] Cherry-pick: Add enhanced support for Unity Catalog (#57954)#58049

Merged
aslonnie merged 1 commit into releases/2.51.0 from pg-cherry-pick-brach
Oct 23, 2025
Conversation

@gvspraveen
Contributor

Description

This PR adds support for reading Unity Catalog Delta tables in Ray Data with automatic credential vending. This enables secure, temporary access to Delta Lake tables stored in Databricks Unity Catalog without requiring users to manage cloud credentials manually.

What's Added

  • ray.data.read_unity_catalog() - Updated public API for reading Unity Catalog Delta tables
  • UnityCatalogConnector - Handles Unity Catalog REST API integration and credential vending
  • Multi-cloud support - Works with AWS S3, Azure Data Lake Storage, and Google Cloud Storage
  • Automatic credential management - Obtains temporary, least-privilege credentials via Unity Catalog API
  • Delta Lake integration - Properly configures PyArrow filesystem for Delta tables with session tokens

Key Features

✅ Production-ready credential vending API - Uses stable, public Unity Catalog APIs
✅ Secure by default - Temporary credentials with automatic cleanup
✅ Multi-cloud - AWS (S3), Azure (Blob Storage), and GCP (Cloud Storage)
✅ Delta Lake optimized - Handles session tokens and PyArrow filesystem configuration
✅ Comprehensive error handling - Helpful messages for common issues (deletion vectors, permissions, etc.)
✅ Full logging support - Debug and info logging throughout

Usage Example

import ray

# Read a Unity Catalog Delta table
ds = ray.data.read_unity_catalog(
    table="main.sales.transactions",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    region="us-west-2"  # Optional, for AWS
)

# Use standard Ray Data operations
ds = ds.filter(lambda row: row["amount"] > 100)
ds.show(5)

Implementation Notes

This is a simplified, focused implementation that:

  • Supports Unity Catalog tables only (no volumes - that's in private preview)
  • Assumes Delta Lake format (most common Unity Catalog use case)
  • Uses production-ready APIs only (no private preview features)
  • Provides ~600 lines of clean, reviewable code

The full implementation with volumes and multi-format support is available in the data_uc_volumes branch and can be added in a future PR once this foundation is reviewed.

Testing

  • ✅ All ruff lint checks pass
  • ✅ Code formatted per Ray standards
  • ✅ Tested with real Unity Catalog Delta tables on AWS S3
  • ✅ Proper PyArrow filesystem configuration verified
  • ✅ Credential vending flow validated

Related issues

Related to Unity Catalog and Delta Lake support requests in Ray Data.

Additional information

Architecture

The implementation follows the connector pattern rather than a Datasource subclass because Unity Catalog is a metadata/credential layer, not a data format. The connector:

  1. Fetches table metadata from Unity Catalog REST API
  2. Obtains temporary credentials via credential vending API
  3. Configures cloud-specific environment variables
  4. Delegates to ray.data.read_delta() with proper filesystem configuration
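
The four-step flow above can be sketched as follows. This is an illustrative sketch only: the class and helper names are hypothetical, not Ray's actual implementation, though the REST paths follow Unity Catalog's public API.

```python
import json
import urllib.request


class UnityCatalogConnectorSketch:
    """Hypothetical sketch of the connector flow; not Ray's implementation."""

    def __init__(self, base_url, token, table):
        self.base_url = base_url.rstrip("/")
        self.headers = {"Authorization": f"Bearer {token}"}
        self.table = table

    def _call(self, path, payload=None):
        # Minimal REST helper: GET for metadata, POST for credential vending.
        req = urllib.request.Request(
            f"{self.base_url}{path}",
            headers={**self.headers, "Content-Type": "application/json"},
            data=json.dumps(payload).encode() if payload else None,
            method="POST" if payload else "GET",
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    def read(self):
        # 1. Fetch table metadata (storage location, table id).
        info = self._call(f"/api/2.1/unity-catalog/tables/{self.table}")
        # 2. Obtain temporary, read-scoped credentials (credential vending).
        creds = self._call(
            "/api/2.1/unity-catalog/temporary-table-credentials",
            {"table_id": info["table_id"], "operation": "READ"},
        )
        # 3. Configure cloud-specific auth from `creds` (env vars or a
        #    PyArrow filesystem), then 4. delegate to ray.data.read_delta().
        raise NotImplementedError("steps 3-4 depend on the cloud provider")
```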

Delta Lake Special Handling

Delta Lake on AWS requires explicit PyArrow S3FileSystem configuration with session tokens (environment variables alone are insufficient). This implementation correctly creates and passes the filesystem object to the deltalake library.
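
Concretely, the AWS path boils down to mapping the vended credentials onto PyArrow's `S3FileSystem` constructor, which accepts a `session_token`. A minimal sketch, assuming a vending response whose field names look like the following (the exact response shape should be checked against the API):

```python
def s3_filesystem_kwargs(creds, region=None):
    """Map a credential-vending response onto pyarrow.fs.S3FileSystem kwargs.

    The `aws_temp_credentials` field names here are illustrative assumptions
    about the vending response, not a documented contract.
    """
    aws = creds["aws_temp_credentials"]
    kwargs = {
        "access_key": aws["access_key_id"],
        "secret_key": aws["secret_access_key"],
        # The session token is what env-var-only configuration tends to drop;
        # passing it explicitly is the point of building the filesystem object.
        "session_token": aws["session_token"],
    }
    if region:
        kwargs["region"] = region
    return kwargs


# Usage (requires pyarrow and ray):
#   import pyarrow.fs as pafs
#   fs = pafs.S3FileSystem(**s3_filesystem_kwargs(creds, region="us-west-2"))
#   ds = ray.data.read_delta(table_url, filesystem=fs)
```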

Cloud Provider Support

| Provider | Credential Type | Implementation |
|----------|-----------------|----------------|
| AWS S3 | Temporary IAM credentials | PyArrow S3FileSystem with session token |
| Azure Blob | SAS tokens | Environment variable (AZURE_STORAGE_SAS_TOKEN) |
| GCP Cloud Storage | OAuth tokens / service account | Environment variables (GCP_OAUTH_TOKEN, GOOGLE_APPLICATION_CREDENTIALS) |
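
The per-provider wiring in the table above reduces to something like the following sketch. The credential key names (`sas_token`, `oauth_token`, `service_account_file`) are hypothetical placeholders for whatever the vending response actually returns:

```python
def credential_env_vars(provider, creds):
    """Sketch of per-cloud environment variable selection; key names are
    illustrative placeholders, not the actual vending-response schema."""
    if provider == "azure":
        return {"AZURE_STORAGE_SAS_TOKEN": creds["sas_token"]}
    if provider == "gcp":
        env = {}
        if "oauth_token" in creds:
            env["GCP_OAUTH_TOKEN"] = creds["oauth_token"]
        if "service_account_file" in creds:
            env["GOOGLE_APPLICATION_CREDENTIALS"] = creds["service_account_file"]
        return env
    if provider == "aws":
        # AWS is handled via an explicit PyArrow S3FileSystem rather than
        # environment variables, since env vars alone drop the session token.
        return {}
    raise ValueError(f"Unknown provider: {provider}")
```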

Error Handling

Comprehensive error messages for common issues:

  • Deletion Vectors: Guidance on upgrading deltalake library or disabling the feature
  • Column Mapping: Compatibility information and solutions
  • Permissions: Clear list of required Unity Catalog permissions
  • Credential issues: Detailed troubleshooting steps
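
As an illustration of the pattern (the matched substrings and wording below are invented for this sketch, not the exact messages Ray emits):

```python
def actionable_message(err):
    """Translate common read errors into actionable guidance.

    Hypothetical sketch: the substring matching and advice text are
    illustrative, not Ray's actual error-handling code.
    """
    text = str(err)
    if "deletion vector" in text.lower():
        return (
            "Table uses deletion vectors. Upgrade the 'deltalake' package "
            "or disable deletion vectors on the table."
        )
    if "column mapping" in text.lower():
        return (
            "Table uses column mapping; upgrade 'deltalake' to a version "
            "that supports it."
        )
    if "403" in text or "PERMISSION_DENIED" in text:
        return (
            "Check Unity Catalog grants: USE CATALOG, USE SCHEMA, and "
            "SELECT on the table, plus access to credential vending."
        )
    # Fall through: surface the original error text unchanged.
    return text
```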

Future Enhancements

Potential follow-up PRs:

  • Unity Catalog volumes support (when out of private preview)
  • Multi-format support (Parquet, CSV, JSON, images, etc.)
  • Custom datasource integration
  • Advanced Delta Lake features (time travel, partition filters)

Dependencies

  • Requires deltalake package for Delta Lake support
  • Uses standard Ray Data APIs (read_delta, read_datasource)
  • Integrates with existing PyArrow filesystem infrastructure

Documentation

  • Full docstrings with examples
  • Type hints throughout
  • Inline comments with references to external documentation
  • Comprehensive error messages with actionable guidance


Signed-off-by: soffer-anyscale <[email protected]>
@gvspraveen gvspraveen requested a review from a team as a code owner October 23, 2025 17:13
@gvspraveen gvspraveen changed the title [Data] Add enhanced support for Unity Catalog (#57954) [Data] Cherry-pick: Add enhanced support for Unity Catalog (#57954) Oct 23, 2025
@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces enhanced support for reading from Databricks Unity Catalog, which is a great addition to Ray Data. The implementation is well-structured, using a UnityCatalogConnector to handle the logic of fetching metadata and vending credentials. The special handling for AWS credentials with Delta Lake is correctly implemented.

I've found a significant issue related to credential propagation for Azure and GCP when Ray is already initialized, which would cause reads to fail in a common scenario. I've provided detailed comments and code suggestions to address this.

Additionally, I've made a suggestion to improve the robustness of temporary file cleanup for GCP credentials in distributed environments.

Overall, this is a solid contribution with good error handling and clear documentation. Once the credential propagation issue is resolved, this will be a very valuable feature.

Comment on lines +140 to +145

    def _read_delta_with_credentials(self):
        """Read Delta table with proper PyArrow filesystem for session tokens."""
        import pyarrow.fs as pafs

        creds = self._creds_response
        reader_kwargs = self.reader_kwargs.copy()
Severity: high

To support the fix for credential propagation, this method needs to accept reader_kwargs as an argument and use it, instead of relying on self.reader_kwargs.

Suggested change:

    -def _read_delta_with_credentials(self):
    +def _read_delta_with_credentials(self, reader_kwargs: dict):
         """Read Delta table with proper PyArrow filesystem for session tokens."""
         import pyarrow.fs as pafs

         creds = self._creds_response
    -    reader_kwargs = self.reader_kwargs.copy()
    +    reader_kwargs = reader_kwargs.copy()

Comment on lines 186 to +195

     if not ray.is_initialized():
         ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs)

    -url = self._table_url
    -ds = reader(url, **self.reader_kwargs)
    -return ds
    +# Use special Delta reader for proper filesystem handling
    +if data_format == "delta":
    +    return self._read_delta_with_credentials()
    +
    +# Use standard reader for other formats
    +reader = self._get_ray_reader(data_format)
    +return reader(self._table_url, **self.reader_kwargs)
Severity: high

There's an issue with credential propagation for Azure and GCP when Ray is already initialized. The current implementation sets environment variables for credentials and then configures self._runtime_env. This runtime_env is only used if ray.init() is called within this method. If Ray is already running, ray.init() is skipped, and the runtime_env with credentials is not passed to the Ray Data read tasks, causing reads to fail on remote workers for Azure and GCP. For AWS, this is not an issue because a filesystem object is explicitly created and passed.

To fix this, we should add the runtime_env to the ray_remote_args of the underlying read call if Ray is already initialized. This ensures credentials are available on worker processes. This change also requires updating _read_delta_with_credentials to accept the modified reader_kwargs, which I've suggested in a separate comment.

Suggested change:

    +reader_kwargs = self.reader_kwargs.copy()
     if not ray.is_initialized():
         ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs)
    +else:
    +    # If Ray is already initialized, we need to pass the credentials
    +    # via runtime_env to the read tasks.
    +    ray_remote_args = reader_kwargs.get("ray_remote_args", {}).copy()
    +    ray_remote_args["runtime_env"] = self._runtime_env
    +    reader_kwargs["ray_remote_args"] = ray_remote_args

     # Use special Delta reader for proper filesystem handling
     if data_format == "delta":
    -    return self._read_delta_with_credentials()
    +    return self._read_delta_with_credentials(reader_kwargs)

     # Use standard reader for other formats
     reader = self._get_ray_reader(data_format)
    -return reader(self._table_url, **self.reader_kwargs)
    +return reader(self._table_url, **reader_kwargs)

    temp_file.close()
    env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
    self._gcp_temp_file = temp_file.name
    atexit.register(self._cleanup_gcp_temp_file, temp_file.name)
Severity: medium

Using atexit for cleanup is a good step, but it has limitations in a distributed environment. For instance, it won't trigger if the process is killed with SIGKILL, or if this code runs on a worker actor that fails. This could lead to temporary credential files being left on worker nodes.

For more robust cleanup in a distributed setting, consider leveraging Ray's actor lifecycle. For example, you could manage the temporary file within a dedicated actor and perform cleanup in its __del__ method or a custom shutdown hook.

Since these credentials are temporary, this isn't a critical security risk, but it's a good practice to ensure resource cleanup is as robust as possible.
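
For reference, the temp-file-plus-atexit pattern under discussion looks roughly like this (the helper name is hypothetical, and as the review notes, `atexit` will not run on SIGKILL, so cleanup is best-effort only):

```python
import atexit
import json
import os
import tempfile


def write_gcp_key_file(service_account_info):
    """Write a vended GCP service-account key to a temp file and register
    best-effort cleanup. Hypothetical helper name; illustrates the pattern
    the review comments discuss, not Ray's actual code."""
    temp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
    json.dump(service_account_info, temp)
    temp.close()

    def _cleanup(path=temp.name):
        try:
            os.unlink(path)
        except FileNotFoundError:
            pass  # Already removed; nothing to do.

    # Best-effort: runs on normal interpreter exit, not on SIGKILL.
    atexit.register(_cleanup)
    return temp.name
```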

Member

@bveeramani bveeramani left a comment


Stamp

@aslonnie aslonnie merged commit 94f3589 into releases/2.51.0 Oct 23, 2025
3 of 5 checks passed
@aslonnie aslonnie deleted the pg-cherry-pick-brach branch October 23, 2025 17:22
weiquanlee pushed a commit to antgroup/ant-ray that referenced this pull request Dec 11, 2025
…ct#57954) (ray-project#58049)

Signed-off-by: soffer-anyscale <[email protected]>
Co-authored-by: soffer-anyscale <[email protected]>