[cherry-pick] [core] Fix deadlock when cancelling stale requests on in-order actors (#57746) #57768
Conversation
[core] Fix deadlock when cancelling stale requests on in-order actors (ray-project#57746) Signed-off-by: dayshah <[email protected]>
Code Review
This pull request aims to fix a deadlock when cancelling stale requests for in-order actors by replacing a mutex with an atomic boolean flag in TaskReceiver. While this change correctly addresses the deadlock potential, it appears to introduce a critical data race. The ActorSchedulingQueue, used for in-order actors, is not thread-safe. By removing the lock that previously serialized HandleTask and Stop methods in TaskReceiver, concurrent calls can now lead to unsafe access to the queue's internal data structures. I have added a critical review comment detailing this issue. The other changes, including the addition of new fault tolerance tests, are well-implemented.
```cpp
task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
if (stopping_) {
  reply->set_was_cancelled_before_running(true);
  if (task_spec.IsActorTask()) {
    reply->set_worker_exiting(true);
  }
  send_reply_callback(Status::OK(), nullptr, nullptr);
  return;
}
```
```cpp
if (task_spec.IsActorCreationTask()) {
  SetupActor(task_spec.IsAsyncioActor(),
             task_spec.MaxActorConcurrency(),
             task_spec.AllowOutOfOrderExecution());
}
```
```cpp
if (!task_spec.IsActorTask()) {
  resource_ids = ResourceMappingType{};
  for (const auto &mapping : request.resource_mapping()) {
    std::vector<std::pair<int64_t, double>> rids;
    rids.reserve(mapping.resource_ids().size());
    for (const auto &ids : mapping.resource_ids()) {
      rids.emplace_back(ids.index(), ids.quantity());
    }
    (*resource_ids)[mapping.name()] = std::move(rids);
  }
}
```
```cpp
if (task_spec.IsActorTask()) {
  auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
  if (it == actor_scheduling_queues_.end()) {
    it = actor_scheduling_queues_
             .emplace(
                 task_spec.CallerWorkerId(),
                 allow_out_of_order_execution_
                     ? std::unique_ptr<SchedulingQueue>(
                           std::make_unique<OutOfOrderActorSchedulingQueue>(
                               task_execution_service_,
                               waiter_,
                               task_event_buffer_,
                               pool_manager_,
                               fiber_state_manager_,
                               is_asyncio_,
                               fiber_max_concurrency_,
                               concurrency_groups_))
                     : std::unique_ptr<SchedulingQueue>(
                           std::make_unique<ActorSchedulingQueue>(
                               task_execution_service_,
                               waiter_,
                               task_event_buffer_,
                               pool_manager_,
                               RayConfig::instance()
                                   .actor_scheduling_queue_max_reorder_wait_seconds())))
             .first;
  }

  auto accept_callback = make_accept_callback();
  it->second->Add(request.sequence_number(),
                  request.client_processed_up_to(),
                  std::move(accept_callback),
                  std::move(cancel_callback),
                  std::move(send_reply_callback),
                  std::move(task_spec));
} else {
  RAY_LOG(DEBUG) << "Adding task " << task_spec.TaskId()
                 << " to normal scheduling task queue.";
  auto accept_callback = make_accept_callback();
  normal_scheduling_queue_->Add(request.sequence_number(),
                                request.client_processed_up_to(),
                                std::move(accept_callback),
                                std::move(cancel_callback),
                                std::move(send_reply_callback),
                                std::move(task_spec));
}
```
Removing the absl::MutexLock that was previously protecting this entire block introduces a potential data race. The TaskReceiver::Stop() method can be called concurrently with TaskReceiver::HandleTask() from a different thread.
HandleTask() calls scheduling_queue->Add(), and Stop() calls scheduling_queue->Stop(), which in turn calls CancelAllPending(). The ActorSchedulingQueue implementation is not thread-safe for concurrent calls to Add() and CancelAllPending(), as they both access and modify internal data structures (like pending_actor_tasks_) without synchronization. This can lead to memory corruption or crashes.
While OutOfOrderActorSchedulingQueue appears to be thread-safe due to its internal mutex, ActorSchedulingQueue (used for in-order actors) is not. Given the PR title mentions in-order actors, this seems particularly relevant.
To fix this, ActorSchedulingQueue should be made thread-safe, for example by adding a mutex to protect its internal state, similar to OutOfOrderActorSchedulingQueue.
```cpp
                std::move(accept_callback),
                std::move(cancel_callback),
                std::move(send_reply_callback),
                std::move(task_spec));
```
Bug: Mutex Removal Causes TOCTOU Race Condition
Removing the mutex in HandleTask introduces a TOCTOU race condition. A task can be added to a scheduling queue after stopping_ is checked but before the task is fully enqueued. This allows Stop() to be called by another thread in between, resulting in tasks being added to already-stopped queues and violating shutdown semantics.
Description
Cherry-picking #57746.