diff --git a/crates/common/src/lock.rs b/crates/common/src/lock.rs index 8317184027d..74679bd9d8f 100644 --- a/crates/common/src/lock.rs +++ b/crates/common/src/lock.rs @@ -61,8 +61,9 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, /// Reset a `PyMutex` to its initial (unlocked) state after `fork()`. /// /// After `fork()`, locks held by dead parent threads would deadlock in the -/// child. This zeroes the raw lock bytes directly, bypassing the normal unlock -/// path which may interact with parking_lot's internal waiter queues. +/// child. This writes `RawMutex::INIT` via the `Mutex::raw()` accessor, +/// bypassing the normal unlock path which may interact with parking_lot's +/// internal waiter queues. /// /// # Safety /// @@ -70,11 +71,28 @@ pub type PyMappedRwLockWriteGuard<'a, T> = MappedRwLockWriteGuard<'a, RawRwLock, /// after `fork()`, before any other thread is created. #[cfg(unix)] pub unsafe fn reinit_mutex_after_fork(mutex: &PyMutex) { - // lock_api::Mutex layout: raw R at offset 0, then UnsafeCell. - // Zeroing R resets to unlocked for both parking_lot::RawMutex (AtomicU8) - // and RawCellMutex (Cell). + // Use Mutex::raw() to access the underlying lock without layout assumptions. + // parking_lot::RawMutex (AtomicU8) and RawCellMutex (Cell) both + // represent the unlocked state as all-zero bytes. unsafe { - let ptr = mutex as *const PyMutex as *mut u8; - core::ptr::write_bytes(ptr, 0, core::mem::size_of::()); + let raw = mutex.raw() as *const RawMutex as *mut u8; + core::ptr::write_bytes(raw, 0, core::mem::size_of::()); + } +} + +/// Reset a `PyRwLock` to its initial (unlocked) state after `fork()`. +/// +/// Same rationale as [`reinit_mutex_after_fork`] — dead threads' read or +/// write locks would cause permanent deadlock in the child. +/// +/// # Safety +/// +/// Must only be called from the single-threaded child process immediately +/// after `fork()`, before any other thread is created. +#[cfg(unix)] +pub unsafe fn reinit_rwlock_after_fork(rwlock: &PyRwLock) { + unsafe { + let raw = rwlock.raw() as *const RawRwLock as *mut u8; + core::ptr::write_bytes(raw, 0, core::mem::size_of::()); } } diff --git a/crates/vm/src/codecs.rs b/crates/vm/src/codecs.rs index 94b4b67e33c..87c3a4a0a9d 100644 --- a/crates/vm/src/codecs.rs +++ b/crates/vm/src/codecs.rs @@ -153,29 +153,14 @@ impl ToPyObject for PyCodec { } impl CodecsRegistry { - /// Force-unlock the inner RwLock after fork in the child process. + /// Reset the inner RwLock to unlocked state after fork(). /// /// # Safety /// Must only be called after fork() in the child process when no other - /// threads exist. The calling thread must NOT hold this lock. - #[cfg(all(unix, feature = "host_env"))] - pub(crate) unsafe fn force_unlock_after_fork(&self) { - if self.inner.try_write().is_some() { - return; - } - let is_shared = self.inner.try_read().is_some(); - if is_shared { - loop { - // SAFETY: Lock is shared-locked by dead thread(s). - unsafe { self.inner.force_unlock_read() }; - if self.inner.try_write().is_some() { - return; - } - } - } else { - // SAFETY: Lock is exclusively locked by a dead thread. - unsafe { self.inner.force_unlock_write() }; - } + /// threads exist. + #[cfg(all(unix, feature = "threading"))] + pub(crate) unsafe fn reinit_after_fork(&self) { + unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) }; } pub(crate) fn new(ctx: &Context) -> Self { diff --git a/crates/vm/src/gc_state.rs b/crates/vm/src/gc_state.rs index 4b6f3f6bd0b..a54efb424c7 100644 --- a/crates/vm/src/gc_state.rs +++ b/crates/vm/src/gc_state.rs @@ -85,16 +85,14 @@ impl GcGeneration { guard.uncollectable += uncollectable; } - /// Force-unlock the stats mutex after fork() in the child process. + /// Reset the stats mutex to unlocked state after fork(). /// /// # Safety /// Must only be called after fork() in the child process when no other /// threads exist. - #[cfg(unix)] - unsafe fn force_unlock_stats_after_fork(&self) { - if self.stats.try_lock().is_none() { - unsafe { self.stats.force_unlock() }; - } + #[cfg(all(unix, feature = "threading"))] + unsafe fn reinit_stats_after_fork(&self) { + unsafe { crate::common::lock::reinit_mutex_after_fork(&self.stats) }; } } @@ -793,65 +791,35 @@ impl GcState { /// Force-unlock all locks after fork() in the child process. /// + /// Reset all locks to unlocked state after fork(). + /// /// After fork(), only the forking thread survives. Any lock held by another - /// thread is permanently stuck. This method releases all such stuck locks. + /// thread is permanently stuck. This resets them by zeroing the raw bytes. /// /// # Safety /// Must only be called after fork() in the child process when no other /// threads exist. The calling thread must NOT hold any of these locks. - #[cfg(unix)] - pub unsafe fn force_unlock_after_fork(&self) { - // Force-unlock the collecting mutex - if self.collecting.try_lock().is_none() { - unsafe { self.collecting.force_unlock() }; - } + #[cfg(all(unix, feature = "threading"))] + pub unsafe fn reinit_after_fork(&self) { + use crate::common::lock::{reinit_mutex_after_fork, reinit_rwlock_after_fork}; - // Force-unlock garbage and callbacks mutexes - if self.garbage.try_lock().is_none() { - unsafe { self.garbage.force_unlock() }; - } - if self.callbacks.try_lock().is_none() { - unsafe { self.callbacks.force_unlock() }; - } + unsafe { + reinit_mutex_after_fork(&self.collecting); + reinit_mutex_after_fork(&self.garbage); + reinit_mutex_after_fork(&self.callbacks); - // Force-unlock generation stats mutexes - for generation in &self.generations { - unsafe { generation.force_unlock_stats_after_fork() }; - } - unsafe { self.permanent.force_unlock_stats_after_fork() }; - - // Force-unlock RwLocks - for rw in &self.generation_objects { - unsafe { force_unlock_rwlock_after_fork(rw) }; - } - unsafe { force_unlock_rwlock_after_fork(&self.permanent_objects) }; - unsafe { force_unlock_rwlock_after_fork(&self.tracked_objects) }; - unsafe { force_unlock_rwlock_after_fork(&self.finalized_objects) }; - } -} + for generation in &self.generations { + generation.reinit_stats_after_fork(); + } + self.permanent.reinit_stats_after_fork(); -/// Force-unlock a PyRwLock after fork() in the child process. -/// -/// # Safety -/// Must only be called after fork() in the child process when no other -/// threads exist. The calling thread must NOT hold this lock. -#[cfg(unix)] -unsafe fn force_unlock_rwlock_after_fork(lock: &PyRwLock) { - if lock.try_write().is_some() { - return; - } - let is_shared = lock.try_read().is_some(); - if is_shared { - loop { - // SAFETY: Lock is shared-locked by dead thread(s). - unsafe { lock.force_unlock_read() }; - if lock.try_write().is_some() { - return; + for rw in &self.generation_objects { + reinit_rwlock_after_fork(rw); } + reinit_rwlock_after_fork(&self.permanent_objects); + reinit_rwlock_after_fork(&self.tracked_objects); + reinit_rwlock_after_fork(&self.finalized_objects); } - } else { - // SAFETY: Lock is exclusively locked by a dead thread. - unsafe { lock.force_unlock_write() }; } } diff --git a/crates/vm/src/intern.rs b/crates/vm/src/intern.rs index afc30e7ac04..37b971b8dca 100644 --- a/crates/vm/src/intern.rs +++ b/crates/vm/src/intern.rs @@ -31,30 +31,14 @@ impl Clone for StringPool { } impl StringPool { - /// Force-unlock the inner RwLock after fork in the child process. + /// Reset the inner RwLock to unlocked state after fork(). /// /// # Safety /// Must only be called after fork() in the child process when no other - /// threads exist. The calling thread must NOT hold this lock. - #[cfg(all(unix, feature = "host_env"))] - pub(crate) unsafe fn force_unlock_after_fork(&self) { - if self.inner.try_write().is_some() { - return; - } - // Lock is stuck from a thread that no longer exists. - let is_shared = self.inner.try_read().is_some(); - if is_shared { - loop { - // SAFETY: Lock is shared-locked by dead thread(s). - unsafe { self.inner.force_unlock_read() }; - if self.inner.try_write().is_some() { - return; - } - } - } else { - // SAFETY: Lock is exclusively locked by a dead thread. - unsafe { self.inner.force_unlock_write() }; - } + /// threads exist. + #[cfg(all(unix, feature = "threading"))] + pub(crate) unsafe fn reinit_after_fork(&self) { + unsafe { crate::common::lock::reinit_rwlock_after_fork(&self.inner) }; } #[inline] diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index 87c4fa5c845..7076e42b9a5 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -713,8 +713,13 @@ pub mod module { } fn py_os_after_fork_child(vm: &VirtualMachine) { - // Reset low-level state before any Python code runs in the child. - // Signal triggers from the parent must not fire in the child. + // Phase 1: Reset all internal locks FIRST. + // After fork(), locks held by dead parent threads would deadlock + // if we try to acquire them. This must happen before anything else. + #[cfg(feature = "threading")] + reinit_locks_after_fork(vm); + + // Phase 2: Reset low-level atomic state (no locks needed). crate::signal::clear_after_fork(); crate::stdlib::signal::_signal::clear_wakeup_fd_after_fork(); @@ -722,28 +727,8 @@ pub mod module { #[cfg(feature = "threading")] crate::object::reset_weakref_locks_after_fork(); - // Force-unlock all global VM locks that may have been held by - // threads that no longer exist in the child process after fork. - // SAFETY: After fork, only the forking thread survives. Any lock - // held by another thread is permanently stuck. The forking thread - // does not hold these locks during fork() (a high-level Python op). - unsafe { - vm.ctx.string_pool.force_unlock_after_fork(); - vm.state.codec_registry.force_unlock_after_fork(); - force_unlock_mutex_after_fork(&vm.state.atexit_funcs); - force_unlock_mutex_after_fork(&vm.state.before_forkers); - force_unlock_mutex_after_fork(&vm.state.after_forkers_child); - force_unlock_mutex_after_fork(&vm.state.after_forkers_parent); - force_unlock_mutex_after_fork(&vm.state.global_trace_func); - force_unlock_mutex_after_fork(&vm.state.global_profile_func); - crate::gc_state::gc_state().force_unlock_after_fork(); - - // Import lock (ReentrantMutex) — was previously not reinit'd - #[cfg(feature = "threading")] - crate::stdlib::imp::reinit_imp_lock_after_fork(); - } - - // Mark all other threads as done before running Python callbacks + // Phase 3: Clean up thread state. Locks are now reinit'd so we can + // acquire them normally instead of using try_lock(). #[cfg(feature = "threading")] crate::stdlib::thread::after_fork_child(vm); @@ -752,18 +737,45 @@ pub mod module { vm.signal_handlers .get_or_init(crate::signal::new_signal_handlers); + // Phase 4: Run Python-level at-fork callbacks. let after_forkers_child: Vec = vm.state.after_forkers_child.lock().clone(); run_at_forkers(after_forkers_child, false, vm); } - /// Force-unlock a PyMutex if held by a dead thread after fork. + /// Reset all parking_lot-based locks in the interpreter state after fork(). /// - /// # Safety - /// Must only be called after fork() in the child process. - unsafe fn force_unlock_mutex_after_fork(mutex: &crate::common::lock::PyMutex) { - if mutex.try_lock().is_none() { - // SAFETY: Lock is held by a dead thread after fork. - unsafe { mutex.force_unlock() }; + /// After fork(), only the calling thread survives. Any locks held by other + /// (now-dead) threads would cause deadlocks. We unconditionally reset them + /// to unlocked by zeroing the raw lock bytes. + #[cfg(all(unix, feature = "threading"))] + fn reinit_locks_after_fork(vm: &VirtualMachine) { + use rustpython_common::lock::reinit_mutex_after_fork; + + unsafe { + // PyGlobalState PyMutex locks + reinit_mutex_after_fork(&vm.state.before_forkers); + reinit_mutex_after_fork(&vm.state.after_forkers_child); + reinit_mutex_after_fork(&vm.state.after_forkers_parent); + reinit_mutex_after_fork(&vm.state.atexit_funcs); + reinit_mutex_after_fork(&vm.state.global_trace_func); + reinit_mutex_after_fork(&vm.state.global_profile_func); + + // PyGlobalState parking_lot::Mutex locks + reinit_mutex_after_fork(&vm.state.thread_frames); + reinit_mutex_after_fork(&vm.state.thread_handles); + reinit_mutex_after_fork(&vm.state.shutdown_handles); + + // Context-level RwLock + vm.ctx.string_pool.reinit_after_fork(); + + // Codec registry RwLock + vm.state.codec_registry.reinit_after_fork(); + + // GC state (multiple Mutex + RwLock) + crate::gc_state::gc_state().reinit_after_fork(); + + // Import lock (RawReentrantMutex) + crate::stdlib::imp::reinit_imp_lock_after_fork(); } } diff --git a/crates/vm/src/stdlib/thread.rs b/crates/vm/src/stdlib/thread.rs index 7df37c145d9..00068705ada 100644 --- a/crates/vm/src/stdlib/thread.rs +++ b/crates/vm/src/stdlib/thread.rs @@ -897,6 +897,9 @@ pub(crate) mod _thread { /// Called after fork() in child process to mark all other threads as done. /// This prevents join() from hanging on threads that don't exist in the child. + /// + /// Precondition: `reinit_locks_after_fork()` has already been called, so all + /// parking_lot-based locks in VmState are in unlocked state. #[cfg(unix)] pub fn after_fork_child(vm: &VirtualMachine) { let current_ident = get_ident(); @@ -904,31 +907,27 @@ pub(crate) mod _thread { // Update main thread ident - after fork, the current thread becomes the main thread vm.state.main_thread_ident.store(current_ident); - // Reinitialize frame slot for current thread + // Reinitialize frame slot for current thread. + // Locks are already reinit'd, so lock() is safe. crate::vm::thread::reinit_frame_slot_after_fork(vm); - // Clean up thread handles. Force-unlock if held by a dead thread. - // SAFETY: After fork, only the current thread exists. - if vm.state.thread_handles.try_lock().is_none() { - unsafe { vm.state.thread_handles.force_unlock() }; - } + // Clean up thread handles. All VmState locks were reinit'd to unlocked, + // so lock() won't deadlock. Per-thread Arc> + // locks are also reinit'd below before use. { let mut handles = vm.state.thread_handles.lock(); - // Clean up dead weak refs and mark non-current threads as done handles.retain(|(inner_weak, done_event_weak): &HandleEntry| { let Some(inner) = inner_weak.upgrade() else { - return false; // Remove dead entries + return false; }; let Some(done_event) = done_event_weak.upgrade() else { return false; }; - // Try to lock the inner state - skip if we can't - let Some(mut inner_guard) = inner.try_lock() else { - return false; - }; + // Reinit this per-handle lock in case a dead thread held it + reinit_parking_lot_mutex(&inner); + let mut inner_guard = inner.lock(); - // Skip current thread and not-started threads if inner_guard.ident == current_ident { return true; } @@ -936,70 +935,63 @@ pub(crate) mod _thread { return true; } - // Mark as done and notify waiters inner_guard.state = ThreadHandleState::Done; - inner_guard.join_handle = None; // Can't join OS thread from child + inner_guard.join_handle = None; drop(inner_guard); - // Try to notify waiters - skip if we can't acquire the lock + // Reinit and set the done event let (lock, cvar) = &*done_event; - if let Some(mut done) = lock.try_lock() { - *done = true; - cvar.notify_all(); - } + reinit_parking_lot_mutex(lock); + *lock.lock() = true; + cvar.notify_all(); true }); } - // Clean up shutdown_handles. Force-unlock if held by a dead thread. - // SAFETY: After fork, only the current thread exists. - if vm.state.shutdown_handles.try_lock().is_none() { - unsafe { vm.state.shutdown_handles.force_unlock() }; - } + // Clean up shutdown_handles. { let mut handles = vm.state.shutdown_handles.lock(); - // Mark all non-current threads as done in shutdown_handles handles.retain(|(inner_weak, done_event_weak): &ShutdownEntry| { let Some(inner) = inner_weak.upgrade() else { - return false; // Remove dead entries + return false; }; let Some(done_event) = done_event_weak.upgrade() else { return false; }; - // Try to lock the inner state - skip if we can't - let Some(mut inner_guard) = inner.try_lock() else { - return false; - }; + reinit_parking_lot_mutex(&inner); + let mut inner_guard = inner.lock(); - // Skip current thread if inner_guard.ident == current_ident { return true; } - - // Keep handles for threads that have not been started yet. - // They are safe to start in the child process. if inner_guard.state == ThreadHandleState::NotStarted { return true; } - // Mark as done so _shutdown() won't wait on it inner_guard.state = ThreadHandleState::Done; drop(inner_guard); - // Notify waiters let (lock, cvar) = &*done_event; - if let Some(mut done) = lock.try_lock() { - *done = true; - cvar.notify_all(); - } + reinit_parking_lot_mutex(lock); + *lock.lock() = true; + cvar.notify_all(); - false // Remove from shutdown_handles - these threads don't exist in child + false }); } } + /// Reset a parking_lot::Mutex to unlocked state after fork. + #[cfg(unix)] + fn reinit_parking_lot_mutex(mutex: &parking_lot::Mutex) { + unsafe { + let ptr = mutex as *const parking_lot::Mutex as *mut u8; + core::ptr::write_bytes(ptr, 0, core::mem::size_of::()); + } + } + // Thread handle state enum #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum ThreadHandleState { diff --git a/crates/vm/src/vm/thread.rs b/crates/vm/src/vm/thread.rs index e7c5da63037..c164883ddd9 100644 --- a/crates/vm/src/vm/thread.rs +++ b/crates/vm/src/vm/thread.rs @@ -169,6 +169,9 @@ pub fn cleanup_current_thread_frames(vm: &VirtualMachine) { /// Reinitialize thread slot after fork. Called in child process. /// Creates a fresh slot and registers it for the current thread, /// preserving the current thread's frames from `vm.frames`. +/// +/// Precondition: `reinit_locks_after_fork()` has already reset all +/// VmState locks to unlocked. #[cfg(feature = "threading")] pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { let current_ident = crate::stdlib::thread::get_ident(); @@ -178,17 +181,8 @@ pub fn reinit_frame_slot_after_fork(vm: &VirtualMachine) { exception: crate::PyAtomicRef::from(vm.topmost_exception()), }); - // After fork, only the current thread exists. If the lock was held by - // another thread during fork, force unlock it. - let mut registry = match vm.state.thread_frames.try_lock() { - Some(guard) => guard, - None => { - // SAFETY: After fork in child process, only the current thread - // exists. The lock holder no longer exists. - unsafe { vm.state.thread_frames.force_unlock() }; - vm.state.thread_frames.lock() - } - }; + // Lock is safe: reinit_locks_after_fork() already reset it to unlocked. + let mut registry = vm.state.thread_frames.lock(); registry.clear(); registry.insert(current_ident, new_slot.clone()); drop(registry);