Iceberg Partition Pruning for time-related partition transforms #72044
Conversation
This is an automated comment for commit 63ae661 with a description of existing statuses. It's updated for the latest CI run. ❌
std::vector<size_t> partition_pruning_indices;
for (size_t i = 0; i < manifest_entry.getContent().getPartitionColumnInfos().size(); ++i)
{
    std::optional<NameAndTypePair> name_and_type = schema_processor.tryGetFieldCharacteristics(
Why do we use tryGetFieldCharacteristics? Is it possible that the schema doesn't have a field with getPartitionColumnInfos()[i].source_id?
In ManifestFile.cpp:217 we use getFieldCharacteristics for the same fields. Or am I missing something?
Yes, the current schema may not include a field with a source_id that was present in earlier manifest files, for example because that column was deleted afterwards.
Let's then maybe remember the list of types for partition columns in the ManifestFileContentImpl during its parsing?
Imho, the code in getDataFilesImpl ideally should look like this:
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
    if (!current_snapshot)
        return {};

    if (!filter_dag && cached_unprunned_files_for_current_snapshot.has_value())
        return cached_unprunned_files_for_current_snapshot.value();

    Strings data_files;
    for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
    {
        const auto & manifest_entry_content = manifest_entry.getContent();
        const auto & partition_columns_names_and_types = manifest_entry_content.getPartitionColumnsNamesAndTypes();

        ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared<ExpressionActions>(
            ActionsDAG(partition_columns_names_and_types), ExpressionActionsSettings(getContext()));
        const KeyCondition partition_key_condition(
            filter_dag, getContext(), partition_columns_names_and_types.getNames(), partition_minmax_idx_expr);

        const auto & data_files_in_manifest = manifest_entry_content.getDataFiles();
        for (const auto & data_file : data_files_in_manifest)
        {
            if (data_file.status != ManifestEntryStatus::DELETED)
            {
                if (partition_key_condition
                        .checkInHyperrectangle(data_file.partition_ranges, partition_columns_names_and_types.getTypes())
                        .can_be_true)
                    data_files.push_back(data_file.data_file_name);
                else
                    ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunnedFiles);
            }
        }
    }

    if (!filter_dag)
        return (cached_unprunned_files_for_current_snapshot = data_files).value();

    return data_files;
}
This proposal might not fully handle renamed columns, and it's important to ensure they are processed correctly. The column names in the query's WHERE expression comply with the current schema, not with the schema that was in effect when the manifest file was written. Names are not a characteristic of a data file entry in a manifest file: the entry stores source ids, and we need to match those against the current schema. That's why we currently use current_schema_id here. This was the main reason I had the SpecificPartition information before the refactoring, though I agree that was an overcomplication. Also take into account that if a column was logically deleted, it cannot appear in the user's WHERE query, which is why we need to skip it.
I added a comment to the code to state this more clearly.
Got it, thanks for the explanation. But I still want to simplify the code of getDataFilesImpl by moving some logic into separate methods. What about this:

Change std::vector<DB::Range> partition_ranges inside DataFileEntry to std::unordered_map<Int32, DB::Range> partition_ranges, where the keys are source ids. Add a method

std::vector<Int32> ManifestFileContent::getPartitionColumnsIds(const IcebergSchemaProcessor & schema_processor, Int32 current_schema_id)

that returns the vector of partition column ids present in the current schema. Also add

NamesAndTypesList IcebergSchemaProcessor::getFieldsCharacteristics(Int32 schema_id, const std::vector<Int32> & source_ids)

and

std::vector<Range> DataFileEntry::getPartitionRanges(const std::vector<Int32> & partition_columns_ids)

Then the code in getDataFilesImpl will look like this:
Strings IcebergMetadata::getDataFilesImpl(const ActionsDAG * filter_dag) const
{
    if (!current_snapshot)
        return {};

    if (!filter_dag && cached_unprunned_files_for_current_snapshot.has_value())
        return cached_unprunned_files_for_current_snapshot.value();

    Strings data_files;
    for (const auto & manifest_entry : current_snapshot->getManifestList().getManifestFiles())
    {
        const auto & manifest_entry_content = manifest_entry.getContent();
        const auto & partition_columns_ids = manifest_entry_content.getPartitionColumnsIds(schema_processor, current_schema_id);
        const auto & partition_columns_names_and_types = schema_processor.getFieldsCharacteristics(current_schema_id, partition_columns_ids);

        ExpressionActionsPtr partition_minmax_idx_expr = std::make_shared<ExpressionActions>(
            ActionsDAG(partition_columns_names_and_types), ExpressionActionsSettings(getContext()));
        const KeyCondition partition_key_condition(
            filter_dag, getContext(), partition_columns_names_and_types.getNames(), partition_minmax_idx_expr);

        const auto & data_files_in_manifest = manifest_entry_content.getDataFiles();
        for (const auto & data_file : data_files_in_manifest)
        {
            if (data_file.status != ManifestEntryStatus::DELETED)
            {
                if (partition_key_condition
                        .checkInHyperrectangle(data_file.getPartitionRanges(partition_columns_ids), partition_columns_names_and_types.getTypes())
                        .can_be_true)
                    data_files.push_back(data_file.data_file_name);
                else
                    ProfileEvents::increment(ProfileEvents::IcebergPartitionPrunnedFiles);
            }
        }
    }

    if (!filter_dag)
        return (cached_unprunned_files_for_current_snapshot = data_files).value();

    return data_files;
}
src/Storages/ObjectStorage/DataLakes/Iceberg/PartitionPruning.cpp (outdated review threads, resolved)
auto nested_data_type = column_data_type;
while (nested_data_type->isNullable())
    nested_data_type = removeNullable(nested_data_type);
It's not possible to have multiple nested Nullable. Also, removeNullable returns the type itself if it's not actually Nullable.
Suggested change:

- auto nested_data_type = column_data_type;
- while (nested_data_type->isNullable())
-     nested_data_type = removeNullable(nested_data_type);
+ auto nested_data_type = removeNullable(column_data_type);
Ok, I will replace the while with an if, but the if is necessary, because the type may not be Nullable.
The if is not needed, because removeNullable already contains this check:
DataTypePtr removeNullable(const DataTypePtr & type)
{
    if (type->isNullable())
        return static_cast<const DataTypeNullable &>(*type).getNestedType();
    return type;
}
01801_s3_cluster is flaky
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Implement partition pruning for Iceberg tables with time-related partition transforms.
Documentation entry for user-facing changes
CI Settings (Only check the boxes if you know what you are doing):