diff --git a/crates/vm/src/gc_state.rs b/crates/vm/src/gc_state.rs index 8e1c81e6201..4b6f3f6bd0b 100644 --- a/crates/vm/src/gc_state.rs +++ b/crates/vm/src/gc_state.rs @@ -3,12 +3,11 @@ //! This module implements CPython-compatible generational garbage collection //! for RustPython, using an intrusive doubly-linked list approach. -use crate::common::lock::PyMutex; +use crate::common::lock::{PyMutex, PyRwLock}; use crate::{AsObject, PyObject, PyObjectRef}; use core::ptr::NonNull; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; use std::collections::HashSet; -use std::sync::{Mutex, RwLock}; bitflags::bitflags! { /// GC debug flags (see Include/internal/pycore_gc.h) @@ -85,6 +84,18 @@ impl GcGeneration { guard.collected += collected; guard.uncollectable += uncollectable; } + + /// Force-unlock the stats mutex after fork() in the child process. + /// + /// # Safety + /// Must only be called after fork() in the child process when no other + /// threads exist. + #[cfg(unix)] + unsafe fn force_unlock_stats_after_fork(&self) { + if self.stats.try_lock().is_none() { + unsafe { self.stats.force_unlock() }; + } + } } /// Wrapper for raw pointer to make it Send + Sync @@ -105,9 +116,9 @@ pub struct GcState { pub enabled: AtomicBool, /// Per-generation object tracking (for correct gc_refs algorithm) /// Objects start in gen0, survivors move to gen1, then gen2 - generation_objects: [RwLock>; 3], + generation_objects: [PyRwLock>; 3], /// Frozen/permanent objects (excluded from normal GC) - permanent_objects: RwLock>, + permanent_objects: PyRwLock>, /// Debug flags pub debug: AtomicU32, /// gc.garbage list (uncollectable objects with __del__) @@ -115,14 +126,14 @@ pub struct GcState { /// gc.callbacks list pub callbacks: PyMutex>, /// Mutex for collection (prevents concurrent collections) - collecting: Mutex<()>, + collecting: PyMutex<()>, /// Allocation counter for gen0 alloc_count: AtomicUsize, /// Registry of all tracked objects (for cycle detection) - tracked_objects: RwLock>, + tracked_objects: PyRwLock>, /// Objects that have been finalized (__del__ already called) /// Prevents calling __del__ multiple times on resurrected objects - finalized_objects: RwLock>, + finalized_objects: PyRwLock>, } // SAFETY: All fields are either inherently Send/Sync (atomics, RwLock, Mutex) or protected by PyMutex. @@ -148,18 +159,18 @@ impl GcState { permanent: GcGeneration::new(0), enabled: AtomicBool::new(true), generation_objects: [ - RwLock::new(HashSet::new()), - RwLock::new(HashSet::new()), - RwLock::new(HashSet::new()), + PyRwLock::new(HashSet::new()), + PyRwLock::new(HashSet::new()), + PyRwLock::new(HashSet::new()), ], - permanent_objects: RwLock::new(HashSet::new()), + permanent_objects: PyRwLock::new(HashSet::new()), debug: AtomicU32::new(0), garbage: PyMutex::new(Vec::new()), callbacks: PyMutex::new(Vec::new()), - collecting: Mutex::new(()), + collecting: PyMutex::new(()), alloc_count: AtomicUsize::new(0), - tracked_objects: RwLock::new(HashSet::new()), - finalized_objects: RwLock::new(HashSet::new()), + tracked_objects: PyRwLock::new(HashSet::new()), + finalized_objects: PyRwLock::new(HashSet::new()), } } @@ -240,17 +251,16 @@ impl GcState { // Add to generation 0 tracking first (for correct gc_refs algorithm) // Only increment count if we successfully add to the set - if let Ok(mut gen0) = self.generation_objects[0].write() - && gen0.insert(gc_ptr) { - self.generations[0].count.fetch_add(1, Ordering::SeqCst); - self.alloc_count.fetch_add(1, Ordering::SeqCst); + let mut gen0 = self.generation_objects[0].write(); + if gen0.insert(gc_ptr) { + self.generations[0].count.fetch_add(1, Ordering::SeqCst); + self.alloc_count.fetch_add(1, Ordering::SeqCst); + } } // Also add to global tracking (for get_objects, etc.) - if let Ok(mut tracked) = self.tracked_objects.write() { - tracked.insert(gc_ptr); - } + self.tracked_objects.write().insert(gc_ptr); } /// Untrack an object (remove from GC lists) @@ -263,9 +273,8 @@ impl GcState { // Remove from generation tracking lists and decrement the correct generation's count for (gen_idx, generation) in self.generation_objects.iter().enumerate() { - if let Ok(mut gen_set) = generation.write() - && gen_set.remove(&gc_ptr) - { + let mut gen_set = generation.write(); + if gen_set.remove(&gc_ptr) { // Decrement count for the generation we removed from let count = self.generations[gen_idx].count.load(Ordering::SeqCst); if count > 0 { @@ -278,42 +287,33 @@ impl GcState { } // Remove from global tracking - if let Ok(mut tracked) = self.tracked_objects.write() { - tracked.remove(&gc_ptr); - } + self.tracked_objects.write().remove(&gc_ptr); // Remove from permanent tracking - if let Ok(mut permanent) = self.permanent_objects.write() - && permanent.remove(&gc_ptr) { - let count = self.permanent.count.load(Ordering::SeqCst); - if count > 0 { - self.permanent.count.fetch_sub(1, Ordering::SeqCst); + let mut permanent = self.permanent_objects.write(); + if permanent.remove(&gc_ptr) { + let count = self.permanent.count.load(Ordering::SeqCst); + if count > 0 { + self.permanent.count.fetch_sub(1, Ordering::SeqCst); + } } } // Remove from finalized set - if let Ok(mut finalized) = self.finalized_objects.write() { - finalized.remove(&gc_ptr); - } + self.finalized_objects.write().remove(&gc_ptr); } /// Check if an object has been finalized pub fn is_finalized(&self, obj: NonNull) -> bool { let gc_ptr = GcObjectPtr(obj); - if let Ok(finalized) = self.finalized_objects.read() { - finalized.contains(&gc_ptr) - } else { - false - } + self.finalized_objects.read().contains(&gc_ptr) } /// Mark an object as finalized pub fn mark_finalized(&self, obj: NonNull) { let gc_ptr = GcObjectPtr(obj); - if let Ok(mut finalized) = self.finalized_objects.write() { - finalized.insert(gc_ptr); - } + self.finalized_objects.write().insert(gc_ptr); } /// Get tracked objects (for gc.get_objects) @@ -323,40 +323,34 @@ impl GcState { match generation { None => { // Return all tracked objects - if let Ok(tracked) = self.tracked_objects.read() { - tracked - .iter() - .filter_map(|ptr| { - let obj = unsafe { ptr.0.as_ref() }; - if obj.strong_count() > 0 { - Some(obj.to_owned()) - } else { - None - } - }) - .collect() - } else { - Vec::new() - } + self.tracked_objects + .read() + .iter() + .filter_map(|ptr| { + let obj = unsafe { ptr.0.as_ref() }; + if obj.strong_count() > 0 { + Some(obj.to_owned()) + } else { + None + } + }) + .collect() } Some(g) if (0..=2).contains(&g) => { // Return objects in specific generation let gen_idx = g as usize; - if let Ok(gen_set) = self.generation_objects[gen_idx].read() { - gen_set - .iter() - .filter_map(|ptr| { - let obj = unsafe { ptr.0.as_ref() }; - if obj.strong_count() > 0 { - Some(obj.to_owned()) - } else { - None - } - }) - .collect() - } else { - Vec::new() - } + self.generation_objects[gen_idx] + .read() + .iter() + .filter_map(|ptr| { + let obj = unsafe { ptr.0.as_ref() }; + if obj.strong_count() > 0 { + Some(obj.to_owned()) + } else { + None + } + }) + .collect() } _ => Vec::new(), } @@ -407,9 +401,8 @@ impl GcState { } // Try to acquire the collecting lock - let _guard = match self.collecting.try_lock() { - Ok(g) => g, - Err(_) => return (0, 0), + let Some(_guard) = self.collecting.try_lock() else { + return (0, 0); }; // Memory barrier to ensure visibility of all reference count updates @@ -423,7 +416,7 @@ impl GcState { // Hold read locks for the entire collection to prevent other threads // from untracking objects while we're iterating. let gen_locks: Vec<_> = (0..=generation) - .filter_map(|i| self.generation_objects[i].read().ok()) + .map(|i| self.generation_objects[i].read()) .collect(); let mut collecting: HashSet = HashSet::new(); @@ -597,18 +590,12 @@ impl GcState { // Skip objects that have already been finalized (prevents multiple __del__ calls) for obj_ref in &unreachable_refs { let ptr = GcObjectPtr(core::ptr::NonNull::from(obj_ref.as_ref())); - let already_finalized = if let Ok(finalized) = self.finalized_objects.read() { - finalized.contains(&ptr) - } else { - false - }; + let already_finalized = self.finalized_objects.read().contains(&ptr); if !already_finalized { // Mark as finalized BEFORE calling __del__ // This ensures is_finalized() returns True inside __del__ - if let Ok(mut finalized) = self.finalized_objects.write() { - finalized.insert(ptr); - } + self.finalized_objects.write().insert(ptr); obj_ref.try_call_finalizer(); } } @@ -733,9 +720,8 @@ impl GcState { for &ptr in survivors { // Remove from current generation for gen_idx in 0..=from_gen { - if let Ok(mut gen_set) = self.generation_objects[gen_idx].write() - && gen_set.remove(&ptr) - { + let mut gen_set = self.generation_objects[gen_idx].write(); + if gen_set.remove(&ptr) { // Decrement count for source generation let count = self.generations[gen_idx].count.load(Ordering::SeqCst); if count > 0 { @@ -744,10 +730,12 @@ impl GcState { .fetch_sub(1, Ordering::SeqCst); } + // Release before acquiring next lock + drop(gen_set); + // Add to next generation - if let Ok(mut next_set) = self.generation_objects[next_gen].write() - && next_set.insert(ptr) - { + let mut next_set = self.generation_objects[next_gen].write(); + if next_set.insert(ptr) { // Increment count for target generation self.generations[next_gen] .count @@ -770,39 +758,100 @@ impl GcState { let mut objects_to_freeze: Vec = Vec::new(); for (gen_idx, generation) in self.generation_objects.iter().enumerate() { - if let Ok(mut gen_set) = generation.write() { - objects_to_freeze.extend(gen_set.drain()); - self.generations[gen_idx].count.store(0, Ordering::SeqCst); - } + let mut gen_set = generation.write(); + objects_to_freeze.extend(gen_set.drain()); + self.generations[gen_idx].count.store(0, Ordering::SeqCst); } // Add to permanent set - if let Ok(mut permanent) = self.permanent_objects.write() { - let count = objects_to_freeze.len(); - for ptr in objects_to_freeze { - permanent.insert(ptr); - } - self.permanent.count.fetch_add(count, Ordering::SeqCst); + let mut permanent = self.permanent_objects.write(); + let count = objects_to_freeze.len(); + for ptr in objects_to_freeze { + permanent.insert(ptr); } + self.permanent.count.fetch_add(count, Ordering::SeqCst); } /// Unfreeze all objects (move from permanent to gen2) pub fn unfreeze(&self) { let mut objects_to_unfreeze: Vec = Vec::new(); - if let Ok(mut permanent) = self.permanent_objects.write() { + { + let mut permanent = self.permanent_objects.write(); objects_to_unfreeze.extend(permanent.drain()); self.permanent.count.store(0, Ordering::SeqCst); } // Add to generation 2 - if let Ok(mut gen2) = self.generation_objects[2].write() { - let count = objects_to_unfreeze.len(); - for ptr in objects_to_unfreeze { - gen2.insert(ptr); + let mut gen2 = self.generation_objects[2].write(); + let count = objects_to_unfreeze.len(); + for ptr in objects_to_unfreeze { + gen2.insert(ptr); + } + self.generations[2].count.fetch_add(count, Ordering::SeqCst); + } + + /// Force-unlock all locks after fork() in the child process. + /// + /// After fork(), only the forking thread survives. Any lock held by another + /// thread is permanently stuck. This method releases all such stuck locks. + /// + /// # Safety + /// Must only be called after fork() in the child process when no other + /// threads exist. The calling thread must NOT hold any of these locks. + #[cfg(unix)] + pub unsafe fn force_unlock_after_fork(&self) { + // Force-unlock the collecting mutex + if self.collecting.try_lock().is_none() { + unsafe { self.collecting.force_unlock() }; + } + + // Force-unlock garbage and callbacks mutexes + if self.garbage.try_lock().is_none() { + unsafe { self.garbage.force_unlock() }; + } + if self.callbacks.try_lock().is_none() { + unsafe { self.callbacks.force_unlock() }; + } + + // Force-unlock generation stats mutexes + for generation in &self.generations { + unsafe { generation.force_unlock_stats_after_fork() }; + } + unsafe { self.permanent.force_unlock_stats_after_fork() }; + + // Force-unlock RwLocks + for rw in &self.generation_objects { + unsafe { force_unlock_rwlock_after_fork(rw) }; + } + unsafe { force_unlock_rwlock_after_fork(&self.permanent_objects) }; + unsafe { force_unlock_rwlock_after_fork(&self.tracked_objects) }; + unsafe { force_unlock_rwlock_after_fork(&self.finalized_objects) }; + } +} + +/// Force-unlock a PyRwLock after fork() in the child process. +/// +/// # Safety +/// Must only be called after fork() in the child process when no other +/// threads exist. The calling thread must NOT hold this lock. +#[cfg(unix)] +unsafe fn force_unlock_rwlock_after_fork(lock: &PyRwLock) { + if lock.try_write().is_some() { + return; + } + let is_shared = lock.try_read().is_some(); + if is_shared { + loop { + // SAFETY: Lock is shared-locked by dead thread(s). + unsafe { lock.force_unlock_read() }; + if lock.try_write().is_some() { + return; } - self.generations[2].count.fetch_add(count, Ordering::SeqCst); } + } else { + // SAFETY: Lock is exclusively locked by a dead thread. + unsafe { lock.force_unlock_write() }; } } diff --git a/crates/vm/src/stdlib/posix.rs b/crates/vm/src/stdlib/posix.rs index c13bfb51f12..fcec8298d13 100644 --- a/crates/vm/src/stdlib/posix.rs +++ b/crates/vm/src/stdlib/posix.rs @@ -721,6 +721,7 @@ pub mod module { force_unlock_mutex_after_fork(&vm.state.after_forkers_parent); force_unlock_mutex_after_fork(&vm.state.global_trace_func); force_unlock_mutex_after_fork(&vm.state.global_profile_func); + crate::gc_state::gc_state().force_unlock_after_fork(); } // Mark all other threads as done before running Python callbacks