Conversation

Workflow [PR], commit [ab4a5b6] Summary: ❌
Failed tests
Pull request overview
This PR optimizes the s3Cluster table function by implementing object-level filtering when use_hive_partitioning is enabled. Previously, s3Cluster loaded all objects and only filtered data after loading, unlike the regular s3 function which could skip loading unused objects based on partition paths. This change introduces a new ObjectFilterStep that filters objects before loading them, matching the optimization behavior of the non-cluster variant.
Changes:
- Introduced ObjectFilterStep to filter S3 objects based on hive partitioning before loading
- Moved the ReadFromCluster class definition into the header file instead of inline in the implementation
- Added an integration test to verify the optimization works for both s3 and s3Cluster functions
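The core idea of the optimization — deciding from the object path alone whether an object can match the WHERE predicate, without loading it — can be sketched as follows. This is an illustrative Python sketch, not ClickHouse code; the helper names (`hive_partition_values`, `prune`) are made up for the example:

```python
import re

def hive_partition_values(path: str) -> dict:
    """Extract key=value partition pairs from a hive-style object path."""
    return dict(re.findall(r"([^/=]+)=([^/]+)", path))

def prune(paths, predicate):
    """Keep only objects whose path-derived partition values can satisfy the predicate."""
    return [p for p in paths if predicate(hive_partition_values(p))]

objects = [
    "data/key=1/part-0.parquet",
    "data/key=2/part-0.parquet",
    "data/key=5/part-0.parquet",
]
# WHERE key <= 2: the object under key=5 is pruned and never loaded.
kept = prune(objects, lambda vals: int(vals.get("key", "0")) <= 2)
```

ObjectFilterStep plays the role of `predicate` here: it carries the WHERE condition to the object listing so filtering happens before any data is read, rather than after loading as a regular Filter step would.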
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| tests/integration/test_s3_cluster/test.py | Added comprehensive test verifying hive partitioning optimization reduces file reads for both s3 and s3Cluster |
| src/Storages/ObjectStorage/StorageObjectStorageCluster.h | Removed unused virtual_columns field |
| src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp | Updated to use getVirtualsList() instead of removed virtual_columns field |
| src/Storages/IStorageCluster.h | Moved ReadFromCluster class definition from .cpp to header for broader visibility |
| src/Storages/IStorageCluster.cpp | Removed inline ReadFromCluster class definition (moved to header) |
| src/Processors/QueryPlan/QueryPlanStepRegistry.cpp | Registered new ObjectFilterStep in the query plan step registry |
| src/Processors/QueryPlan/Optimizations/optimizePrimaryKeyConditionAndLimit.cpp | Added handling for ObjectFilterStep in query plan optimization |
| src/Processors/QueryPlan/ObjectFilterStep.h | New header defining ObjectFilterStep for filtering objects before loading |
| src/Processors/QueryPlan/ObjectFilterStep.cpp | Implementation of ObjectFilterStep with serialization support |
| src/Planner/Planner.cpp | Added logic to insert ObjectFilterStep for cluster queries with hive partitioning |
| src/Interpreters/InterpreterSelectQuery.cpp | Added logic to insert ObjectFilterStep for cluster queries with hive partitioning in legacy interpreter |
```python
import os
import shutil
import time
import uuid
```
The uuid module is imported but the standard library uuid.uuid4() is only used for generating unique query IDs in the test. Consider using the existing random_string helper from helpers.utility (already imported on line 14) for consistency with the codebase pattern, unless UUID format is specifically required for query ID tracking.
In most integration tests where query_id is used, it is created based on uuid.
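The pattern referred to here is straightforward: generate a unique query_id up front so the query can later be looked up in system.query_log. A minimal sketch (the commented node.query call is only a shape, not a real invocation):

```python
import uuid

# A unique query_id lets the test correlate a query with its
# system.query_log entry afterwards.
query_id = str(uuid.uuid4())
# node.query("SELECT 1", query_id=query_id)  # hypothetical usage in a test
```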
```cpp
QueryPipelineBuilderPtr ObjectFilterStep::updatePipeline(QueryPipelineBuilders pipelines, const BuildQueryPipelineSettings & /* settings */)
{
    return std::move(pipelines.front());
}
```
The updatePipeline method returns the pipeline unchanged without applying the filter transformation. This means the ObjectFilterStep does not actually filter any data in the pipeline execution. The method should create and add a FilterTransform (already included in headers) using actions_dag and filter_column_name to perform the actual filtering.
Suggested change:

```cpp
auto pipeline = std::move(pipelines.front());
pipeline->addSimpleTransform([this](const Block & header)
{
    return std::make_shared<FilterTransform>(
        header,
        std::make_shared<ActionsDAG>(actions_dag),
        filter_column_name,
        false);
});
return pipeline;
```
I created ObjectFilterStep exactly because FilterStep can't be used for this purpose.
src/Planner/Planner.cpp
Outdated
```cpp
    && !query_processing_info.isFirstStage()
    && expression_analysis_result.hasWhere())
{
    if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
```
The logic to insert ObjectFilterStep is duplicated between Planner.cpp and InterpreterSelectQuery.cpp with nearly identical conditions and implementations. Consider extracting this into a shared helper function to reduce duplication and ensure consistent behavior across both query execution paths.
I'm not sure if this makes sense given future plans to remove old analyzer.
```cpp
    && !expressions.first_stage
    && expressions.hasWhere())
{
    if (typeid_cast<ReadFromCluster *>(query_plan.getRootNode()->step.get()))
```
The logic to insert ObjectFilterStep is duplicated between InterpreterSelectQuery.cpp and Planner.cpp with nearly identical conditions and implementations. Consider extracting this into a shared helper function to reduce duplication and ensure consistent behavior across both query execution paths.
The same as above.
```cpp
namespace DB
{

/// Implements WHERE operation.
```
Please add a short comment here describing the key difference from FilterStep and why/where it is needed.
```sql
FROM s3Cluster(cluster_simple, 'http://minio1:9001/{data_path}/key=**.parquet', 'minio', '{minio_secret_key}', 'Parquet', 'key Int32, value Int32')
WHERE key <= 2
FORMAT TSV
SETTINGS enable_filesystem_cache = 0, use_query_cache = 0, use_cache_for_count_from_files = 0, use_hive_partitioning = 1, allow_experimental_analyzer={allow_experimental_analyzer}
```
Let's also add a test with partition_strategy='hive' as it works a bit differently from default hive setting.
As I understand, partition_strategy makes sense only for INSERT, not for SELECT.
The difference is also that with that setting hive columns become physical, while without it they are virtual (unless directly specified in the schema).
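The physical-vs-virtual distinction can be illustrated with a small sketch (illustrative Python, not ClickHouse code; `classify_hive_columns` is a made-up name): a partition column parsed from the object path counts as physical only if it is also declared in the table schema, otherwise it is virtual.

```python
import re

def classify_hive_columns(path: str, schema: set) -> tuple:
    """Split hive partition columns into physical (declared in the schema)
    and virtual (derived only from the object path)."""
    parts = dict(re.findall(r"([^/=]+)=([^/]+)", path))
    physical = {k: v for k, v in parts.items() if k in schema}
    virtual = {k: v for k, v in parts.items() if k not in schema}
    return physical, virtual

# 'key' declared in the schema -> it is a physical hive column.
phys, virt = classify_hive_columns("data/key=1/f.parquet", {"key", "value"})
# 'key' not declared -> it exists only as a virtual column from the path.
phys2, virt2 = classify_hive_columns("data/key=1/f.parquet", {"value"})
```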
Integration tests were interrupted by timeout:

@kssenii Is anything more required?

Hi, sorry for the wait! I wanted to verify something before merge and postponed too much, thank you for pinging...

@kssenii Does this PR need something else from my side?

Hi! See my comment above - I suggested simplifying this fix and attached a patch in a pastila link showing my suggestion.

Sorry, I missed it.

Hi @ianton-ru,

@alexbakharew Done.
LLVM Coverage Report
PR changed-lines coverage: 82.29% (79/96, 0 noise lines excluded)
Changelog category (leave one):

Changelog entry (a user-readable short description):
s3Cluster table function optimization with the use_hive_partitioning setting.

Documentation entry for user-facing changes:
The s3 table function with the use_hive_partitioning setting can skip loading unused objects based on object paths. s3Cluster ignored this and loaded all objects. s3 uses the Filter step to filter unused files. s3Cluster can't use this step, because it doesn't have the column to filter data on after loading. Added a new step, ObjectFilter, to filter objects before loading them.

Note
Medium Risk
Touches distributed query planning and filter pushdown for cluster table functions; mistakes could change which objects are scanned or break predicate propagation/serialization across nodes.
Overview
Adds a new query plan step
ObjectFilterStep(serializable/registrable) to carry a WHERE predicate specifically for object-path/virtual-column pruning in distributed object-storage reads.When
use_hive_partitioningis enabled,InterpreterSelectQuerynow injectsObjectFilterStepon the initiator forReadFromClusterplans sos3Clustercan skip unrelated Hive-partitioned objects even if the filtering column isn’t present in returned blocks; the primary-key/limit optimization pass is updated to push this step’s predicate down intoSourceStepWithFilter.Refactors
ReadFromClusterintoIStorageCluster.hand updates cluster filtering to use either locally-added filter DAGs orquery_info.filter_actions_dag, plus adjusts object-storage cluster iteration to usegetVirtualsList(); adds an integration test validating reduced file reads fors3Clusterwith Hive partitioning.Written by Cursor Bugbot for commit 25cfa06. This will update automatically on new commits. Configure here.