A new parquet reader that supports filter push down, which improves the total time on ClickBench by 50%+ compared to the arrow parquet reader #70611
Conversation
liuneng1994 does not seem to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you already have a GitHub account, please add the email address used for this commit to your account. Have you already signed the CLA but the status is still pending? Let us recheck it.
```cpp
/// Create a column reader for a specific physical type and target type.
/// \tparam physical_type physical type in parquet file
/// \tparam target_type target type in ClickHouse
/// \tparam dict is dictionary column or not
```
I didn't read this in detail yet, but it seems strange to have separate readers for dict and non-dict cases. Column chunk may have some pages encoded with dictionary, then some pages encoded without dictionary (parquet writer falls back to non-dict encoding if the dictionary gets too big). So the dict = true reader must be able to switch to non-dict encoding on the fly. So why not use the same reader for both cases? If dict = false, initialize the reader in a state as if it already switched from dict to non-dict encoding.
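If I understand the suggestion, the unified reader would look roughly like this. This is a minimal sketch with hypothetical names (`PageEncoding`, `ColumnReader`), not the PR's actual classes:

```cpp
#include <cassert>

// Hypothetical sketch of the single-reader idea: one reader tracks whether it
// has switched from dictionary to plain decoding, and the former `dict = false`
// reader is just a reader constructed in the already-switched state.
enum class PageEncoding { Dictionary, Plain };

class ColumnReader
{
public:
    // starts_with_dict = false models the old `dict = false` reader: we begin
    // in the state as if the dict -> plain switch already happened.
    explicit ColumnReader(bool starts_with_dict)
        : switched_to_plain(!starts_with_dict) {}

    void beginPage(PageEncoding encoding)
    {
        // Parquet writers may fall back to plain encoding mid-chunk when the
        // dictionary grows too big, so switch on the fly instead of failing.
        if (encoding == PageEncoding::Plain)
            switched_to_plain = true;
    }

    bool usesDictionary() const { return !switched_to_plain; }

private:
    bool switched_to_plain;
};
```

The point is that the dict/non-dict distinction becomes runtime state rather than a template parameter, so a single reader type handles mixed-encoding column chunks.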
This is probably outside the scope of this PR, but I think there's a better way to do PREWHERE-like prefiltering and
Each of these would be pipelined across row groups, with a low and high watermark. The dependencies are simple: the next stage for a row group can begin when the previous stage has completed for that row group (but we won't necessarily begin one row group at a time; for small row groups we'll wait for a few row groups to be ready and schedule them as one task, based on a target byte size).

(As a slight generalization, there can be multiple filtering stages: first by the smallest and most promising filter columns, then by other filter columns, then read the main data. This hopefully won't be needed.)

(Another consideration is that sometimes when reading filter byte ranges we'll also read some data along the way, if gaps between ranges are small enough that we decided to read instead of seeking. It would be nice to preserve and reuse that read data in later stages instead of reading it again. But other times we would rather deallocate it to free up more memory for later stages. Not sure how exactly to handle that.)

Maybe I'll make a native reader v3 at some point :) . This is speculative though, please finish this PR too. (I previously assumed that this PR would have close to optimal performance and would need at most incremental optimizations/simplifications. But now that I wrote down the above idea, it sounds very different from this PR, and potentially significantly faster and simpler (though it's likely that I missed something and it won't work). Probably faster to prototype it from scratch than by modifying reader v2.)
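The scheduling rule described above can be sketched as follows. This is an illustrative sketch, not code from either PR; `RowGroup`, `nextTask`, and the byte-size batching threshold are hypothetical names:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Each row group moves through pipeline stages in order. A stage for a row
// group may run only after the previous stage finished for that row group,
// and small row groups are batched into one task up to a target byte size.
struct RowGroup
{
    size_t bytes = 0;
    size_t completed_stages = 0;  // how many stages finished for this group
};

// Collect a batch of row groups that are ready for `stage` (i.e. exactly
// `stage` earlier stages are done), stopping at the first group whose
// dependency isn't satisfied, or once the batch reaches `target_bytes`.
std::vector<size_t> nextTask(const std::vector<RowGroup> & groups, size_t stage, size_t target_bytes)
{
    std::vector<size_t> batch;
    size_t batch_bytes = 0;
    for (size_t i = 0; i < groups.size(); ++i)
    {
        if (groups[i].completed_stages < stage)
            break;  // previous stage not done; keep row groups in order
        if (groups[i].completed_stages > stage)
            continue;  // this stage already done for this group
        batch.push_back(i);
        batch_bytes += groups[i].bytes;
        if (batch_bytes >= target_bytes)
            break;
    }
    return batch;
}
```

A real scheduler would additionally enforce the low/high watermarks to bound how far ahead early stages run.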
```cpp
else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>)
    sets.set(i, filter->testInt16(start[count]));
```
This case is different between computeRowSetPlainSpace and computeRowSetPlain. Unintended?
```cpp
if constexpr (std::is_same_v<T, Int64>)
    sets.set(i, filter->testInt64(start[count]));
else if constexpr (std::is_same_v<T, DateTime64>)
    sets.set(i, filter->testInt64(reinterpret_cast<const Int64 *>(start)[count]));
else if constexpr (std::is_same_v<T, Int32>)
    sets.set(i, filter->testInt32(start[count]));
else if constexpr (std::is_same_v<T, Int16> || std::is_same_v<T, UInt16>)
    sets.set(i, filter->testInt16(start[count]));
else if constexpr (std::is_same_v<T, Float32>)
    sets.set(i, filter->testFloat32(start[count]));
else if constexpr (std::is_same_v<T, Float64>)
    sets.set(i, filter->testFloat64(start[count]));
else
    throw Exception(ErrorCodes::PARQUET_EXCEPTION, "unsupported type {} on filter {}", typeid(T).name(), filter->toString());
```
Can be extracted into a function and reused between computeRowSetPlainSpace and computeRowSetPlain?
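One possible shape for the extraction, sketched with a simplified stand-in `Filter` and standard integer types rather than the PR's actual classes (the name `testValue` is hypothetical):

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <typeinfo>

// Simplified stand-in for the PR's filter interface.
struct Filter
{
    virtual bool testInt64(int64_t) const { return false; }
    virtual bool testInt32(int32_t) const { return false; }
    virtual bool testInt16(int16_t) const { return false; }
    virtual bool testFloat32(float) const { return false; }
    virtual bool testFloat64(double) const { return false; }
    virtual ~Filter() = default;
};

// The per-type `if constexpr` chain, shared by both computeRowSetPlain and
// computeRowSetPlainSpace, lives in one helper so the two callers can't drift.
template <typename T>
bool testValue(const Filter & filter, T value)
{
    if constexpr (std::is_same_v<T, int64_t>)
        return filter.testInt64(value);
    else if constexpr (std::is_same_v<T, int32_t>)
        return filter.testInt32(value);
    else if constexpr (std::is_same_v<T, int16_t> || std::is_same_v<T, uint16_t>)
        return filter.testInt16(static_cast<int16_t>(value));
    else if constexpr (std::is_same_v<T, float>)
        return filter.testFloat32(value);
    else if constexpr (std::is_same_v<T, double>)
        return filter.testFloat64(value);
    else
        throw std::runtime_error(std::string("unsupported type ") + typeid(T).name());
}
```

Each call site would then reduce to `sets.set(i, testValue(*filter, start[count]))`, and the Int16/UInt16 discrepancy noted above would disappear automatically.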
```cpp
/// read next page
void readAndDecodePage()
```
```diff
-/// read next page
-void readAndDecodePage()
+/// read next page if there are no rows left in current page
+void readAndDecodePageIfNeeded()
```
```cpp
{
    throw Exception(ErrorCodes::LOGICAL_ERROR, "Page exhausted");
}
decodePage();
```
```diff
-decodePage();
+decodePageIfNeeded();
```
```cpp
/// levels offset in current page
virtual size_t levelsOffset() const { return state.offsets.levels_offset; }
virtual size_t availableRows() const { return std::max(state.offsets.remain_rows - state.lazy_skip_rows, 0UL); }
/// for nested types: because pages of different columns have different sizes, we need the minimum number of levels available to the current reader
```
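The comment above can be sketched as follows (hypothetical names, not the PR's classes): the composite reader takes the minimum availability across leaf columns. Note also that if `remain_rows` and `lazy_skip_rows` are unsigned, `std::max(a - b, 0UL)` never actually clamps, because the subtraction wraps around before `std::max` sees it; the sketch guards the subtraction explicitly instead:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-leaf-column state for a nested-type reader.
struct LeafReader
{
    size_t remain_rows = 0;
    size_t lazy_skip_rows = 0;

    size_t availableRows() const
    {
        // Guard against unsigned underflow instead of relying on std::max.
        return remain_rows > lazy_skip_rows ? remain_rows - lazy_skip_rows : 0;
    }
};

// Pages of different columns end at different row counts, so the composite
// reader can only decode as many levels as the smallest availability allows.
size_t minAvailableLevels(const std::vector<LeafReader> & leaves)
{
    size_t min_levels = SIZE_MAX;
    for (const auto & leaf : leaves)
        min_levels = std::min(min_levels, leaf.availableRows());
    return leaves.empty() ? 0 : min_levels;
}
```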
This is a very professional suggestion. I originally made this reader hoping to have a more efficient single-threaded reader. I did not give much consideration to parallelism and only did some parallel processing on IO.
Dear @al13n321, this PR hasn't been updated for a while. You will be unassigned. Will you continue working on it? If so, please feel free to reassign yourself. |
Force-pushed from a1639e5 to 30912ab
I'm working on another parquet reader: #78380 (only the last 3 commits in that PR are relevant). Current plan is to merge both this PR ("v2") and that PR ("v3"), then see if one is pareto-better than the other and maybe remove one (and also remove v1). The number of parquet readers in the codebase is getting kind of ridiculous, we should do something to separate them more cleanly and make them easy to remove. I'm thinking:
(I was also hoping that some v3 things would be easy to reuse in v2, but that doesn't seem to be the case. V3 integrates with prewhere, and will probably use KeyCondition for all non-prewhere filtering instead of re-analyzing the ActionsDAG; but that doesn't seem to fit v2 because v2 wants to split all conditions by columns, so it would need to pointlessly reimplement the splitting for KeyCondition and prewhere separately. V3 has some complicated fine-grained task scheduling and prefetching to extract ~all available parallelism and to avoid prefetching data before filters are applied; it doesn't seem feasible to reuse at all.)

Please make these changes, then we can get this merged and start benchmarking (and hopefully v3 will be ready soon so we can benchmark everything at once). You don't have to address all the nitpicks from the review comments, up to you.
Use #80931 to get access to ActionsDAG from ParquetBlockInputFormat.
@liuneng1994, thank you so much for the contribution, which provided the basis of the currently merged implementation!
A parquet reader with native support for filter push down.
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
A parquet reader with native support for filter push down.
Documentation entry for user-facing changes
CI Settings (Only check the boxes if you know what you are doing):