Change object storage cluster table functions to prefer specific replicas to improve cache locality #77326
Conversation
Thanks for the review @nickitat, planning to address your comments either today or tomorrow.
src/Storages/IStorageCluster.cpp (Outdated)

```cpp
{
    IConnections::ReplicaInfo replica_info{
        .number_of_current_replica = replica_index++,
        .number_of_replicas = number_of_replicas,
```
Not sure about this. I've seen that for parallel replicas we hold `replicas_count` inside the `ParallelReplicasReadingCoordinator`; not sure if it makes sense to use that here.
```
# Conflicts:
#	src/Storages/IStorageCluster.cpp
#	src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp
```
@nickitat Rewritten to use …
Thanks for the amazing work, it looks like exactly what we need!
```diff
      ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);

-     auto response = (*extension->task_iterator)(connections->getLastPacketConnection());
+     auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica, extension->replica_info->number_of_replicas);
```
`extension` and `replica_info` are optionals, i.e. they may not be set.
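To illustrate the reviewer's point, here is a standalone sketch (the `Extension`/`ReplicaInfo` shapes below are simplified stand-ins for the real types, and `currentReplicaOrZero` is a hypothetical helper): both optionals must be checked before dereferencing.

```cpp
#include <cstddef>
#include <optional>

// Simplified stand-ins for the types in the diff above.
struct ReplicaInfo
{
    size_t number_of_current_replica = 0;
    size_t number_of_replicas = 0;
};

struct Extension
{
    std::optional<ReplicaInfo> replica_info;
};

// Both the extension and its replica_info can be unset, so each level
// is checked before operator-> is applied.
size_t currentReplicaOrZero(const std::optional<Extension> & extension)
{
    if (extension && extension->replica_info)
        return extension->replica_info->number_of_current_replica;
    return 0; // fall back when the info was never initialized
}
```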
src/Core/Settings.cpp (Outdated)

```cpp
- 1 — `SELECT` returns empty result.
- 0 — `SELECT` throws an exception.
)", 0) \
DECLARE(Bool, object_storage_stable_cluster_task_distribution, false, R"(
```
Let's avoid adding a new setting; hardly anybody will want the old behaviour.
```cpp
std::mutex mutex;
bool iterator_exhausted = false;

LoggerPtr log = getLogger("StorageObjectStorageStableTaskDistributor");
```
Suggested change:

```diff
- LoggerPtr log = getLogger("StorageObjectStorageStableTaskDistributor");
+ LoggerPtr log = getLogger("StorageClusterTaskDistributor");
```
```cpp
return archive_object_info->getPathToArchive();

auto callback = std::make_shared<TaskIterator>(
    [task_distributor](size_t number_of_current_replica, size_t number_of_replicas) mutable -> String {
        if (auto next_task = task_distributor->getNextTask(number_of_current_replica, number_of_replicas))
```
a more concise way would be: `return foo(...).value_or("");`

```cpp
std::shared_ptr<IObjectIterator> iterator_)
    : iterator(std::move(iterator_))
    , iterator_exhausted(false)
    , log(getLogger("StorageObjectStorageStableTaskDistributor"))
```
Please leave only one initialization; either here or in the header.
src/Client/IConnections.h (Outdated)

```cpp
struct ReplicaInfo
{
    size_t number_of_current_replica{0};
    size_t number_of_replicas{0};
```
If we add a new field, we have to make sure it is either optional (and that is at least stated in a comment) or (preferably) initialized in all places.
But it doesn't need to be there at all, because the total number of replicas doesn't depend on the specific replica making a request. It is a constant that can be provided once to `StorageObjectStorageStableTaskDistributor`'s constructor.
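The suggestion can be sketched like this (the class and member names below are assumptions for illustration, not the PR's final signature): the replica count is fixed for the whole query, so it is passed once at construction rather than with every task request.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch of a distributor that receives the replica count once.
class StableTaskDistributorSketch
{
public:
    explicit StableTaskDistributorSketch(size_t number_of_replicas_)
        : connection_to_files(number_of_replicas_) // one file queue per replica
        , number_of_replicas(number_of_replicas_)
    {
    }

    size_t replicaCount() const { return number_of_replicas; }

private:
    std::vector<std::vector<std::string>> connection_to_files;
    size_t number_of_replicas;
};
```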
```cpp
String next_file = files.back();
files.pop_back();

if (!unprocessed_files.contains(next_file))
```
A little more performant alternative is `find` + `erase(it)`.
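The reason: `contains(key)` followed by `erase(key)` performs two hash lookups, while `find` followed by `erase(iterator)` performs one. A standalone sketch (the set type and helper name are assumptions for illustration):

```cpp
#include <string>
#include <unordered_set>

// Returns true if next_file was still unprocessed, removing it in that case.
// A single find() locates the element; erase(it) then removes it without a
// second hash lookup, unlike contains(key) followed by erase(key).
bool takeIfUnprocessed(std::unordered_set<std::string> & unprocessed_files, const std::string & next_file)
{
    auto it = unprocessed_files.find(next_file);
    if (it == unprocessed_files.end())
        return false;
    unprocessed_files.erase(it);
    return true;
}
```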
```cpp
std::shared_ptr<IObjectIterator> iterator;

std::unordered_map<size_t, std::vector<String>> connection_to_files;
```
Since replica ids should always lie in the range [0; N), it could be replaced with `std::vector`.
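With dense keys in [0, N), a vector sized once at construction gives the same per-replica lookup without hashing. A minimal sketch (the alias and helper below are hypothetical):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Replica ids are dense in [0, N), so a vector indexed by replica id can
// replace std::unordered_map<size_t, std::vector<String>>.
using FileQueues = std::vector<std::vector<std::string>>;

std::vector<std::string> & filesForReplica(FileQueues & queues, size_t replica_id)
{
    assert(replica_id < queues.size()); // ids must stay in range
    return queues[replica_id];
}
```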
```cpp
std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(
    size_t number_of_current_replica, size_t number_of_replicas)
{
    while (!iterator_exhausted)
```
This access to `iterator_exhausted` should probably also be guarded by the lock.
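The point, sketched in isolation (member names follow the snippets above, the wrapper struct is hypothetical): a flag written under a mutex must be read under the same mutex, otherwise the read is a data race; making the flag `std::atomic<bool>` would be an alternative.

```cpp
#include <mutex>

// Sketch of the guarded access the review asks for: iterator_exhausted is
// written under `mutex` elsewhere, so the loop condition should take the
// same lock before reading it.
struct DistributorStateSketch
{
    std::mutex mutex;
    bool iterator_exhausted = false;

    bool exhausted()
    {
        std::lock_guard lock(mutex);
        return iterator_exhausted;
    }

    void markExhausted()
    {
        std::lock_guard lock(mutex);
        iterator_exhausted = true;
    }
};
```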
Initialize task distributor with number of replicas directly, instead of passing it through replica info
Thanks @nickitat and sorry for the delay here.
```cpp
auto max_replicas_to_use = static_cast<UInt64>(cluster->getShardsInfo().size());
if (context->getSettingsRef()[Setting::max_parallel_replicas] > 1)
    max_replicas_to_use = std::min(max_replicas_to_use, context->getSettingsRef()[Setting::max_parallel_replicas].value);

createExtension(predicate, max_replicas_to_use);
```
One thing I wasn't quite sure about is how the initialization works here in `applyFilters`; not sure what the code path to get here is.
I copied parts from below to get the same number of replicas, so hopefully that will work OK.
Looks like two tests failed on the "Replica info is not initialized" check; I will have a look at those tomorrow.
Initialized replica info on all extensions that also use the task iterator, even though it is not currently used for most of them. Not sure if this is going too deep in this PR.
Thanks again for the review @nickitat! Could you please take another look?
```
# Conflicts:
#	src/Storages/IStorageCluster.cpp
#	src/Storages/StorageDistributed.cpp
```
477887a
Changelog category (leave one):

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Object storage cluster table functions (e.g. `s3Cluster`) will now assign files to replicas for reading based on a consistent hash to improve cache locality.

Documentation entry for user-facing changes
Addresses: #72816
This works by assigning files from the file iterator to specific replicas.
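As an illustration of the idea only (a standalone sketch, not the PR's actual code; `preferredReplica` is a hypothetical name), a rendezvous-style hash keeps the file-to-replica mapping stable: each file deterministically prefers the same replica across queries, which is what makes the filesystem cache warm for repeated reads.

```cpp
#include <cstddef>
#include <functional>
#include <string>

// Rendezvous (highest-random-weight) hashing sketch: for a given file, score
// every replica by hashing (file, replica) together and pick the highest
// score. The winner is stable for a fixed replica count, so the same replica
// keeps reading (and caching) the same files.
size_t preferredReplica(const std::string & file_path, size_t number_of_replicas)
{
    size_t best_replica = 0;
    size_t best_weight = 0;
    for (size_t replica = 0; replica < number_of_replicas; ++replica)
    {
        size_t weight = std::hash<std::string>{}(file_path + "#" + std::to_string(replica));
        if (weight >= best_weight)
        {
            best_weight = weight;
            best_replica = replica;
        }
    }
    return best_replica;
}
```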
This PR also implements task stealing, so that if a replica is unavailable or slow, other replicas will process its tasks. For filesystem cache locality, though, this means some task stealing at the tail of iteration is inevitable, preventing us from achieving perfect cache locality.
I've added a 50ms sleep before stealing is allowed to proceed, which helps a bit, but I'm not sure if that's a good approach.

Added an `object_storage_stable_cluster_task_distribution` setting to enable the new behaviour.

I'm happy to write some tests if I get a 👍 on the approach.