# [Data] Cherry-pick: Add enhanced support for Unity Catalog (#57954) #58049
aslonnie merged 1 commit into releases/2.51.0
## Description
This PR adds support for reading Unity Catalog Delta tables in Ray Data
with automatic credential vending. This enables secure, temporary access
to Delta Lake tables stored in Databricks Unity Catalog without
requiring users to manage cloud credentials manually.
### What's Added
- **`ray.data.read_unity_catalog()`** - Updated public API for reading
Unity Catalog Delta tables
- **`UnityCatalogConnector`** - Handles Unity Catalog REST API
integration and credential vending
- **Multi-cloud support** - Works with AWS S3, Azure Data Lake Storage,
and Google Cloud Storage
- **Automatic credential management** - Obtains temporary,
least-privilege credentials via Unity Catalog API
- **Delta Lake integration** - Properly configures PyArrow filesystem
for Delta tables with session tokens
### Key Features
✅ **Production-ready credential vending API** - Uses stable, public
Unity Catalog APIs
✅ **Secure by default** - Temporary credentials with automatic cleanup
✅ **Multi-cloud** - AWS (S3), Azure (Blob Storage), and GCP (Cloud
Storage)
✅ **Delta Lake optimized** - Handles session tokens and PyArrow
filesystem configuration
✅ **Comprehensive error handling** - Helpful messages for common issues
(deletion vectors, permissions, etc.)
✅ **Full logging support** - Debug and info logging throughout
### Usage Example
```python
import ray

# Read a Unity Catalog Delta table
ds = ray.data.read_unity_catalog(
    table="main.sales.transactions",
    url="https://dbc-XXXXXXX-XXXX.cloud.databricks.com",
    token="dapi...",
    region="us-west-2",  # Optional, for AWS
)

# Use standard Ray Data operations
ds = ds.filter(lambda row: row["amount"] > 100)
ds.show(5)
```
### Implementation Notes
This is a **simplified, focused implementation** that:
- Supports **Unity Catalog tables only** (no volumes - that's in private
preview)
- Assumes **Delta Lake format** (most common Unity Catalog use case)
- Uses **production-ready APIs** only (no private preview features)
- Provides ~600 lines of clean, reviewable code
The full implementation with volumes and multi-format support is
available in the `data_uc_volumes` branch and can be added in a future
PR once this foundation is reviewed.
### Testing
- ✅ All ruff lint checks pass
- ✅ Code formatted per Ray standards
- ✅ Tested with real Unity Catalog Delta tables on AWS S3
- ✅ Proper PyArrow filesystem configuration verified
- ✅ Credential vending flow validated
## Related issues
Related to Unity Catalog and Delta Lake support requests in Ray Data.
## Additional information
### Architecture
The implementation follows the **connector pattern** rather than a
`Datasource` subclass because Unity Catalog is a metadata/credential
layer, not a data format. The connector:
1. Fetches table metadata from Unity Catalog REST API
2. Obtains temporary credentials via credential vending API
3. Configures cloud-specific environment variables
4. Delegates to `ray.data.read_delta()` with proper filesystem
configuration
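The four steps above can be sketched as a small connector class. This is an illustrative sketch only, not Ray's actual implementation: the class name, helper methods, and response field handling are hypothetical, though the `/api/2.1/unity-catalog/tables/{full_name}` path follows the public Unity Catalog REST API shape.

```python
import json
import os
import urllib.request


class UnityCatalogConnectorSketch:
    """Hypothetical sketch of the metadata/credential connector flow."""

    def __init__(self, base_url: str, token: str, table: str):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.table = table  # fully qualified: "catalog.schema.table"

    def table_metadata_path(self) -> str:
        # Step 1: table metadata (storage location, format, table_id)
        # lives behind the Unity Catalog tables endpoint.
        return f"/api/2.1/unity-catalog/tables/{self.table}"

    def _get(self, path: str) -> dict:
        # Shared authenticated REST call used by steps 1 and 2.
        req = urllib.request.Request(
            f"{self.base_url}{path}",
            headers={"Authorization": f"Bearer {self.token}"},
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)

    def apply_env(self, vended_creds: dict) -> None:
        # Step 3: export cloud-specific env vars from the vended,
        # temporary credentials before the read is delegated (step 4).
        for key, value in vended_creds.items():
            os.environ[key] = value
```

The actual connector additionally dispatches to `ray.data.read_delta()` with an explicit filesystem object for AWS, as described in the next section.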
### Delta Lake Special Handling
Delta Lake on AWS requires explicit PyArrow S3FileSystem configuration
with session tokens (environment variables alone are insufficient). This
implementation correctly creates and passes the filesystem object to the
`deltalake` library.
### Cloud Provider Support
| Provider | Credential Type | Implementation |
|----------|-----------------|----------------|
| AWS S3 | Temporary IAM credentials | PyArrow S3FileSystem with session token |
| Azure Blob | SAS tokens | Environment variables (`AZURE_STORAGE_SAS_TOKEN`) |
| GCP Cloud Storage | OAuth tokens / Service account | Environment variables (`GCP_OAUTH_TOKEN`, `GOOGLE_APPLICATION_CREDENTIALS`) |
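The per-provider dispatch in the table can be condensed into a mapping function. The environment variable names follow the table above; the helper itself and the credential-dict keys are hypothetical, not the PR's actual code.

```python
def credential_env_vars(provider: str, creds: dict) -> dict:
    """Map vended credentials to provider-specific env vars (sketch)."""
    if provider == "azure":
        return {"AZURE_STORAGE_SAS_TOKEN": creds["sas_token"]}
    if provider == "gcp":
        if "oauth_token" in creds:
            return {"GCP_OAUTH_TOKEN": creds["oauth_token"]}
        return {"GOOGLE_APPLICATION_CREDENTIALS": creds["service_account_file"]}
    if provider == "aws":
        # AWS goes through an explicit PyArrow filesystem instead of env vars,
        # because the session token must reach the Delta reader directly.
        return {}
    raise ValueError(f"Unsupported provider: {provider}")
```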
### Error Handling
Comprehensive error messages for common issues:
- **Deletion Vectors**: Guidance on upgrading deltalake library or
disabling the feature
- **Column Mapping**: Compatibility information and solutions
- **Permissions**: Clear list of required Unity Catalog permissions
- **Credential issues**: Detailed troubleshooting steps
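The kind of error translation described above could look like the following sketch. The matched substrings and the advice text are illustrative, not the exact messages Ray emits.

```python
def diagnose_delta_error(exc: Exception) -> str:
    """Turn a raw Delta read failure into actionable guidance (sketch)."""
    msg = str(exc).lower()
    if "deletion vector" in msg:
        return (
            "Table uses deletion vectors. Upgrade the deltalake package, "
            "or disable deletion vectors on the table."
        )
    if "column mapping" in msg:
        return "Table uses column mapping; check deltalake version compatibility."
    if "permission" in msg or "403" in msg:
        return "Check Unity Catalog grants: USE CATALOG, USE SCHEMA, SELECT."
    return f"Unhandled Delta read error: {exc}"
```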
### Future Enhancements
Potential follow-up PRs:
- Unity Catalog volumes support (when out of private preview)
- Multi-format support (Parquet, CSV, JSON, images, etc.)
- Custom datasource integration
- Advanced Delta Lake features (time travel, partition filters)
### Dependencies
- Requires `deltalake` package for Delta Lake support
- Uses standard Ray Data APIs (`read_delta`, `read_datasource`)
- Integrates with existing PyArrow filesystem infrastructure
### Documentation
- Full docstrings with examples
- Type hints throughout
- Inline comments with references to external documentation
- Comprehensive error messages with actionable guidance
---------
Signed-off-by: soffer-anyscale <[email protected]>
Code Review
This pull request introduces enhanced support for reading from Databricks Unity Catalog, which is a great addition to Ray Data. The implementation is well-structured, using a UnityCatalogConnector to handle the logic of fetching metadata and vending credentials. The special handling for AWS credentials with Delta Lake is correctly implemented.
I've found a significant issue related to credential propagation for Azure and GCP when Ray is already initialized, which would cause reads to fail in a common scenario. I've provided detailed comments and code suggestions to address this.
Additionally, I've made a suggestion to improve the robustness of temporary file cleanup for GCP credentials in distributed environments.
Overall, this is a solid contribution with good error handling and clear documentation. Once the credential propagation issue is resolved, this will be a very valuable feature.
```python
def _read_delta_with_credentials(self):
    """Read Delta table with proper PyArrow filesystem for session tokens."""
    import pyarrow.fs as pafs

    creds = self._creds_response
    reader_kwargs = self.reader_kwargs.copy()
```
To support the fix for credential propagation, this method needs to accept reader_kwargs as an argument and use it, instead of relying on self.reader_kwargs.
Suggested change:
```diff
-def _read_delta_with_credentials(self):
+def _read_delta_with_credentials(self, reader_kwargs: dict):
     """Read Delta table with proper PyArrow filesystem for session tokens."""
     import pyarrow.fs as pafs

     creds = self._creds_response
-    reader_kwargs = self.reader_kwargs.copy()
+    reader_kwargs = reader_kwargs.copy()
```
```python
if not ray.is_initialized():
    ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs)

url = self._table_url
ds = reader(url, **self.reader_kwargs)
return ds

# Use special Delta reader for proper filesystem handling
if data_format == "delta":
    return self._read_delta_with_credentials()

# Use standard reader for other formats
reader = self._get_ray_reader(data_format)
return reader(self._table_url, **self.reader_kwargs)
```
There's an issue with credential propagation for Azure and GCP when Ray is already initialized. The current implementation sets environment variables for credentials and then configures self._runtime_env. This runtime_env is only used if ray.init() is called within this method. If Ray is already running, ray.init() is skipped, and the runtime_env with credentials is not passed to the Ray Data read tasks, causing reads to fail on remote workers for Azure and GCP. For AWS, this is not an issue because a filesystem object is explicitly created and passed.
To fix this, we should add the runtime_env to the ray_remote_args of the underlying read call if Ray is already initialized. This ensures credentials are available on worker processes. This change also requires updating _read_delta_with_credentials to accept the modified reader_kwargs, which I've suggested in a separate comment.
Suggested change:
```diff
+reader_kwargs = self.reader_kwargs.copy()
 if not ray.is_initialized():
     ray.init(runtime_env=self._runtime_env, **self.ray_init_kwargs)
-url = self._table_url
-ds = reader(url, **self.reader_kwargs)
-return ds
+else:
+    # If Ray is already initialized, we need to pass the credentials
+    # via runtime_env to the read tasks.
+    ray_remote_args = reader_kwargs.get("ray_remote_args", {}).copy()
+    ray_remote_args["runtime_env"] = self._runtime_env
+    reader_kwargs["ray_remote_args"] = ray_remote_args
 # Use special Delta reader for proper filesystem handling
 if data_format == "delta":
-    return self._read_delta_with_credentials()
+    return self._read_delta_with_credentials(reader_kwargs)
 # Use standard reader for other formats
 reader = self._get_ray_reader(data_format)
-return reader(self._table_url, **self.reader_kwargs)
+return reader(self._table_url, **reader_kwargs)
```
```python
temp_file.close()
env_vars["GOOGLE_APPLICATION_CREDENTIALS"] = temp_file.name
self._gcp_temp_file = temp_file.name
atexit.register(self._cleanup_gcp_temp_file, temp_file.name)
```
Using atexit for cleanup is a good step, but it has limitations in a distributed environment. For instance, it won't trigger if the process is killed with SIGKILL, or if this code runs on a worker actor that fails. This could lead to temporary credential files being left on worker nodes.
For more robust cleanup in a distributed setting, consider leveraging Ray's actor lifecycle. For example, you could manage the temporary file within a dedicated actor and perform cleanup in its __del__ method or a custom shutdown hook.
Since these credentials are temporary, this isn't a critical security risk, but it's a good practice to ensure resource cleanup is as robust as possible.
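A stdlib sketch of the lifecycle-scoped cleanup idea: tie the temp file's lifetime to an object so removal happens on scope exit or finalization rather than only at interpreter exit. In Ray, this logic could live on a dedicated actor (cleanup in its `__del__` or a shutdown hook); the class below is illustrative, not the PR's code.

```python
import os
import tempfile


class ScopedCredentialFile:
    """Writes credentials to a temp file and guarantees best-effort removal."""

    def __init__(self, contents: str):
        fd, self.path = tempfile.mkstemp(suffix=".json")
        with os.fdopen(fd, "w") as f:
            f.write(contents)

    def cleanup(self) -> None:
        if self.path and os.path.exists(self.path):
            os.remove(self.path)
        self.path = None

    def __enter__(self):
        return self

    def __exit__(self, *exc):
        # Deterministic cleanup on normal exit or exception.
        self.cleanup()

    def __del__(self):
        # Last-resort cleanup if neither cleanup() nor __exit__ ran.
        try:
            self.cleanup()
        except Exception:
            pass
```

Unlike `atexit`, this still will not survive SIGKILL, but it scopes the file's lifetime to the unit of work instead of the whole process.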