feat: session-first DataFusion integration + session resolution policies#4145
Conversation
Signed-off-by: Ethan Urbanski <[email protected]>
Signed-off-by: Ethan Urbanski <[email protected]>
Signed-off-by: Ethan Urbanski <[email protected]>
Signed-off-by: Ethan Urbanski <[email protected]>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## main #4145 +/- ##
==========================================
+ Coverage 76.24% 76.35% +0.11%
==========================================
Files 164 165 +1
Lines 45807 46151 +344
Branches 45807 46151 +344
==========================================
+ Hits 34924 35239 +315
- Misses 9210 9234 +24
- Partials 1673 1678 +5 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
I like the idea behind this, very cool. @roeap weren't we going to eventually do something with the datafusion session such that we might not need to do this much resolution on it? |
roeap
left a comment
There was a problem hiding this comment.
Just flushing some initial comments.
Really looking forward to getting our sessions straight, and this is a big step!
While writing #4138 I started wondering about something more fundamental. I.e. where should we depend on Session and where can we use SessionState and SessionContext?
IF we expose an API that can consume DataFrame, then we will also always have SessionState in our dependencies.
Folks who implement their own session would probably(?) not go through the top level api, but rather build plans using the lower level stuff like TableProvider.
If we end up in a place where our builders can produce logical plans they are then free to execute that using their session and we should not.
To be clear, we should still push put SessionState from the operations!
Just some thoughts that came up :).
| operation_id: Option<Uuid>, | ||
| ) -> DeltaResult<()> { | ||
| let url = log_store.root_url().as_object_store_url(); | ||
| if self.runtime_env().object_store(&url).is_err() { |
There was a problem hiding this comment.
You may be able to drop the ?Sized constraint by ...
| if self.runtime_env().object_store(&url).is_err() { | |
| if T::runtime_env(self).object_store(&url).is_err() { |
There was a problem hiding this comment.
Extension needs to work on &dyn Session (e.g. DeltaTable::update_datafusion_session), impl must support unsized receivers. UFCS is fine but doesn't remove the ?Sized requirement unless we add impl DeltaSessionExt for dyn DataFusionSession or change the shim to take concrete SessionState/SessionContext.
There was a problem hiding this comment.
let's just keep it like this :)
Signed-off-by: Ethan Urbanski <[email protected]>
Signed-off-by: Ethan Urbanski <[email protected]>
roeap
left a comment
There was a problem hiding this comment.
Coming along really nicely!
Just a few minor comments to maybe look at.
| let object_store_url = log_store.object_store_url(); | ||
| if self.runtime_env().object_store(&object_store_url).is_err() { | ||
| self.runtime_env() | ||
| .register_object_store(object_store_url.as_ref(), log_store.object_store(None)); |
There was a problem hiding this comment.
As you mention, the object_store method returns a pre-fixed object store that points to a path inside the root store and we create this cryptic URL. However this will never work properly if we have delta logs with fully qualified URLs (e.g. shallow clones). So I personally consider this a deprecated behaviour.
Not sure how to best do this, but I believe we should either directly deprecate this method (mark it as migration helper) or keep register_store around and deprecate that?
There was a problem hiding this comment.
only non-migrated operations should rely on this behavior. The new table provider does not make use of these internal / special.
Not sure if this already landed, but in my more experimental code, we are building the log store from the session (actually TaskContext) and pass it in our code. deep in the log handling, a scoped store is still convenient. so i.e.
session + table__url = log_store
| /// internal defaults. To make this strict (error instead), set | ||
| /// `with_session_fallback_policy(SessionFallbackPolicy::RequireSessionState)`. | ||
| /// | ||
| /// Example: `Arc::new(create_session().state())`. |
There was a problem hiding this comment.
seeing this made me wonder if create_session should just return the DF session? Do we even invoke any method from our session wrapper ever? I feel like we almost always immediately convert this to a DF session?
Signed-off-by: Ethan Urbanski <[email protected]>
Signed-off-by: Ethan Urbanski <[email protected]>
Description
Make delta-rs' DataFusion integration consistently honor caller provided sessions and introduce a session first API for registering Delta object stores.
Changes:
Session resolution:
resolve_session_state(...)&SessionFallbackPolicy(InternalDefaults/DeriveFromTrait/RequireSessionState)with_session_fallback_policy(...)to control strictnessSession first registration:
DeltaSessionExttrait:ensure_object_store_registered(...)ensure_log_store_registered(...)DeltaTable::update_datafusion_session(...)(shim kept for compatibility)Predicate parsing:
SessionStatesessions can preserve UDFs when configured viaDeriveFromTraitCompatibility:
SessionFallbackPolicy::InternalDefaultswarns but doesn't breakwith_session_fallback_policy(RequireSessionState)errors instead of falling backDeltaTable::update_datafusion_sessionremains but is deprecatedTests:
RequireSessionStatepath)deltalake-coreDataFusion test suite passes with--features datafusionRelated Issue(s)
Addresses:
Documentation
Follow ups:
RequireSessionState(breaking change)DeltaTable::update_datafusion_sessionafter deprecation window