
Change object storage cluster table functions to prefer specific replicas to improve cache locality #77326

Merged
nickitat merged 19 commits into ClickHouse:master from BetterStackHQ:ah/stable-s3-filesystem-cache
Apr 25, 2025
Conversation

@adikus (Contributor) commented Mar 7, 2025

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Object storage cluster table functions (e.g. s3Cluster) will now assign files to replicas for reading based on consistent hash to improve cache locality.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Addresses: #72816
This works by assigning files from the file iterator to specific replicas.
The PR also implements task stealing, so that if a replica is unavailable or slow, other replicas will process its tasks. For filesystem cache locality, though, this means that near the end of iterating through the tasks some task stealing is inevitable, preventing perfect cache locality. I've added a 50 ms sleep before stealing is allowed to proceed, which helps a bit, but I'm not sure whether that's a good approach.

Added an object_storage_stable_cluster_task_distribution setting to enable the new behaviour.

I'm happy to write some tests if I get a 👍 on the approach.

@nickitat nickitat self-assigned this Mar 10, 2025
@clickhouse-gh (bot) commented Mar 10, 2025

Workflow [PR], commit [7d8ef02]

@clickhouse-gh bot added the "pr-feature" (Pull request with new product feature) label Mar 10, 2025
@nickitat added the "can be tested" (Allows running workflows for external contributors) label Mar 11, 2025
@adikus (Contributor, Author) commented Mar 12, 2025

Thanks for the review @nickitat, planning to address your comments either today or tomorrow

{
IConnections::ReplicaInfo replica_info{
.number_of_current_replica = replica_index++,
.number_of_replicas = number_of_replicas,
@adikus (Contributor, Author) commented:

Not sure about this. I've seen that for parallel replicas we hold replicas_count inside the ParallelReplicasReadingCoordinator; I'm not sure if it makes sense to use that here.

@adikus (Contributor, Author) commented Mar 14, 2025

@nickitat Rewritten to use number_of_current_replica, let me know what you think. 🙂

@alexey-milovidov (Member) commented:
Thanks for the amazing work, it looks like exactly what we need!
Let's take a look at failed tests.

ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);

auto response = (*extension->task_iterator)(connections->getLastPacketConnection());
auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica, extension->replica_info->number_of_replicas);
Member commented:

extension and replica_info are optionals, i.e. they may not be set.

- 1 — `SELECT` returns empty result.
- 0 — `SELECT` throws an exception.
)", 0) \
DECLARE(Bool, object_storage_stable_cluster_task_distribution, false, R"(
Member commented:

Let's avoid adding a new setting; hardly anybody will want the old behaviour.

std::mutex mutex;
bool iterator_exhausted = false;

LoggerPtr log = getLogger("StorageObjectStorageStableTaskDistributor");
Member commented:

Suggested change:
LoggerPtr log = getLogger("StorageObjectStorageStableTaskDistributor");
LoggerPtr log = getLogger("StorageClusterTaskDistributor");

return archive_object_info->getPathToArchive();
auto callback = std::make_shared<TaskIterator>(
[task_distributor](size_t number_of_current_replica, size_t number_of_replicas) mutable -> String {
if (auto next_task = task_distributor->getNextTask(number_of_current_replica, number_of_replicas))
Member commented:
a more concise way would be:

return foo(...).value_or("");

std::shared_ptr<IObjectIterator> iterator_)
: iterator(std::move(iterator_))
, iterator_exhausted(false)
, log(getLogger("StorageObjectStorageStableTaskDistributor"))
Member commented:

Please leave only one initialization, either here or in the header.

struct ReplicaInfo
{
size_t number_of_current_replica{0};
size_t number_of_replicas{0};
Member commented:

If we add a new field, we have to make sure it is either optional (and this is at least stated in the comment) or, preferably, initialized in all places.
But it shouldn't be there at all, because the total number of replicas doesn't depend on the specific replica making a request. It is a constant that can be provided once to StorageObjectStorageStableTaskDistributor's constructor.

String next_file = files.back();
files.pop_back();

if (!unprocessed_files.contains(next_file))
Member commented:

A slightly more performant alternative is find + erase(it).


std::shared_ptr<IObjectIterator> iterator;

std::unordered_map<size_t, std::vector<String>> connection_to_files;
Member commented:

Since replica ids should always lie in the range [0, N), this could be replaced with std::vector.


std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica, size_t number_of_replicas)
{
while (!iterator_exhausted)
Member commented:

This access to iterator_exhausted should probably also be guarded by the lock.

@adikus (Contributor, Author) commented Apr 1, 2025

Thanks @nickitat and sorry for the delay here.
I incorporated your feedback, let me know how it looks.

Comment on lines +107 to +111
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);

createExtension(predicate, max_replicas_to_use);
@adikus (Contributor, Author) commented:

One thing I wasn't quite sure about is how the initialization works here in applyFilters; I'm not sure what code path leads here.

I copied parts from below to get the same number of replicas, so hopefully that will work ok.

@adikus (Contributor, Author) commented Apr 1, 2025

Looks like two tests failed on the "Replica info is not initialized" check; I'll have a look at those tomorrow.

@adikus (Contributor, Author) commented Apr 2, 2025

Initialized replica info on all extensions that also use the task iterator, even though it is not currently used for most of them. Not sure if this is going too deep for this PR.

@adikus (Contributor, Author) commented Apr 14, 2025

Thanks again for the review @nickitat! Could you please take another look?

# Conflicts:
#	src/Storages/IStorageCluster.cpp
#	src/Storages/StorageDistributed.cpp
@nickitat added the "pr-improvement" (Pull request with some product improvements) label and removed the "pr-feature" (Pull request with new product feature) label Apr 25, 2025
@nickitat nickitat enabled auto-merge April 25, 2025 20:50
@nickitat nickitat added this pull request to the merge queue Apr 25, 2025
Merged via the queue into ClickHouse:master with commit 477887a Apr 25, 2025
117 of 120 checks passed
@robot-clickhouse-ci-2 robot-clickhouse-ci-2 added the pr-synced-to-cloud The PR is synced to the cloud repo label Apr 25, 2025
pipe01 added a commit to BetterStackHQ/ClickHouse that referenced this pull request Apr 30, 2025