
Iceberg Partition Pruning for time-related partition transforms#72044

Merged
divanik merged 39 commits into master from divanik/add_partition_pruning
Jan 20, 2025

Conversation


@divanik divanik commented Nov 18, 2024

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Implement partition pruning for time-related partition transforms in Iceberg tables.
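
To illustrate the feature, here is a minimal standalone sketch of the pruning idea (not the PR's actual code; the helper names and the simplified month arithmetic are assumptions): Iceberg's month(ts) transform records each data file's partition value as the number of months since 1970-01, so a file whose month lies outside the range implied by the query's WHERE clause can be skipped without being read.

#include <cstdint>
#include <iostream>

// Inclusive range of months (counted since 1970-01) implied by the WHERE clause.
struct MonthRange { int32_t first; int32_t last; };

// Convert a calendar (year, month) pair into the transform's unit.
int32_t toMonths(int32_t year, int32_t month /* 1..12 */)
{
    return (year - 1970) * 12 + (month - 1);
}

// A data file may be pruned if its single-month partition value cannot overlap the query range.
bool canPrune(int32_t file_partition_value, MonthRange query_range)
{
    return file_partition_value < query_range.first || file_partition_value > query_range.last;
}

int main()
{
    // WHERE ts >= '2024-01-01' AND ts < '2024-03-01'  =>  months January..February 2024
    MonthRange query{toMonths(2024, 1), toMonths(2024, 2)};
    std::cout << canPrune(toMonths(2024, 1), query) << '\n'; // 0: the January file must still be read
    std::cout << canPrune(toMonths(2024, 6), query) << '\n'; // 1: the June file is pruned
}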

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

@robot-ch-test-poll1 robot-ch-test-poll1 added the pr-feature label (Pull request with new product feature) Nov 18, 2024
@divanik divanik changed the title from [[WIP]] Iceberg Partition Pruning for time-related to [[WIP]] Iceberg Partition Pruning for time-related partition transforms Nov 18, 2024

robot-ch-test-poll4 commented Nov 18, 2024

This is an automated comment for commit 63ae661 with description of existing statuses. It's updated for the latest CI running

❌ Click here to open a full report in a separate page

Check name | Description | Status
Stateless tests | Runs stateless functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc | ❌ failure

Successful checks

Check name | Description | Status
AST fuzzer | Runs randomly generated queries to catch program errors. The build type is optionally given in parenthesis. If it fails, ask a maintainer for help | ✅ success
Builds | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
BuzzHouse (asan) | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
BuzzHouse (debug) | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
BuzzHouse (msan) | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
BuzzHouse (tsan) | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
BuzzHouse (ubsan) | There's no description for the check yet, please add it to tests/ci/ci_config.py:CHECK_DESCRIPTIONS | ✅ success
ClickBench | Runs ClickBench with instant-attach table | ✅ success
Compatibility check | Checks that clickhouse binary runs on distributions with old libc versions. If it fails, ask a maintainer for help | ✅ success
Docker keeper image | The check to build and optionally push the mentioned image to docker hub | ✅ success
Docker server image | The check to build and optionally push the mentioned image to docker hub | ✅ success
Docs check | Builds and tests the documentation | ✅ success
Fast test | Normally this is the first check that is run for a PR. It builds ClickHouse and runs most of the stateless functional tests, omitting some. If it fails, further checks are not started until it is fixed. Look at the report to see which tests fail, then reproduce the failure locally as described here | ✅ success
Flaky tests | Checks whether newly added or modified tests are flaky by running them repeatedly, in parallel, with more randomization. Functional tests are run 100 times with address sanitizer and additional randomization of thread scheduling. Integration tests are run up to 10 times. If at least once a new test has failed, or was too long, this check will be red. We don't allow flaky tests, read the doc | ✅ success
Install packages | Checks that the built packages are installable in a clear environment | ✅ success
Integration tests | The integration tests report. In parenthesis the package type is given, and in square brackets are the optional part/total tests | ✅ success
Performance Comparison | Measures changes in query performance. The performance test report is described in detail here. In square brackets are the optional part/total tests | ✅ success
Stateful tests | Runs stateful functional tests for ClickHouse binaries built in various configurations -- release, debug, with sanitizers, etc | ✅ success
Stress test | Runs stateless functional tests concurrently from several clients to detect concurrency-related errors | ✅ success
Style check | Runs a set of checks to keep the code style clean. If some of the tests fail, see the related log from the report | ✅ success
Unit tests | Runs the unit tests for different release types | ✅ success
Upgrade check | Runs stress tests on the server version from the last release and then tries to upgrade it to the version from the PR. It checks if the new server can successfully start up without any errors, crashes or sanitizer asserts | ✅ success

@divanik divanik changed the title from [[WIP]] Iceberg Partition Pruning for time-related partition transforms to Iceberg Partition Pruning for time-related partition transforms Dec 19, 2024
@divanik divanik marked this pull request as ready for review December 19, 2024 11:59
@Avogar Avogar self-assigned this Dec 19, 2024
@divanik divanik requested a review from Avogar January 10, 2025 09:02
std::vector<size_t> partition_pruning_indices;
for (size_t i = 0; i < manifest_entry.getContent().getPartitionColumnInfos().size(); ++i)
{
    std::optional<NameAndTypePair> name_and_type = schema_processor.tryGetFieldCharacteristics(
Member
Why do we use tryGetFieldCharacteristics? Is it possible that the schema doesn't have a field with getPartitionColumnInfos()[i].source_id?

Member
In ManifestFile.cpp::217 we use getFieldCharacteristics for the same fields. Or am I missing something?

Member Author
Yes, the current schema may not include the source_id field that was present in previous manifest files because this column was deleted afterwards, for example.

Member
Let's then maybe remember the list of types for partition columns in the ManifestFileContentImpl during its parsing?
Imho, the code in getDataFilesImpl ideally should look like this:

Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
    if (!current_snapshot)
        return {};

    if (!filter_dag && cached_unprunned_files_for_current_snapshot.has_value())
        return cached_unprunned_files_for_current_snapshot.value();

    Strings data_files;
    for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
    {
        const auto & manifest_entry_content = manifest_entry.getContent();
        const auto & partition_columns_names_and_types = manifest_entry_content.getPartitionColumnsNamesAndTypes();

        ExpressionActionsPtr partition_minmax_idx_expr
            = std::make_shared<ExpressionActions>(ActionsDAG(partition_columns_names_and_types), ExpressionActionsSettings(getContext()));
        const KeyCondition partition_key_condition(
            filter_dag, getContext(), partition_columns_names_and_types.getNames(), partition_minmax_idx_expr);

        const auto & data_files_in_manifest = manifest_entry_content.getDataFiles();
        for (const auto & data_file : data_files_in_manifest)
        {
            if (data_file.status != ManifestEntryStatus::DELETED)
            {
                if (partition_key_condition.checkInHyperrectangle(data_file.partition_ranges, partition_columns_names_and_types.getTypes()).can_be_true)
                    data_files.push_back(data_file.data_file_name);
                else
                    ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunnedFiles);
            }
        }
    }

    if (!filter_dag)
        return (cached_unprunned_files_for_current_snapshot = data_files).value();

    return data_files;
}

Member Author
This proposal might not fully handle renamed columns, and it's important that they are processed correctly. The column names in the query's WHERE expression follow the current schema, not the schema that was in effect when the manifest file was written. Names are not a property of a data file entry in the manifest file; it stores source ids, and we need to match them against the current schema. That is why we use current_schema_id here, and it was the main reason I kept the SpecificPartition information before the refactoring, though I agree that was an overcomplication. Also note that if a column was logically deleted, it cannot appear in the user's WHERE clause, which is why we need to skip it.
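
To make the source-id matching concrete, a hedged sketch (the helper name resolvePartitionColumns and the argument order of tryGetFieldCharacteristics are assumptions, not the PR's exact code):

NamesAndTypesList resolvePartitionColumns(
    const IcebergSchemaProcessor & schema_processor,
    Int32 current_schema_id,
    const std::vector<Int32> & partition_source_ids)
{
    NamesAndTypesList result;
    for (Int32 source_id : partition_source_ids)
    {
        /// tryGetFieldCharacteristics is assumed to return std::nullopt when the field
        /// with this source_id no longer exists in the current schema.
        if (auto name_and_type = schema_processor.tryGetFieldCharacteristics(current_schema_id, source_id))
            result.push_back(*name_and_type); /// carries the column's current name, even after a rename
        /// otherwise the column was dropped later; it cannot appear in the user's WHERE clause, so skip it
    }
    return result;
}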

Member Author
I added a comment to make this clearer in the code.

Member
Got it, thanks for the explanation. But I still want to simplify the code of getDataFilesImpl by moving some code into separate methods. What about this:
Inside DataFileEntry, change std::vector<DB::Range> partition_ranges to std::unordered_map<Int32, DB::Range> partition_ranges, where the keys are source_id. Add a method

std::vector<Int32> ManifestFileContent::getPartitionColumnsIds(const IcebergSchemaProcessor & schema_processor, Int32 current_schema_id)

that will return the vector of partition column ids that are present in the current schema.
Also add

NamesAndTypesList IcebergSchemaProcessor::getFieldsCharacteristics(Int32 schema_id, const std::vector<Int32> & source_ids)

And add

std::vector<Range> DataFileEntry::getPartitionRanges(const std::vector<Int32> & partition_columns_ids)

So the code in getDataFilesImpl will look like this:

Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
    if (!current_snapshot)
        return {};

    if (!filter_dag && cached_unprunned_files_for_current_snapshot.has_value())
        return cached_unprunned_files_for_current_snapshot.value();

    Strings data_files;
    for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
    {
        const auto & manifest_entry_content = manifest_entry.getContent();
        const auto & partition_columns_ids = manifest_entry_content.getPartitionColumnsIds(schema_processor, current_schema_id);
        const auto & partition_columns_names_and_types = schema_processor.getFieldsCharacteristics(current_schema_id, partition_columns_ids);

        ExpressionActionsPtr partition_minmax_idx_expr
            = std::make_shared<ExpressionActions>(ActionsDAG(partition_columns_names_and_types), ExpressionActionsSettings(getContext()));
        const KeyCondition partition_key_condition(
            filter_dag, getContext(), partition_columns_names_and_types.getNames(), partition_minmax_idx_expr);

        const auto & data_files_in_manifest = manifest_entry_content.getDataFiles();
        for (const auto & data_file : data_files_in_manifest)
        {
            if (data_file.status != ManifestEntryStatus::DELETED)
            {
                if (partition_key_condition.checkInHyperrectangle(data_file.getPartitionRanges(partition_columns_ids), partition_columns_names_and_types.getTypes()).can_be_true)
                    data_files.push_back(data_file.data_file_name);
                else
                    ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunnedFiles);
            }
        }
    }

    if (!filter_dag)
        return (cached_unprunned_files_for_current_snapshot = data_files).value();

    return data_files;
}
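
For what it's worth, a minimal sketch of one of the proposed helpers, assuming partition_ranges becomes the unordered_map keyed by source_id described above (a sketch, not a final implementation):

std::vector<Range> DataFileEntry::getPartitionRanges(const std::vector<Int32> & partition_columns_ids) const
{
    std::vector<Range> result;
    result.reserve(partition_columns_ids.size());
    /// Keep the ranges in the same order as the ids, so they line up with the
    /// names and types passed to KeyCondition::checkInHyperrectangle.
    for (Int32 source_id : partition_columns_ids)
        result.push_back(partition_ranges.at(source_id));
    return result;
}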

Comment on lines +109 to +111
auto nested_data_type = column_data_type;
while (nested_data_type->isNullable())
    nested_data_type = removeNullable(nested_data_type);
Member
It's not possible to have multiple nested Nullable. Also, removeNullable returns the type itself if it's not actually Nullable.

Suggested change
- auto nested_data_type = column_data_type;
- while (nested_data_type->isNullable())
-     nested_data_type = removeNullable(nested_data_type);
+ auto nested_data_type = removeNullable(column_data_type);

Member Author
@divanik divanik Jan 17, 2025
OK, I will replace the while with an if, but the if is necessary because the type may not be Nullable.

Member
@Avogar Avogar Jan 17, 2025

The if is not needed, because removeNullable already contains that check:

DataTypePtr removeNullable(const DataTypePtr & type)
{
    if (type->isNullable())
        return static_cast<const DataTypeNullable &>(*type).getNestedType();
    return type;
}
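
A small usage sketch to show why the caller-side check is redundant (assumed example, not from the PR):

/// removeNullable is a no-op for non-Nullable types, so the caller does not need its own check.
DataTypePtr nullable_date = std::make_shared<DataTypeNullable>(std::make_shared<DataTypeDate>());
DataTypePtr plain_date = std::make_shared<DataTypeDate>();
removeNullable(nullable_date); /// returns Date
removeNullable(plain_date);    /// returns the same Date type unchanged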

@divanik divanik requested a review from Avogar January 17, 2025 11:13

divanik commented Jan 20, 2025

01801_s3_cluster is flaky.
The logical error with ZooKeeper seems unrelated (I have seen it in other PRs).

@divanik divanik added this pull request to the merge queue Jan 20, 2025
Merged via the queue into master with commit 15fe351 Jan 20, 2025
127 checks passed
@divanik divanik deleted the divanik/add_partition_pruning branch January 20, 2025 12:02
@robot-ch-test-poll robot-ch-test-poll added the pr-synced-to-cloud label (The PR is synced to the cloud repo) Jan 20, 2025