[cherry-pick] [core] Fix deadlock when cancelling stale requests on in-order actors (#57746)#57768

Merged
aslonnie merged 1 commit into ray-project:releases/2.50.1 from dayshah:2.50.1-cherrypick on Oct 16, 2025
Conversation


dayshah (Contributor) commented Oct 15, 2025

Description

Cherry-picking #57746.

@dayshah dayshah added the go add ONLY when ready to merge, run all tests label Oct 15, 2025
@dayshah dayshah requested a review from a team as a code owner October 15, 2025 23:37

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request aims to fix a deadlock when cancelling stale requests for in-order actors by replacing a mutex with an atomic boolean flag in TaskReceiver. While this change correctly addresses the deadlock potential, it appears to introduce a critical data race. The ActorSchedulingQueue, used for in-order actors, is not thread-safe. By removing the lock that previously serialized HandleTask and Stop methods in TaskReceiver, concurrent calls can now lead to unsafe access to the queue's internal data structures. I have added a critical review comment detailing this issue. The other changes, including the addition of new fault tolerance tests, are well-implemented.
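The core of the change the review describes — replacing a mutex held across the whole handler with an atomic stop flag — can be sketched in isolation. This is a minimal illustrative sketch, not Ray's actual `TaskReceiver`; the class name and methods here are hypothetical:

```cpp
#include <atomic>

// Sketch: a stop flag read without holding a lock. Unlike a mutex
// acquired for the entire handler body, checking an atomic here cannot
// deadlock against a Stop() whose cancellation callbacks re-enter the
// same object.
class StopFlag {
 public:
  // Returns true only for the first caller to stop; exchange() returns
  // the previous value, so later callers see the flag already set.
  bool TryStop() { return !stopping_.exchange(true); }

  bool IsStopping() const { return stopping_.load(); }

 private:
  std::atomic<bool> stopping_{false};
};
```

The trade-off the review goes on to flag: the atomic protects only the flag itself, not any data structure that the lock previously happened to serialize.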

Comment on lines +187 to 261
  task_spec = TaskSpecification(std::move(*request.mutable_task_spec()));
  if (stopping_) {
    reply->set_was_cancelled_before_running(true);
    if (task_spec.IsActorTask()) {
      reply->set_worker_exiting(true);
    }
    send_reply_callback(Status::OK(), nullptr, nullptr);
    return;
  }

  if (task_spec.IsActorCreationTask()) {
    SetupActor(task_spec.IsAsyncioActor(),
               task_spec.MaxActorConcurrency(),
               task_spec.AllowOutOfOrderExecution());
  }

  if (!task_spec.IsActorTask()) {
    resource_ids = ResourceMappingType{};
    for (const auto &mapping : request.resource_mapping()) {
      std::vector<std::pair<int64_t, double>> rids;
      rids.reserve(mapping.resource_ids().size());
      for (const auto &ids : mapping.resource_ids()) {
        rids.emplace_back(ids.index(), ids.quantity());
      }
      (*resource_ids)[mapping.name()] = std::move(rids);
    }
  }

  if (task_spec.IsActorTask()) {
    auto it = actor_scheduling_queues_.find(task_spec.CallerWorkerId());
    if (it == actor_scheduling_queues_.end()) {
      it = actor_scheduling_queues_
               .emplace(
                   task_spec.CallerWorkerId(),
                   allow_out_of_order_execution_
                       ? std::unique_ptr<SchedulingQueue>(
                             std::make_unique<OutOfOrderActorSchedulingQueue>(
                                 task_execution_service_,
                                 waiter_,
                                 task_event_buffer_,
                                 pool_manager_,
                                 fiber_state_manager_,
                                 is_asyncio_,
                                 fiber_max_concurrency_,
                                 concurrency_groups_))
                       : std::unique_ptr<SchedulingQueue>(
                             std::make_unique<ActorSchedulingQueue>(
                                 task_execution_service_,
                                 waiter_,
                                 task_event_buffer_,
                                 pool_manager_,
                                 RayConfig::instance()
                                     .actor_scheduling_queue_max_reorder_wait_seconds())))
               .first;
    }

    auto accept_callback = make_accept_callback();
    it->second->Add(request.sequence_number(),
                    request.client_processed_up_to(),
                    std::move(accept_callback),
                    std::move(cancel_callback),
                    std::move(send_reply_callback),
                    std::move(task_spec));
  } else {
    RAY_LOG(DEBUG) << "Adding task " << task_spec.TaskId()
                   << " to normal scheduling task queue.";
    auto accept_callback = make_accept_callback();
    normal_scheduling_queue_->Add(request.sequence_number(),
                                  request.client_processed_up_to(),
                                  std::move(accept_callback),
                                  std::move(cancel_callback),
                                  std::move(send_reply_callback),
                                  std::move(task_spec));
  }
}


critical

Removing the absl::MutexLock that was previously protecting this entire block introduces a potential data race. The TaskReceiver::Stop() method can be called concurrently with TaskReceiver::HandleTask() from a different thread.

HandleTask() calls scheduling_queue->Add(), and Stop() calls scheduling_queue->Stop(), which in turn calls CancelAllPending(). The ActorSchedulingQueue implementation is not thread-safe for concurrent calls to Add() and CancelAllPending(), as they both access and modify internal data structures (like pending_actor_tasks_) without synchronization. This can lead to memory corruption or crashes.

While OutOfOrderActorSchedulingQueue appears to be thread-safe due to its internal mutex, ActorSchedulingQueue (used for in-order actors) is not. Given the PR title mentions in-order actors, this seems particularly relevant.

To fix this, ActorSchedulingQueue should be made thread-safe, for example by adding a mutex to protect its internal state, similar to OutOfOrderActorSchedulingQueue.
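The shape of that suggestion can be sketched as follows. This is a hypothetical simplification, not the real `ActorSchedulingQueue`: the task is reduced to an `int` payload keyed by sequence number, and only the locking pattern is the point:

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>

// Hypothetical sketch of a thread-safe in-order queue: every method that
// touches pending_tasks_ acquires the same mutex, so a concurrent Add()
// and CancelAllPending() can no longer corrupt the map, mirroring the
// internal-mutex approach the review attributes to
// OutOfOrderActorSchedulingQueue.
class InOrderQueueSketch {
 public:
  void Add(int64_t seq_no, int task_payload) {
    std::lock_guard<std::mutex> lock(mu_);
    pending_tasks_[seq_no] = task_payload;
  }

  // Drops every pending task; returns how many were cancelled.
  size_t CancelAllPending() {
    std::lock_guard<std::mutex> lock(mu_);
    size_t n = pending_tasks_.size();
    pending_tasks_.clear();
    return n;
  }

  size_t Size() {
    std::lock_guard<std::mutex> lock(mu_);
    return pending_tasks_.size();
  }

 private:
  std::mutex mu_;
  std::map<int64_t, int> pending_tasks_;  // ordered by sequence number
};
```

The key property is that the lock lives inside the queue and is held only per call, so it cannot recreate the coarse handler-wide lock whose removal the PR is about.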


Bug: Mutex Removal Causes TOCTOU Race Condition

Removing the mutex in HandleTask introduces a TOCTOU race condition. A task can be added to a scheduling queue after stopping_ is checked but before the task is fully enqueued. This allows Stop() to be called by another thread in between, resulting in tasks being added to already-stopped queues and violating shutdown semantics.
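The race described is a classic check-then-act window: the `stopping_` read and the enqueue are two separate steps, and `Stop()` can run between them. One way to close the window — a hypothetical sketch, not Ray's code — is to move the rejection into the queue itself, so the stopped check and the enqueue happen under one lock:

```cpp
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical sketch: Add() re-checks the stopped flag under the same
// mutex that Stop() takes, so a task can never be enqueued after Stop()
// has drained the queue.
class StoppableQueue {
 public:
  // Returns false if the queue was already stopped; the caller should
  // then send a "cancelled before running" reply instead of enqueuing.
  bool Add(int task) {
    std::lock_guard<std::mutex> lock(mu_);
    if (stopped_) return false;
    tasks_.push_back(task);
    return true;
  }

  // Marks the queue stopped and hands back the pending tasks to cancel.
  std::vector<int> Stop() {
    std::lock_guard<std::mutex> lock(mu_);
    stopped_ = true;
    return std::move(tasks_);
  }

 private:
  std::mutex mu_;
  bool stopped_ = false;
  std::vector<int> tasks_;
};
```

With this shape, the caller's early `stopping_` check becomes a fast-path optimization rather than the correctness mechanism.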


@ray-gardener ray-gardener bot added the core Issues that should be addressed in Ray Core label Oct 16, 2025
@aslonnie aslonnie merged commit 7cf6817 into ray-project:releases/2.50.1 Oct 16, 2025
4 of 5 checks passed
@dayshah dayshah deleted the 2.50.1-cherrypick branch October 16, 2025 01:46

Labels

core (Issues that should be addressed in Ray Core); go (add ONLY when ready to merge, run all tests)


3 participants