S3Queue auxiliary Zookeeper support (not clean) #93786
lesandie wants to merge 12 commits into ClickHouse:master
Conversation
@ianton-ru and @kssenii, could you do a final review? I added all the comments and changes @ianton-ru proposed here: lesandie#2 and resolved the conflicts from here: #81040
```diff
-zk_retries.retryLoop([&]
-{
-    response = getZooKeeper(log)->tryGet(paths);
+response = zk_client->tryGet(paths);
```
As I understand it, we need to get the client from getZooKeeper inside retryLoop, because a retry can be caused by a disconnect.

I collapsed it when switching to the cached zk_client, but that dropped the retry behavior 😓 I will add back getZooKeeper.
```diff
-{
-    code = getZooKeeper(log)->tryMulti(remove_requests, remove_responses);
-});
+code = zk_client->tryMulti(remove_requests, remove_responses);
```
Why was retryLoop removed here?

Same as above. I'll add the retry back here, but it is better to keep the cached zk_client and not use getZooKeeper() here, right? (as it is cleaning nodes). At least it makes sense to me, but I'm not 100% sure.
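The reconnect concern behind @ianton-ru's comment can be illustrated with a small self-contained sketch (FakeKeeperClient, KeeperProvider, and this simplified retryLoop are hypothetical stand-ins, not the actual zkutil API): if the client handle is captured once outside the loop, every retry reuses the same expired session, whereas fetching it inside the lambda picks up a freshly initialized client.

```cpp
#include <functional>
#include <memory>
#include <stdexcept>

// Hypothetical stand-in for a keeper client whose session may have expired.
struct FakeKeeperClient
{
    bool expired = false;
    bool tryGet() const
    {
        if (expired)
            throw std::runtime_error("session expired");
        return true;
    }
};

// Simulates getZooKeeper(): returns a fresh client on each call.
// The first client handed out is already expired, so the first attempt fails.
struct KeeperProvider
{
    int clients_created = 0;
    std::shared_ptr<FakeKeeperClient> get()
    {
        auto client = std::make_shared<FakeKeeperClient>();
        client->expired = (clients_created++ == 0);
        return client;
    }
};

// Minimal retry loop: rerun the operation until it stops throwing (bounded).
bool retryLoop(const std::function<bool()> & op, int max_retries = 3)
{
    for (int attempt = 0; attempt < max_retries; ++attempt)
    {
        try { return op(); }
        catch (const std::runtime_error &) { /* transient failure: retry */ }
    }
    return false;
}

// Correct pattern: acquire the client *inside* the lambda, so each retry
// can pick up a reinitialized session after an expiry.
bool fetchWithRetries(KeeperProvider & provider)
{
    return retryLoop([&] { return provider.get()->tryGet(); });
}
```

With a cached client captured outside the lambda, the first expired session would be retried with the same dead handle and all attempts would fail.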
Workflow [PR], commit [58eb559] Summary: ❌
Pull request overview
This PR adds support for auxiliary ZooKeeper clusters in S3Queue and AzureQueue tables through the keeper_path setting. This feature enables workload separation for heavy ZooKeeper usage patterns by allowing tables to specify a dedicated keeper instance.
Changes:
- Extended the `keeper_path` setting to support an auxiliary ZooKeeper name prefix (e.g., `zookeeper2:/clickhouse/s3queue/table`)
- Threaded `zookeeper_name` through the metadata, file metadata, and bucket holder classes
- Added validation for unknown auxiliary ZooKeeper names
- Updated documentation with usage examples
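The name prefix follows the same convention ReplicatedMergeTree uses for auxiliary keepers. A minimal sketch of the split (a hypothetical helper for illustration; the PR itself relies on `zkutil::extractZooKeeperName`, and `"default"` stands in for `zkutil::DEFAULT_ZOOKEEPER_NAME`):

```cpp
#include <stdexcept>
#include <string>
#include <utility>

// Split an optional "name:" prefix off a keeper path, e.g.
// "zookeeper2:/clickhouse/s3queue/table" -> {"zookeeper2", "/clickhouse/s3queue/table"}.
// Paths starting with '/' or containing no colon map to the default keeper.
std::pair<std::string, std::string> splitKeeperPath(const std::string & configured_path)
{
    static const std::string default_name = "default";

    if (configured_path.empty() || configured_path.front() == '/')
        return {default_name, configured_path};

    auto colon = configured_path.find(':');
    if (colon == std::string::npos)
        return {default_name, configured_path};

    auto name = configured_path.substr(0, colon);
    if (name.empty())
        throw std::invalid_argument("empty auxiliary ZooKeeper name");
    return {name, configured_path.substr(colon + 1)};
}
```

The extracted name is then validated against the configured auxiliary ZooKeepers, which is where the "unknown auxiliary ZooKeeper name" error comes from.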
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| tests/integration/test_storage_s3_queue/test_0.py | Added integration tests for auxiliary ZooKeeper support and missing configuration handling |
| tests/integration/test_storage_s3_queue/configs/zookeeper.xml | Added auxiliary ZooKeeper configuration for tests |
| src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h | Added getter for ZooKeeper name and updated constructor signature |
| src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp | Implemented ZooKeeper name extraction and path parsing logic |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h | Added zookeeper_name parameter to constructor and static methods |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.cpp | Updated to pass zookeeper_name to ZooKeeper client calls |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp | Modified to use metadata's getZooKeeper() method instead of static calls |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h | Added zookeeper_name parameter throughout the class |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.cpp | Updated all ZooKeeper client calls to pass zookeeper_name |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h | Added zookeeper_name parameter to getOrCreate and remove methods |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp | Implemented metadata key composition including zookeeper_name |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h | Added zookeeper_name field and updated getZooKeeper method signature |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp | Updated all ZooKeeper operations to use auxiliary keeper when specified |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h | Added zookeeper_name field to base file metadata class |
| src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.cpp | Updated ZooKeeper client calls to use zookeeper_name |
| docs/en/engines/table-engines/integrations/s3queue.md | Documented auxiliary ZooKeeper support in keeper_path setting |
```cpp
return {false, FileStatus::State::Processed};

Coordination::Requests requests;
std::vector<std::string> paths{processed_node_path, failed_node_path};
zkutil::ZooKeeper::MultiTryGetResponse responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
```
The multi-get ZooKeeper request at line 486 is not wrapped in a retry loop like similar operations elsewhere in the codebase. If this request encounters a transient failure (session expiration, network issue), it will not be retried, potentially causing inconsistent state. Wrap this call in getKeeperRetriesControl(log).retryLoop([&] { ... }) similar to other ZooKeeper operations in this file.
```diff
-zkutil::ZooKeeper::MultiTryGetResponse responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
+zkutil::ZooKeeper::MultiTryGetResponse responses;
+getKeeperRetriesControl(log).retryLoop([&]
+{
+    responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
+});
```
Same issue pointed out by @ianton-ru ... so it seems we should wrap all getZooKeeper() calls in retryLoop, just to be consistent, even when the method is cleaning nodes (dropping a queue), right?
```cpp
if (getMaxProcessedNode(node_metadata, &processed_node_stat_local, processed_node_path, log, zookeeper_name))
{
    processed_node_stat = processed_node_stat_local;
    bool failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
```
The exists() check at line 525 is not wrapped in a retry loop. If this encounters a transient ZooKeeper failure, it could incorrectly conclude the node doesn't exist. Wrap this call in getKeeperRetriesControl(log).retryLoop([&] { ... }) for consistency with other ZooKeeper operations.
```diff
-bool failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
+bool failed_node_exists = false;
+getKeeperRetriesControl(log).retryLoop([&]
+{
+    failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
+});
```
```cpp
std::optional<std::string> processed_node_hive_partitioning_path;
if (is_path_with_hive_partitioning)
    processed_node_hive_partitioning_path = std::filesystem::path(processed_node_path_) / getHivePart(path);

auto processed_file_info = getLastProcessedFile(
    &processed_node_stat,
    processed_node_path_,
    path,
    processed_node_hive_partitioning_path,
    std::nullopt,
    log,
    zookeeper_name);
```
The removed code at lines 647-650 (constructing processed_node_hive_partitioning_path) appears to have been replaced with a different logic block that duplicates this same logic. The new code in lines 648-650 recreates the same variable and value that was previously computed. This refactor doesn't change behavior but the diff structure makes it appear more significant than it is.
@kssenii and @ianton-ru I merged the latest changes from master. Should I commit the Copilot suggestions, or should Copilot review again after merging master and applying some changes? The previous suggestions look good to me, just using retryLoop with a lambda. The last comment about processed_node_hive_partitioning_path is related to the hive modifications. I'm not sure how to proceed, but it also seems the added params to getLastProcessedFile are OK: the hive-partitioning path is optional, so if hive partitioning is activated the params provided to the method are good. Am I correct?
Hi @kssenii After #94321 is merged, I'll need to review and resolve any conflicts. I'm happy to do that, but I wanted to check with you first: should I wait for the changes in #94412 as well and then proceed, so we can avoid resolving conflicts multiple times? I understand that the partitioning feature should be stabilized before we introduce another feature like this one. I'm completely fine waiting and following your preferred order of changes. I'm just trying to help and keep things moving in line with your plan.
```cpp
const auto & getFormatName() const { return configuration->format; }

const fs::path & getZooKeeperPath() const { return zk_path; }
const String & getZooKeeperName() const { return zookeeper_name; }
```
This method seems unused? I only see the same method from the queue metadata being used.
```diff
 const ObjectStorageQueueSettings & queue_settings,
-UUID database_uuid = UUIDHelpers::Nil);
+UUID database_uuid = UUIDHelpers::Nil,
+String * zookeeper_name_out = nullptr);
```
Suggested change:

```diff
-String * zookeeper_name_out = nullptr);
+String * result_zookeeper_name = nullptr);
```
```cpp
bool starts_with_slash = !configured_path.empty() && configured_path.front() == '/';
bool has_keeper_prefix = zkutil::extractZooKeeperName(configured_path) != zkutil::DEFAULT_ZOOKEEPER_NAME;

if (has_keeper_prefix || starts_with_slash)
```
Why avoid zk_path_prefix with an auxiliary ZooKeeper? It seems we could still use it if keeper_path is relative.
```cpp
bool released = false;
bool finished = false;
LoggerPtr log;
std::string zookeeper_name;
```
Let's put it inside the bucket info struct?
```diff
 zk_retries.retryLoop([&]
 {
-    code = getZooKeeper(log)->tryMulti(remove_requests, remove_responses);
+    code = zk_client->tryMulti(remove_requests, remove_responses);
```
We should get the zk client from getZooKeeper to make sure it is reinitialized on the next retry if the session expired.
Hi! This is up to you. I will merge whichever of them is green and approved first; that one (mine) currently has a failing test, which I will likely fix today. I did not have time to review your PR before now, not because I was planning to merge something else first.
Hi @kssenii thanks for your response! I acted too fast and applied your comments and changes before merging the latest master changes, so I have to do that again 🤦🏼. I will align my code with the latest bucketing changes you've merged.
…per_name next to zookeeper_path in any related APIs for readability.
Hi @kssenii, Some tests do fail, but I don't know if they are related or not. I compiled/tested locally and it works. Could you do one final review? Thanks!!!
Hi @kssenii it seems this test is failing: Float to Enum overflow ... it seems unrelated.
Hi, it looks like you incorrectly merged master, because a lot of unrelated changes appeared in the PR diff.
Sorry! I see, I will close this PR and create a new one with only the actual changes.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):
S3Queue auxiliary ZooKeeper support using the `keeper_path` setting from S3Queue. This allows workload separation for heavy ZooKeeper usage modes like unordered, by creating a ZooKeeper/Keeper instance specifically for some S3Queue tables and then creating an S3Queue table that specifies `keeper_path` with an auxiliary ZooKeeper, as in a ReplicatedMergeTree table:
`'auxiliary_zookeeper:/clickhouse/s3queue/my_s3queue_table'`
Added integration tests and documentation.
Documentation entry for user-facing changes