Skip to content

S3Queue auxiliary Zookeeper support (not clean)#93786

Closed
lesandie wants to merge 12 commits intoClickHouse:masterfrom
lesandie:s3queue_aux_zookeeper
Closed

S3Queue auxiliary Zookeeper support (not clean)#93786
lesandie wants to merge 12 commits intoClickHouse:masterfrom
lesandie:s3queue_aux_zookeeper

Conversation

@lesandie
Copy link
Contributor

@lesandie lesandie commented Jan 9, 2026

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

S3Queue auxiliary Zookeeper support using keeper_path setting from s3Queue

This will allow to do workload separation for heavy zk usage modes like unordered, by creating a zk/keeper specifically for some S3Queue tables and creating an S3queue specifying keeper_path with an auxiliary zookeeper like in a ReplicatedMergeTree table:

'auxiliary_zookeeper:/clickhouse/s3queue/my_s3queue_table'

Added integration tests and doc

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

@lesandie
Copy link
Contributor Author

lesandie commented Jan 9, 2026

Please @ianton-ru and @kssenii could you final review?.

I added all the comments and changes @ianton-ru proposed here: lesandie#2

and resolved conflicts from here: #81040

@lesandie lesandie changed the title Fix compilation errors S3Queue auxiliary Zookeeper support Jan 9, 2026
zk_retries.retryLoop([&]
{
response = getZooKeeper(log)->tryGet(paths);
response = zk_client->tryGet(paths);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand need to get client from getZooKeeper inside retryLoop because retry can be caused by disconnect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I collapsed it when switching to the cached zk_client, but that did drop the retry behavior 😓

will add back the getZookeeper

{
code = getZooKeeper(log)->tryMulti(remove_requests, remove_responses);
});
code = zk_client->tryMulti(remove_requests, remove_responses);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why retryLoop removed here?

Copy link
Contributor Author

@lesandie lesandie Jan 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above. I'll add here the retry but it is better to leave the cached zk_client and not use the getZookeeper() here right? (as it is cleaning nodes). At least it makes sense to me but not 100% sure.

@kssenii kssenii self-assigned this Jan 9, 2026
@kssenii kssenii added the can be tested Allows running workflows for external contributors label Jan 9, 2026
@clickhouse-gh
Copy link
Contributor

clickhouse-gh bot commented Jan 9, 2026

Workflow [PR], commit [58eb559]

Summary:

job_name test_name status info comment
Fast test failure
03802_float_to_enum_overflow FAIL cidb
Docs check dropped
Build (amd_debug) dropped
Build (amd_asan) dropped
Build (amd_tsan) dropped
Build (amd_msan) dropped
Build (amd_ubsan) dropped
Build (amd_binary) dropped
Build (arm_asan) dropped
Build (arm_binary) dropped

@clickhouse-gh clickhouse-gh bot added the pr-improvement Pull request with some product improvements label Jan 9, 2026
@kssenii kssenii requested a review from Copilot January 12, 2026 12:27
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR adds support for auxiliary ZooKeeper clusters in S3Queue and AzureQueue tables through the keeper_path setting. This feature enables workload separation for heavy ZooKeeper usage patterns by allowing tables to specify a dedicated keeper instance.

Changes:

  • Extended keeper_path setting to support auxiliary ZooKeeper name prefix (e.g., zookeeper2:/clickhouse/s3queue/table)
  • Threaded zookeeper_name through the metadata, file metadata, and bucket holder classes
  • Added validation for unknown auxiliary ZooKeeper names
  • Updated documentation with usage examples

Reviewed changes

Copilot reviewed 16 out of 16 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
tests/integration/test_storage_s3_queue/test_0.py Added integration tests for auxiliary ZooKeeper support and missing configuration handling
tests/integration/test_storage_s3_queue/configs/zookeeper.xml Added auxiliary ZooKeeper configuration for tests
src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.h Added getter for ZooKeeper name and updated constructor signature
src/Storages/ObjectStorageQueue/StorageObjectStorageQueue.cpp Implemented ZooKeeper name extraction and path parsing logic
src/Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.h Added zookeeper_name parameter to constructor and static methods
src/Storages/ObjectStorageQueue/ObjectStorageQueueUnorderedFileMetadata.cpp Updated to pass zookeeper_name to ZooKeeper client calls
src/Storages/ObjectStorageQueue/ObjectStorageQueueSource.cpp Modified to use metadata's getZooKeeper() method instead of static calls
src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.h Added zookeeper_name parameter throughout the class
src/Storages/ObjectStorageQueue/ObjectStorageQueueOrderedFileMetadata.cpp Updated all ZooKeeper client calls to pass zookeeper_name
src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.h Added zookeeper_name parameter to getOrCreate and remove methods
src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadataFactory.cpp Implemented metadata key composition including zookeeper_name
src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.h Added zookeeper_name field and updated getZooKeeper method signature
src/Storages/ObjectStorageQueue/ObjectStorageQueueMetadata.cpp Updated all ZooKeeper operations to use auxiliary keeper when specified
src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.h Added zookeeper_name field to base file metadata class
src/Storages/ObjectStorageQueue/ObjectStorageQueueIFileMetadata.cpp Updated ZooKeeper client calls to use zookeeper_name
docs/en/engines/table-engines/integrations/s3queue.md Documented auxiliary ZooKeeper support in keeper_path setting

return {false, FileStatus::State::Processed};
Coordination::Requests requests;
std::vector<std::string> paths{processed_node_path, failed_node_path};
zkutil::ZooKeeper::MultiTryGetResponse responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The multi-get ZooKeeper request at line 486 is not wrapped in a retry loop like similar operations elsewhere in the codebase. If this request encounters a transient failure (session expiration, network issue), it will not be retried, potentially causing inconsistent state. Wrap this call in getKeeperRetriesControl(log).retryLoop([&] { ... }) similar to other ZooKeeper operations in this file.

Suggested change
zkutil::ZooKeeper::MultiTryGetResponse responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
zkutil::ZooKeeper::MultiTryGetResponse responses;
getKeeperRetriesControl(log).retryLoop([&]
{
responses = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->tryGet(paths);
});

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same issue pointed out by @ianton-ru ... so it seems we should wrap in retryLoop all getZookeeper() just to be consistent even when method is cleaning nodes (dropping a queue) right?

if (getMaxProcessedNode(node_metadata, &processed_node_stat_local, processed_node_path, log, zookeeper_name))
{
processed_node_stat = processed_node_stat_local;
bool failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The exists() check at line 525 is not wrapped in a retry loop. If this encounters a transient ZooKeeper failure, it could incorrectly conclude the node doesn't exist. Wrap this call in getKeeperRetriesControl(log).retryLoop([&] { ... }) for consistency with other ZooKeeper operations.

Suggested change
bool failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
bool failed_node_exists = false;
getKeeperRetriesControl(log).retryLoop([&]
{
failed_node_exists = ObjectStorageQueueMetadata::getZooKeeper(log, zookeeper_name)->exists(failed_node_path);
});

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Comment on lines +648 to +658
std::optional<std::string> processed_node_hive_partitioning_path;
if (is_path_with_hive_partitioning)
processed_node_hive_partitioning_path = std::filesystem::path(processed_node_path_) / getHivePart(path);
auto processed_file_info = getLastProcessedFile(
&processed_node_stat,
processed_node_path_,
path,
processed_node_hive_partitioning_path,
std::nullopt,
log,
zookeeper_name);
Copy link

Copilot AI Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The removed code at lines 647-650 (constructing processed_node_hive_partitioning_path) appears to have been replaced with a different logic block that duplicates this same logic. The new code in lines 648-650 recreates the same variable and value that was previously computed. This refactor doesn't change behavior but the diff structure makes it appear more significant than it is.

Copilot uses AI. Check for mistakes.
@lesandie
Copy link
Contributor Author

lesandie commented Jan 14, 2026

@kssenii and @ianton-ru I merged from master lastest changes

should I commit copilot suggestions or should copilot review again after merging master and applying some changes?.

The previous ones look to me good, just using the retryLoop with lambda. The last comment about processed_node_hive_partitioning_path it is related to the hive modifications. I'm not sure how to proceed, but is also seems the added params to getLastProcessedFile are ok.

std::optional<std::string> processed_node_hive_partitioning_path;

is optional so if hive partitioning is activated then params provided to method are good

Am I correct?

@lesandie
Copy link
Contributor Author

Hi @kssenii seems like the buzzhouse tests are not passing. The CI log itself marks it flaky and links to issue #93861. It doesn’t appear tied to current s3queue changes. Other than this PR seems good now after adding all comments.

@lesandie
Copy link
Contributor Author

lesandie commented Jan 21, 2026

Hi @kssenii

After #94321 is merged, I’ll need to review and resolve any conflicts. I’m happy to do that, but I wanted to check with you first: should I wait for the changes in #94412 as well and then proceed, so we can avoid resolving conflicts multiple times?

I understand that the partitioning feature should be stabilized before we introduce another feature like this one. I’m completely fine waiting and following your preferred order of changes. I'm just trying to help and keep things moving in line with your plan.

const auto & getFormatName() const { return configuration->format; }

const fs::path & getZooKeeperPath() const { return zk_path; }
const String & getZooKeeperName() const { return zookeeper_name; }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method seems unused? I see only same method from queue metadata is used

const ObjectStorageQueueSettings & queue_settings,
UUID database_uuid = UUIDHelpers::Nil);
UUID database_uuid = UUIDHelpers::Nil,
String * zookeeper_name_out = nullptr);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
String * zookeeper_name_out = nullptr);
String * result_zookeeper_name = nullptr);

bool starts_with_slash = !configured_path.empty() && configured_path.front() == '/';
bool has_keeper_prefix = zkutil::extractZooKeeperName(configured_path) != zkutil::DEFAULT_ZOOKEEPER_NAME;

if (has_keeper_prefix || starts_with_slash)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why avoid zk_path_prefix with auxiliary zk? Seems we still could use it if keeper_path is relative

bool released = false;
bool finished = false;
LoggerPtr log;
std::string zookeeper_name;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put it inside bucket info struct?

zk_retries.retryLoop([&]
{
code = getZooKeeper(log)->tryMulti(remove_requests, remove_responses);
code = zk_client->tryMulti(remove_requests, remove_responses);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should get zk client from getZooKeeper to make sure it is reinitialized on next retry if session expired

@kssenii
Copy link
Member

kssenii commented Jan 21, 2026

should I wait for the changes in #94412 as well and then proceed, so we can avoid resolving conflicts multiple times?

Hi! This is up to you, I will merge whichever of them is green and approved first, and that one (mine) currently has a failing test, I will likely fix it today. Did not have time to review your PR before now, not because I was planning to merge something else first.

@lesandie
Copy link
Contributor Author

should I wait for the changes in #94412 as well and then proceed, so we can avoid resolving conflicts multiple times?

Hi! This is up to you, I will merge whichever of them is green and approved first, and that one (mine) currently has a failing test, I will likely fix it today. Did not have time to review your PR before now, not because I was planning to merge something else first.

Hi @kssenii thanks for your response!,

I acted to fast and applied your comments and changes before merging latest master changes, so I have to do that again 🤦🏼 .

I will align my code with the latests bucketing changes you've merged.

@lesandie
Copy link
Contributor Author

Hi @kssenii,

Some tests do fails but IDK if they are related or not. I compiled/tested locally and it works.

Could you do one final review?

Thanks!!!

@lesandie
Copy link
Contributor Author

@kssenii
Copy link
Member

kssenii commented Jan 26, 2026

Hi, it looks like you incorrectly merged master, because of lot of unrelated changes appeared in the PR diff

@lesandie
Copy link
Contributor Author

Hi, it looks like you incorrectly merged master, because of lot of unrelated changes appeared in the PR diff

Sorry! I see, I will close this PR and create a new one only with the actual changes

@lesandie lesandie marked this pull request as draft January 26, 2026 15:57
@lesandie lesandie changed the title S3Queue auxiliary Zookeeper support S3Queue auxiliary Zookeeper support (not clean) Jan 28, 2026
@lesandie lesandie closed this Jan 28, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

can be tested Allows running workflows for external contributors pr-improvement Pull request with some product improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants