
S3Queue ordered mode with hive partitioning#81040

Merged
kssenii merged 55 commits into ClickHouse:master from ianton-ru:storage_s3_queue_prefix
Jan 2, 2026
Conversation

@ianton-ru
Contributor

@ianton-ru ianton-ru commented May 30, 2025

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes into CHANGELOG.md):

Tracking hive partitioning for ordered mode in S3Queue. Resolves #71161

Documentation entry for user-facing changes

Creates a record in Keeper with the last successfully processed file for every hive partition. The bucket mechanism is not modified; the logic changes only inside a bucket, so no separate synchronization is required.
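For illustration, per-partition tracking means the queue must derive a hive partition key from each object path; files that share the same partition prefix then share one last-processed record. A minimal sketch of such key extraction (a hypothetical helper, not the actual ClickHouse code):

```python
def hive_partition(object_key: str) -> str:
    """Return the hive partition prefix: the `key=value` path segments of an object key."""
    return "/".join(part for part in object_key.split("/") if "=" in part)

# Files in the same partition share a last-processed record;
# files in different partitions are tracked independently.
assert hive_partition("data/date=2025-05-30/host=a/file11.parquet") == "date=2025-05-30/host=a"
assert hive_partition("data/date=2025-05-31/host=a/file12.parquet") == "date=2025-05-31/host=a"
```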

@ianton-ru ianton-ru marked this pull request as ready for review May 30, 2025 11:48
@GrigoryPervakov GrigoryPervakov added the can be tested Allows running workflows for external contributors label May 30, 2025
@clickhouse-gh
Contributor

clickhouse-gh bot commented May 30, 2025

Workflow [PR], commit [547e34e]

@clickhouse-gh clickhouse-gh bot added the pr-improvement Pull request with some product improvements label May 30, 2025
@kssenii kssenii self-assigned this Jun 1, 2025
@ianton-ru ianton-ru force-pushed the storage_s3_queue_prefix branch from db2db26 to 2f7d37e Compare June 4, 2025 15:18
@ianton-ru
Contributor Author

Integration tests for aarch64 were interrupted by a timeout; this looks like an infrastructure-related issue.

@clickhouse-gh
Contributor

clickhouse-gh bot commented Jun 16, 2025

Workflow [PR], commit [92edb60]

Summary:

job_name / test_name                                                   status   info         comment
Stateless tests (arm_asan, targeted)                                   failure
  02473_multistep_prewhere                                             FAIL     cidb         IGNORED
  02473_multistep_split_prewhere                                       FAIL     cidb         IGNORED
Integration tests (amd_tsan, 1/6)                                      failure
  test_storage_s3_queue/test_migration.py::test_migration[1-]          FAIL     cidb         IGNORED
  test_storage_s3_queue/test_migration.py::test_migration[1-s3queue_]  FAIL     cidb         IGNORED
BuzzHouse (amd_msan)                                                   failure
  Logical error: '!table_alias.empty()' (STID: None)                   FAIL     cidb, issue  ISSUE CREATED
BuzzHouse (amd_ubsan)                                                  failure
  Logical error: Bad cast from type A to B (STID: 1635-4058)           FAIL     cidb, issue  ISSUE CREATED
AST fuzzer (arm_asan)                                                  error                 IGNORED

@ianton-ru ianton-ru force-pushed the storage_s3_queue_prefix branch from 9471804 to bdce2c4 Compare June 16, 2025 23:12
@ianton-ru
Contributor Author

Failed fast test 03373_named_session_try_recreate_before_timeout looks unrelated.

@kssenii
Member

kssenii commented Dec 1, 2025

Yes, it looks unrelated, but it blocks other tests from starting, so at the moment no other tests have actually run and none will. Could you please push an empty commit to restart the checks?

Comment on lines +92 to +97
@pytest.mark.parametrize("hosts", [1, 2])
@pytest.mark.parametrize("processing_threads_num", [1, 16])
@pytest.mark.parametrize("buckets", [1, 4])
@pytest.mark.parametrize("engine_name", ["S3Queue",
                                         "AzureQueue",
                                         ])
Member

Could you please split this test into separate ones? Flaky check fails because a single test runs for too long https://s3.amazonaws.com/clickhouse-test-reports/PRs/81040/27bf90da21b8557c55b73a9ae98cdfc01707b626//integration_tests_amd_asan_flaky/job.log
Also please avoid force-push from now on, as I've fixed private synchronization conflicts and a force push will reset my fixes.
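For context, stacking the four `parametrize` decorators above on one function produces 2 x 2 x 2 x 2 = 16 cases inside a single test, which is what makes the flaky check run too long. A quick sketch of the combinatorics, showing how splitting the `engine_name` axis into separate test functions halves the cases each function runs (illustrative only, not the actual test code):

```python
from itertools import product

hosts = [1, 2]
processing_threads_num = [1, 16]
buckets = [1, 4]
engines = ["S3Queue", "AzureQueue"]

# One test function parametrized over all four axes: 16 cases in one test.
combined = list(product(hosts, processing_threads_num, buckets, engines))
assert len(combined) == 16

# Split into one test function per engine: 8 cases each.
per_engine = {e: list(product(hosts, processing_threads_num, buckets)) for e in engines}
assert all(len(cases) == 8 for cases in per_engine.values())
```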

@ianton-ru
Contributor Author

We need to wait after each data portion.
I need to find another way to speed up the tests...

@ianton-ru
Contributor Author

Found a bug in the test; it should be faster now.

@ianton-ru
Contributor Author

ianton-ru commented Jan 2, 2026

Failed tests look unrelated.

@kssenii kssenii added this pull request to the merge queue Jan 2, 2026
Merged via the queue into ClickHouse:master with commit fefb9e1 Jan 2, 2026
125 of 131 checks passed
@robot-ch-test-poll2 robot-ch-test-poll2 added the pr-synced-to-cloud The PR is synced to the cloud repo label Jan 2, 2026
Comment on lines +1029 to +1032
if (code.value() == Coordination::Error::ZBADVERSION && retry_count < retry_limit)
{
++retry_count;
LOG_INFO(log, "Keeper Bad Version error, other node wrote something, retry {}", retry_count);
Member

@ianton-ru, hi, could you please clarify why we need this check? As we use persistent nodes for the processing/bucket nodes, this error should not happen; otherwise it would mean duplicated data, which must not happen, and this retry could just hide some bugs.

Contributor Author

@ianton-ru ianton-ru Jan 8, 2026

Scenario with multiple nodes:

  • two hive partitions: part1 with file file11 and part2 with file file21
  • records in Keeper have version 1
  • node1 processes files file11 and file21
  • meanwhile a new file file12 is added to part1
  • node2 processes file file12
  • node1 completes processing and starts to build its records list; it extracts the versions for both parts, and both have version 1
  • node2 completes processing and starts to build its records list, also with version 1
  • node2 commits its records without errors, and the record in Keeper for part1 gets version 2
  • node1 tries to commit its records as a single transaction and gets a Bad Version error for part1
    Without hive partitions, node1 would not need to write anything after that, because node2 already wrote more recent information about the last processed file.
    But with hive partitions, node1 still needs to write the last processed file for part2.
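The scenario above can be modeled with a toy in-memory Keeper. Everything here is hypothetical (the FakeKeeper class, the part1/part2 paths, and using lexicographic file-name order as a stand-in for "newer file"); it only sketches the optimistic versioned multi-write and the Bad Version retry that recommits the surviving partition records:

```python
class BadVersion(Exception):
    pass

class FakeKeeper:
    """Toy stand-in for Keeper: path -> (value, version), with an all-or-nothing multi()."""
    def __init__(self, nodes):
        self.nodes = dict(nodes)

    def get(self, path):
        return self.nodes.get(path, ("", 0))

    def multi(self, ops):
        # Check every expected version first, like a Keeper multi request...
        for path, _, expected in ops:
            if self.get(path)[1] != expected:
                raise BadVersion(path)
        # ...then apply all writes atomically.
        for path, value, expected in ops:
            self.nodes[path] = (value, expected + 1)

keeper = FakeKeeper({"part1": ("file10", 1), "part2": ("file20", 1)})

# node1 snapshots versions (both 1) before committing file11 and file21.
node1_ops = [("part1", "file11", 1), ("part2", "file21", 1)]

# Meanwhile node2 commits the newer file12 for part1: part1's version becomes 2.
keeper.multi([("part1", "file12", 1)])

retry_count, retry_limit = 0, 5
while True:
    try:
        keeper.multi(node1_ops)
        break
    except BadVersion:
        retry_count += 1
        assert retry_count < retry_limit
        # Re-read and drop records another node already superseded,
        # but keep the rest: part2 still has to record file21.
        node1_ops = [
            (path, value, keeper.get(path)[1])
            for path, value, _ in node1_ops
            if keeper.get(path)[0] < value  # lexicographic order as "older file"
        ]

assert keeper.get("part1") == ("file12", 2)  # node2's newer file wins
assert keeper.get("part2") == ("file21", 2)  # node1 still recorded part2
```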

Member

@kssenii kssenii Jan 8, 2026

So you mean node1 was processing file /part1/file11 concurrently with node2 processing file /part1/file12?
But if file11 and file12 share the same processed_node_path, then they are in the same bucket, and a single bucket can only be processed by a single processor, which does not allow any concurrent processing (the persistent bucket lock must maintain this invariant). So the situation you describe should not happen.

Contributor Author

@ianton-ru ianton-ru Jan 9, 2026

Hmm. It looks like ClickHouse locks buckets only when the number of buckets is greater than 1 (when useBucketsForProcessing returns true).
Is a configuration with multiple nodes and a single bucket valid?

Member

@kssenii kssenii Jan 9, 2026

Is a configuration with multiple nodes and a single bucket valid?

Not valid, as it can lead to inconsistent processing (in some failure scenarios we can end up with files that will never be processed), and retries will not work properly. Currently, if processing_threads_num is >1 (it will almost always be so, as we take the default from the number of CPU cores) and buckets is disabled, then buckets are enforced automatically anyway. Moreover, in the cloud we do not allow creating tables without bucket-based processing:

if (!warned && settings_ref[Setting::cloud_mode]
    && table_metadata.getMode() == ObjectStorageQueueMode::ORDERED
    && table_metadata.buckets <= 1 && table_metadata.processing_threads_num <= 1)
{
    const std::string message = "Ordered mode in cloud without "
        "either `buckets`>1 or `processing_threads_num`>1 (works as `buckets` if it's not specified) "
        "will not work properly. Please specify them in the CREATE query. See documentation for more details.";

Contributor Author

Oh. I use "buckets=1" in the tests, so getBucketsNum returns 1, because buckets is not zero.

Contributor Author

It's a gray zone. useBucketsForProcessing checks that buckets_num>1.

Member

useBucketsForProcessing checks that buckets_num>1

Yes, but ObjectStorageQueueMetadata sets buckets_num as table_metadata->getBucketsNum, which returns what I've sent in the above message https://github.com/ClickHouse/clickhouse-private/blob/d442af4b207906e5c3600e35eb82742a1d0f2d97/src/Storages/ObjectStorageQueue/ObjectStorageQueueTableMetadata.h#L102-L106

I use "buckets=1" in tests,

In this case, yes, it will not work; this is an omission which needs to be fixed. But by default buckets = 0, so if the user does not touch this setting and only changes processing_threads_num, then buckets will be correctly set to the value of processing_threads_num.
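The resolution rule described in this thread can be condensed into a small sketch (hypothetical function names, a toy model of the described behavior rather than the actual ClickHouse code):

```python
def effective_buckets(buckets: int, processing_threads_num: int) -> int:
    # buckets == 0 means "unset": fall back to processing_threads_num.
    return buckets if buckets else processing_threads_num

def use_buckets_for_processing(buckets: int, processing_threads_num: int) -> bool:
    return effective_buckets(buckets, processing_threads_num) > 1

# Default buckets=0 with many threads: bucket-based processing is enforced.
assert use_buckets_for_processing(0, 16) is True
# Explicit buckets=1 disables bucket locking even with 16 threads: the omission discussed above.
assert use_buckets_for_processing(1, 16) is False
```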

Contributor Author

Removed here: #93843


Labels

  • can be tested: Allows running workflows for external contributors
  • pr-improvement: Pull request with some product improvements
  • pr-synced-to-cloud: The PR is synced to the cloud repo

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Per-prefix tracking for Ordered S3Queue

5 participants