
#70611: A new parquet reader that supports filter push-down, improving total ClickBench time by 50+% compared to the Arrow parquet reader

Closed
liuneng1994 wants to merge 96 commits into ClickHouse:master from liuneng1994:parquet-filter-pushdown

Conversation

@liuneng1994 (Contributor) commented Oct 14, 2024

A parquet reader with native support for filter push down.

  • Supports most commonly used data types, and supports automatic conversion for derived types
  • Supports nested type reading
  • Supports push-down of common expressions
  • Supports direct calculation and filtering of some expressions on the decompression buffer
  • Significantly improves performance by delaying materialization of non-conditional columns
  • Dynamically skips page decompression based on the result of conditional expressions at runtime

Changelog category (leave one):

  • Experimental Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

A parquet reader with native support for filter push down.

Documentation entry for user-facing changes

  • Documentation is written (mandatory for new features)

Information about CI checks: https://clickhouse.com/docs/en/development/continuous-integration/

CI Settings (Only check the boxes if you know what you are doing):

  • Allow: All Required Checks
  • Allow: Stateless tests
  • Allow: Stateful tests
  • Allow: Integration Tests
  • Allow: Performance tests
  • Allow: All Builds
  • Allow: batch 1, 2 for multi-batch jobs
  • Allow: batch 3, 4, 5, 6 for multi-batch jobs

  • Exclude: Style check
  • Exclude: Fast test
  • Exclude: All with ASAN
  • Exclude: All with TSAN, MSAN, UBSAN, Coverage
  • Exclude: All with aarch64, release, debug

  • Run only fuzzers related jobs (libFuzzer fuzzers, AST fuzzers, etc.)
  • Exclude: AST fuzzers

  • Do not test
  • Woolen Wolfdog
  • Upload binaries for special builds
  • Disable merge-commit
  • Disable CI cache

@CLAassistant commented Oct 14, 2024

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you all sign our Contributor License Agreement before we can accept your contribution.
1 out of 2 committers have signed the CLA.

✅ yxheartipp
❌ liuneng1994


liuneng1994 does not seem to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account.
You have signed the CLA already but the status is still pending? Let us recheck it.

/// Create a column reader for a specific physical type and target type.
/// \tparam physical_type physical type in parquet file
/// \tparam target_type target type in ClickHouse
/// \tparam dict whether the column is dictionary-encoded
@al13n321 (Member) commented Mar 10, 2025

I didn't read this in detail yet, but it seems strange to have separate readers for dict and non-dict cases. Column chunk may have some pages encoded with dictionary, then some pages encoded without dictionary (parquet writer falls back to non-dict encoding if the dictionary gets too big). So the dict = true reader must be able to switch to non-dict encoding on the fly. So why not use the same reader for both cases? If dict = false, initialize the reader in a state as if it already switched from dict to non-dict encoding.

@al13n321 (Member) commented Mar 11, 2025

This is probably outside the scope of this PR, but I think there's a better way to do PREWHERE-like prefiltering and input_format_parquet_bloom_filter_push_down. Especially when reading from network (rather than local file), especially with high latency. It seems that the best read strategy would be to read in 4 stages, pipelined as 4 sliding windows of tasks running in parallel (with some dependencies between them):

  1. Filter prefetch: read file byte ranges corresponding to bloom filters and/or the smallest/most-promising filter columns (similar to PREWHERE). Runs on IO threads with very high parallelism (important when reading from network), for lots of row groups at once, because these filters/columns are small and don't take a lot of memory per row group.
  2. Decompress and decode those bloom filters and filter columns. Evaluate filter expressions and produce filter masks. On non-IO threads, with parallelism matching the number of cores. One task per row group is ok, no need to parallelize by individual columns. The resulting filter masks are small (especially if we bit-pack them), so we can afford these tasks to run far ahead of the main data reading, usually all the way to the end of the file (for good read size estimate for the progress bar).
  3. Main data prefetch: read file byte ranges corresponding to the remaining columns. Use filter masks from previous step and page offset index to determine byte ranges to read. That's why this is a separate step from filtering - we want to know tight read ranges before we start reading. Runs on IO threads, with medium parallelism. Can't have very high parallelism because this takes a lot of memory. But higher than decompression+decoding because data is still compressed at this stage. Can read byte ranges within the same row group in parallel, can merge multiple small row groups into one byte range.
  4. Decompress and decode the remaining columns. Runs on non-IO threads, with parallelism matching the number of cores, or limited by memory. Parallelized at the level of primitive columns, not row groups (this is a problem for the current reader that can use at most one thread per row group, but we often don't have enough memory for many row groups) (this is another reason to build filter masks separately - it allows reading columns independently from each other). Composite columns like tuples can be reassembled at the very end before returning the block from ParquetBlockInputFormat.

Each of these would be pipelined across row groups, with a low and high watermark. The dependencies are simple: the next stage for a row group can begin when the previous stage completed for that row group (but we won't necessarily begin one row group at a time; for small row groups we'll wait for a few row groups to be ready and schedule them as one task, based on a target byte size).

(As a slight generalization, there can be multiple filtering stages: first by the smallest and most promising filter columns, then by other filter columns, then read main data. This hopefully won't be needed.)

(Another consideration is that sometimes when reading filter byte ranges we'll also read some data along the way, if gaps between ranges are small enough that we decided to read instead of seeking. Would be nice to preserve and reuse that read data in later stages instead of reading again. But other times we would rather deallocate it to free up more memory for later stages. Not sure how exactly to handle that.)

Maybe I'll make a native reader v3 at some point :) . This is speculative though, please finish this PR too.

(I previously assumed that this PR would have close to optimal performance and would need at most incremental optimizations/simplifications. But now that I wrote down the above idea, it sounds very different from this PR, and potentially significantly faster and simpler (though it's likely that I missed something and it won't work). Probably faster to prototype it from scratch than to modify reader v2.)

Comment on lines +141 to +142
else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>)
sets.set(i, filter->testInt16(start[count]));

This case is different between computeRowSetPlainSpace and computeRowSetPlain. Unintended?

Comment on lines +135 to +148
if constexpr (std::is_same_v<T, Int64>)
sets.set(i, filter->testInt64(start[count]));
else if constexpr (std::is_same_v<T, DateTime64>)
sets.set(i, filter->testInt64(reinterpret_cast<const Int64 *>(start)[count]));
else if constexpr (std::is_same_v<T, Int32>)
sets.set(i, filter->testInt32(start[count]));
else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>)
sets.set(i, filter->testInt16(start[count]));
else if constexpr (std::is_same_v<T, Float32>)
sets.set(i, filter->testFloat32(start[count]));
else if constexpr (std::is_same_v<T, Float64>)
sets.set(i, filter->testFloat64(start[count]));
else
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unsupported type {} on filter {}", typeid(T).name(), filter->toString());

Can be extracted into a function and reused between computeRowSetPlainSpace and computeRowSetPlain?
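One possible shape for the extracted helper (the `ColumnFilter` interface below is a simplified stand-in for the PR's actual class, here just comparing against a threshold): the `if constexpr` chain is dispatched once in a template, and both `computeRowSetPlain` and `computeRowSetPlainSpace` call it per value.

```cpp
#include <cassert>
#include <cstdint>
#include <type_traits>

// Simplified stand-in for the PR's ColumnFilter: each test* method
// checks one value of the corresponding type.
struct ColumnFilter
{
    int64_t threshold;
    bool testInt64(int64_t v) const { return v > threshold; }
    bool testInt32(int32_t v) const { return v > threshold; }
    bool testInt16(int16_t v) const { return v > threshold; }
    bool testFloat32(float v) const { return v > threshold; }
    bool testFloat64(double v) const { return v > threshold; }
};

// Shared type dispatch, usable from both computeRowSetPlain and
// computeRowSetPlainSpace so the two chains cannot drift apart.
template <typename T>
bool testValue(const ColumnFilter & filter, T value)
{
    if constexpr (std::is_same_v<T, int64_t>)
        return filter.testInt64(value);
    else if constexpr (std::is_same_v<T, int32_t>)
        return filter.testInt32(value);
    else if constexpr (std::is_same_v<T, int16_t> || std::is_same_v<T, uint16_t>)
        return filter.testInt16(static_cast<int16_t>(value));
    else if constexpr (std::is_same_v<T, float>)
        return filter.testFloat32(value);
    else if constexpr (std::is_same_v<T, double>)
        return filter.testFloat64(value);
    else
        static_assert(sizeof(T) == 0, "unsupported type for ColumnFilter");
}
```

This would also have caught the Int16/UInt16 discrepancy noted in the previous comment, since there would be only one dispatch to get right.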

Comment on lines +226 to +227
/// read next page
void readAndDecodePage()
Suggested change:
-/// read next page
-void readAndDecodePage()
+/// read next page if there are no rows left in current page
+void readAndDecodePageIfNeeded()

{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Page exhausted");
}
decodePage();
Suggested change:
-decodePage();
+decodePageIfNeeded();

/// levels offset in current page
virtual size_t levelsOffset() const { return state.offsets.levels_offset; }
virtual size_t availableRows() const { return std::max(state.offsets.remain_rows - state.lazy_skip_rows, 0UL); }
/// For nested types: pages of different columns have different sizes, so we need the minimum number of available levels across the current reader.
Member:

Incorrect language :) (Polish in the original: "niepoprawny język")

@liuneng1994 (Contributor, Author) replied:
> This is probably outside the scope of this PR, but I think there's a better way to do PREWHERE-like prefiltering and input_format_parquet_bloom_filter_push_down. [...] (quoting al13n321's 4-stage read strategy comment above in full)

This is a very professional suggestion. I originally built this reader aiming for a more efficient single-threaded reader; I did not give much thought to parallelism and only parallelized some of the IO.

@clickhouse-gh (bot) commented Apr 22, 2025

Dear @al13n321, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself.

@yxheartipp force-pushed the parquet-filter-pushdown branch from a1639e5 to 30912ab on May 21, 2025 08:30
@al13n321 (Member) commented May 22, 2025

I'm working on another parquet reader: #78380 (only the last 3 commits in that PR are relevant). Current plan is to merge both this PR ("v2") and that PR ("v3"), then see if one is pareto-better than the other and maybe remove one (and also remove v1).

The number of parquet readers in the codebase is getting kind of ridiculous, we should do something to separate them more cleanly and make them easy to remove. I'm thinking:

  • Move the writer, native readers v1, v2 (yours), and v3 (mine) to 4 subdirectories of Formats/Impl/Parquet. Please move the v2 into a directory in this PR, then I'll send a separate PR to move the existing files (writer and v1).
  • Don't add parquet-specific things to KeyCondition. Looks like the KeyCondition always comes from a call to SourceWithKeyCondition::setKeyCondition(const std::optional<ActionsDAG>, ContextPtr). So maybe we can just remove the overload setKeyCondition(const std::shared_ptr<const KeyCondition>) and make SourceWithKeyCondition store an ActionsDAG instead of KeyCondition? The format can create the KeyCondition itself; use "Refactor how IInputFormat deals with reading many files at once" (#80931) instead of parquet-specific KeyCondition changes. (Ideally we would also reuse the KeyCondition/ColumnFilterHelper when reading many files in one query; I'm working on it and will try to unify it with "Utilizing a Shared Parsing Pool for Multiple Parquet Streams" (#66253) as well.)

(I was also hoping that some v3 things would be easy to reuse in v2, but that doesn't seem to be the case. V3 integrates with prewhere, and will probably use KeyCondition for all non-prewhere filtering instead of re-analyzing the ActionsDAG; but that doesn't seem to fit v2 because v2 wants to split all conditions by columns, so it would need to pointlessly reimplement the splitting for KeyCondition and prewhere separately. V3 has some complicated fine-grained task scheduling and prefetching to extract ~all available parallelism and to avoid prefetching data before filters are applied; it doesn't seem feasible to reuse at all.)

Please make these changes, then we can get this merged and start benchmarking (and hopefully v3 will be ready soon so we can benchmark everything at once). You don't have to address all the nitpicks from review comments, up to you.

@al13n321 (Member) commented:

> Don't add parquet-specific things to KeyCondition.

Use #80931 to get access to ActionsDAG from ParquetBlockInputFormat.

@alexey-milovidov (Member) commented:

@liuneng1994, thank you so much for the contribution, which provided the basis of the currently merged implementation!


Labels

  • can be tested (allows running workflows for external contributors)
  • pr-experimental (Experimental Feature)
  • submodule changed (at least one submodule changed in this PR)
