Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
e27648e
allow out-of-order buckets
nickitat Mar 20, 2025
0589563
fix
nickitat Mar 20, 2025
5ca4175
stash
nickitat Mar 21, 2025
6a8dc12
stash
nickitat Mar 25, 2025
4a40ff4
stash
nickitat May 8, 2025
28ca900
stash
nickitat May 8, 2025
1551795
stash
nickitat May 8, 2025
c596f4c
stash
nickitat May 10, 2025
e8f1370
better
nickitat May 10, 2025
362c78e
better
nickitat May 10, 2025
b64ac87
stash
nickitat May 11, 2025
e3509f1
stash
nickitat May 11, 2025
3de0c0a
stash
nickitat May 11, 2025
277f1bf
rename setting
nickitat May 11, 2025
aeb6f88
Merge branch 'master' into out_of_order_buckets
nickitat May 13, 2025
e5b3512
fix test
nickitat May 26, 2025
e1ec824
fix test
nickitat May 26, 2025
cfc391a
Merge branch 'master' into out_of_order_buckets
nickitat May 27, 2025
f085975
fix test
nickitat May 27, 2025
337d9b2
better
nickitat May 27, 2025
0062e33
mark test as long
nickitat May 28, 2025
5aac074
Merge branch 'master' into out_of_order_buckets
nickitat Jun 1, 2025
0efa944
add perf test
nickitat Jun 3, 2025
383541a
Merge branch 'master' into out_of_order_buckets
nickitat Jun 4, 2025
820b944
fix test
nickitat Jun 5, 2025
6ec8781
Merge branch 'master' into out_of_order_buckets
nickitat Jun 10, 2025
f88dfce
Merge branch 'master' into out_of_order_buckets
nickitat Jul 10, 2025
02d46be
fix tidy
nickitat Jul 11, 2025
0509172
Merge branch 'master' into out_of_order_buckets
nickitat Jul 30, 2025
d4233dd
rework test
nickitat Jul 30, 2025
34bb93f
extend comment
nickitat Jul 30, 2025
407e14d
rm redundant setting
nickitat Jul 30, 2025
41d671c
upd settings changes
nickitat Jul 30, 2025
6c1a94f
upd test
nickitat Jul 31, 2025
fabe10c
Apply suggestion from @nickitat
nickitat Aug 5, 2025
3dedea0
Merge branch 'master' into out_of_order_buckets
nickitat Aug 8, 2025
9e7d540
Merge branch 'master' into out_of_order_buckets
nickitat Aug 11, 2025
e099aea
Merge branch 'master' into out_of_order_buckets
nickitat Aug 12, 2025
f29d418
Merge branch 'master' into out_of_order_buckets
nickitat Aug 12, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions src/Core/BlockInfo.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ namespace ErrorCodes


/// Write values in binary form. NOTE: You could use protobuf, but it would be overkill for this case.
void BlockInfo::write(WriteBuffer & out) const
void BlockInfo::write(WriteBuffer & out, UInt64 server_protocol_revision) const
{
/// Set of pairs `FIELD_NUM`, value in binary form. Then 0.
#define WRITE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
writeVarUInt(FIELD_NUM, out); \
writeBinary(NAME, out);
#define WRITE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
if (server_protocol_revision >= (MIN_PROTOCOL_REVISION)) \
{ \
writeVarUInt(FIELD_NUM, out); \
writeBinary(NAME, out); \
}

APPLY_FOR_BLOCK_INFO_FIELDS(WRITE_FIELD)

Expand All @@ -32,7 +35,7 @@ void BlockInfo::write(WriteBuffer & out) const
}

/// Read values in binary form.
void BlockInfo::read(ReadBuffer & in)
void BlockInfo::read(ReadBuffer & in, UInt64 client_protocol_revision)
{
UInt64 field_num = 0;

Expand All @@ -44,10 +47,11 @@ void BlockInfo::read(ReadBuffer & in)

switch (field_num)
{
#define READ_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
case FIELD_NUM: \
readBinary(NAME, in); \
break;
#define READ_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
case FIELD_NUM: \
if (client_protocol_revision >= (MIN_PROTOCOL_REVISION)) \
readBinary(NAME, in); \
break;

APPLY_FOR_BLOCK_INFO_FIELDS(READ_FIELD)

Expand Down
23 changes: 16 additions & 7 deletions src/Core/BlockInfo.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#pragma once

#include <Core/ProtocolDefines.h>
#include <base/types.h>

#include <vector>

namespace DB
{

Expand All @@ -14,7 +17,7 @@ struct BlockInfo
{
/** is_overflows:
* After running GROUP BY ... WITH TOTALS with the max_rows_to_group_by and group_by_overflow_mode = 'any' settings,
* a row is inserted in the separate block with aggregated values that have not passed max_rows_to_group_by.
* a row is inserted in the separate block with aggregated values that have not passed max_rows_to_group_by.
* If it is such a block, then is_overflows is set to true for it.
*/

Expand All @@ -24,22 +27,28 @@ struct BlockInfo
* Otherwise -1.
*/

#define APPLY_FOR_BLOCK_INFO_FIELDS(M) \
M(bool, is_overflows, false, 1) \
M(Int32, bucket_num, -1, 2)
/** out_of_order_buckets:
* List of id-s of buckets delayed by `ConvertingAggregatedToChunksTransform` on the current node.
* Please refer to the comment in `ConvertingAggregatedToChunksTransform` for more details.
*/

#define APPLY_FOR_BLOCK_INFO_FIELDS(M) \
M(bool, is_overflows, false, 1, 0) \
M(Int32, bucket_num, -1, 2, 0) \
M(std::vector<Int32>, out_of_order_buckets, {}, 3, DBMS_MIN_REVISION_WITH_OUT_OF_ORDER_BUCKETS_IN_AGGREGATION)

#define DECLARE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM) \
#define DECLARE_FIELD(TYPE, NAME, DEFAULT, FIELD_NUM, MIN_PROTOCOL_REVISION) \
TYPE NAME = DEFAULT;

APPLY_FOR_BLOCK_INFO_FIELDS(DECLARE_FIELD)

#undef DECLARE_FIELD

/// Write the values in binary form. NOTE: You could use protobuf, but it would be overkill for this case.
void write(WriteBuffer & out) const;
void write(WriteBuffer & out, UInt64 server_protocol_revision) const;

/// Read the values in binary form.
void read(ReadBuffer & in);
void read(ReadBuffer & in, UInt64 client_protocol_revision);
};

}
4 changes: 3 additions & 1 deletion src/Core/ProtocolDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,14 @@ static constexpr auto DBMS_MIN_REVISON_WITH_PARALLEL_BLOCK_MARSHALLING = 54478;

static constexpr auto DBMS_MIN_REVISION_WITH_VERSIONED_CLUSTER_FUNCTION_PROTOCOL = 54479;

static constexpr auto DBMS_MIN_REVISION_WITH_OUT_OF_ORDER_BUCKETS_IN_AGGREGATION = 54480;

/// Version of ClickHouse TCP protocol.
///
/// Should be incremented manually on protocol changes.
///
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54479;
static constexpr auto DBMS_TCP_PROTOCOL_VERSION = 54480;
}
5 changes: 5 additions & 0 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6770,6 +6770,11 @@ When the query prioritization mechanism is employed (see setting `priority`), lo
)", BETA) \
DECLARE(Float, min_os_cpu_wait_time_ratio_to_throw, 0.0, "Min ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 0 at this point.", 0) \
DECLARE(Float, max_os_cpu_wait_time_ratio_to_throw, 0.0, "Max ratio between OS CPU wait (OSCPUWaitMicroseconds metric) and busy (OSCPUVirtualTimeMicroseconds metric) times to consider rejecting queries. Linear interpolation between min and max ratio is used to calculate the probability, the probability is 1 at this point.", 0) \
DECLARE(Bool, enable_producing_buckets_out_of_order_in_aggregation, true, R"(
Allow memory-efficient aggregation (see `distributed_aggregation_memory_efficient`) to produce buckets out of order.
It may improve performance when aggregation bucket sizes are skewed by letting a replica send buckets with higher IDs to the initiator while it is still processing some heavy buckets with lower IDs.
The downside is potentially higher memory usage.
)", 0) \
DECLARE(Bool, enable_parallel_blocks_marshalling, true, "Affects only distributed queries. If enabled, blocks will be (de)serialized and (de)compressed on pipeline threads (i.e. with higher parallelism that what we have by default) before/after sending to the initiator.", 0) \
DECLARE(UInt64, min_outstreams_per_resize_after_split, 24, R"(
Specifies the minimum number of output streams of a `Resize` or `StrictResize` processor after the split is performed during pipeline generation. If the resulting number of streams is less than this value, the split operation will not occur.
Expand Down
1 change: 1 addition & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"backup_slow_all_threads_after_retryable_s3_error", true, true, "New setting"},
{"iceberg_metadata_compression_method", "", "", "New setting"},
{"allow_experimental_correlated_subqueries", false, true, "Mark correlated subqueries support as Beta."},
{"enable_producing_buckets_out_of_order_in_aggregation", false, true, "New setting"},
});
addSettingsChanges(settings_changes_history, "25.7",
{
Expand Down
2 changes: 1 addition & 1 deletion src/Formats/NativeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Block NativeReader::read()

/// Additional information about the block.
if (server_revision > 0)
res.info.read(istr);
res.info.read(istr, server_revision);

/// Dimensions
size_t columns = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Formats/NativeWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ size_t NativeWriter::write(const Block & block)

/// Additional information about the block.
if (client_revision > 0)
block.info.write(ostr);
block.info.write(ostr, client_revision);

block.checkNumberOfRows();

Expand Down
4 changes: 3 additions & 1 deletion src/Interpreters/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,8 @@ Aggregator::Params::Params(
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
float min_hit_rate_to_use_consecutive_keys_optimization_,
const StatsCollectingParams & stats_collecting_params_)
const StatsCollectingParams & stats_collecting_params_,
bool enable_producing_buckets_out_of_order_in_aggregation_)
: keys(keys_)
, keys_size(keys.size())
, aggregates(aggregates_)
Expand All @@ -232,6 +233,7 @@ Aggregator::Params::Params(
, optimize_group_by_constant_keys(optimize_group_by_constant_keys_)
, min_hit_rate_to_use_consecutive_keys_optimization(min_hit_rate_to_use_consecutive_keys_optimization_)
, stats_collecting_params(stats_collecting_params_)
, enable_producing_buckets_out_of_order_in_aggregation(enable_producing_buckets_out_of_order_in_aggregation_)
{
}

Expand Down
5 changes: 4 additions & 1 deletion src/Interpreters/Aggregator.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,8 @@ class Aggregator final
const float min_hit_rate_to_use_consecutive_keys_optimization = 0.;
StatsCollectingParams stats_collecting_params;

bool enable_producing_buckets_out_of_order_in_aggregation = true;

static size_t getMaxBytesBeforeExternalGroupBy(size_t max_bytes_before_external_group_by, double max_bytes_ratio_before_external_group_by);

Params(
Expand All @@ -139,7 +141,8 @@ class Aggregator final
bool only_merge_, // true for projections
bool optimize_group_by_constant_keys_,
float min_hit_rate_to_use_consecutive_keys_optimization_,
const StatsCollectingParams & stats_collecting_params_);
const StatsCollectingParams & stats_collecting_params_,
bool enable_producing_buckets_out_of_order_in_aggregation_);

/// Only parameters that matter during merge.
Params(
Expand Down
11 changes: 6 additions & 5 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ namespace Setting
extern const SettingsUInt64 max_rows_to_transfer;
extern const SettingsOverflowMode transfer_overflow_mode;
extern const SettingsString implicit_table_at_top_level;
extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
}

namespace ServerSetting
Expand Down Expand Up @@ -2782,16 +2783,16 @@ static Aggregator::Params getAggregatorParams(
context.getServerSettings()[ServerSetting::max_entries_for_hash_table_stats],
settings[Setting::max_size_to_preallocate_for_aggregation]);

return Aggregator::Params
{
return Aggregator::Params{
keys,
aggregates,
overflow_row,
settings[Setting::max_rows_to_group_by],
settings[Setting::group_by_overflow_mode],
group_by_two_level_threshold,
group_by_two_level_threshold_bytes,
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(
settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && keys.empty()
&& query_analyzer.hasConstAggregationKeys()),
Expand All @@ -2805,8 +2806,8 @@ static Aggregator::Params getAggregatorParams(
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params
};
stats_collecting_params,
settings[Setting::enable_producing_buckets_out_of_order_in_aggregation]};
}

void InterpreterSelectQuery::executeAggregation(
Expand Down
6 changes: 3 additions & 3 deletions src/Interpreters/tests/gtest_filecache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1189,12 +1189,12 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)

/// We allocate buffer of size min(stat.compressed_size, DBMS_DEFAULT_BUFFER_SIZE)
/// We do care about buffer size because realistic external group by could generate 10^5 temporary files
ASSERT_EQ(stat.compressed_size, 62);
ASSERT_EQ(stat.compressed_size, 64);

auto reader = stream.getReadStream();
auto * read_buf = reader.getHolder();
const auto & internal_buffer = static_cast<TemporaryDataReadBuffer *>(read_buf)->compressed_buf.getHolder()->internalBuffer();
ASSERT_EQ(internal_buffer.size(), 62);
ASSERT_EQ(internal_buffer.size(), 64);
}

/// Temporary data stored on disk
Expand All @@ -1212,7 +1212,7 @@ TEST_F(FileCacheTest, TemporaryDataReadBufferSize)
stream->write(block);
auto stat = stream.finishWriting();

ASSERT_EQ(stat.compressed_size, 62);
ASSERT_EQ(stat.compressed_size, 64);
}
}

Expand Down
11 changes: 7 additions & 4 deletions src/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ namespace Setting
extern const SettingsUInt64 max_bytes_to_transfer;
extern const SettingsUInt64 max_rows_to_transfer;
extern const SettingsOverflowMode transfer_overflow_mode;
extern const SettingsBool enable_producing_buckets_out_of_order_in_aggregation;
extern const SettingsBool enable_parallel_blocks_marshalling;
}

Expand Down Expand Up @@ -503,10 +504,11 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
settings[Setting::group_by_overflow_mode],
settings[Setting::group_by_two_level_threshold],
settings[Setting::group_by_two_level_threshold_bytes],
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
Aggregator::Params::getMaxBytesBeforeExternalGroupBy(
settings[Setting::max_bytes_before_external_group_by], settings[Setting::max_bytes_ratio_before_external_group_by]),
settings[Setting::empty_result_for_aggregation_by_empty_set]
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set] && aggregation_analysis_result.aggregation_keys.empty()
&& aggregation_analysis_result.group_by_with_constant_keys),
|| (settings[Setting::empty_result_for_aggregation_by_constant_keys_on_empty_set]
&& aggregation_analysis_result.aggregation_keys.empty() && aggregation_analysis_result.group_by_with_constant_keys),
query_context->getTempDataOnDisk(),
settings[Setting::max_threads],
settings[Setting::min_free_disk_space_for_temporary_data],
Expand All @@ -517,7 +519,8 @@ Aggregator::Params getAggregatorParams(const PlannerContextPtr & planner_context
/* only_merge */ false,
settings[Setting::optimize_group_by_constant_keys],
settings[Setting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params);
stats_collecting_params,
settings[Setting::enable_producing_buckets_out_of_order_in_aggregation]);

return aggregator_params;
}
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Executors/PullingAsyncPipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
{
block.info.bucket_num = agg_info->bucket_num;
block.info.is_overflows = agg_info->is_overflows;
block.info.out_of_order_buckets = agg_info->out_of_order_buckets;
}

return true;
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Executors/PullingPipelineExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ bool PullingPipelineExecutor::pull(Block & block)
{
block.info.bucket_num = agg_info->bucket_num;
block.info.is_overflows = agg_info->is_overflows;
block.info.out_of_order_buckets = agg_info->out_of_order_buckets;
}

return true;
Expand Down
10 changes: 6 additions & 4 deletions src/Processors/QueryPlan/AggregatingStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace QueryPlanSerializationSetting
extern const QueryPlanSerializationSettingsUInt64 min_free_disk_space_for_temporary_data;
extern const QueryPlanSerializationSettingsFloat min_hit_rate_to_use_consecutive_keys_optimization;
extern const QueryPlanSerializationSettingsBool optimize_group_by_constant_keys;
extern const QueryPlanSerializationSettingsBool enable_producing_buckets_out_of_order_in_aggregation;
}

namespace ErrorCodes
Expand Down Expand Up @@ -729,6 +730,8 @@ void AggregatingStep::serializeSettings(QueryPlanSerializationSettings & setting
settings[QueryPlanSerializationSetting::collect_hash_table_stats_during_aggregation] = params.stats_collecting_params.isCollectionAndUseEnabled();
settings[QueryPlanSerializationSetting::max_entries_for_hash_table_stats] = params.stats_collecting_params.max_entries_for_hash_table_stats;
settings[QueryPlanSerializationSetting::max_size_to_preallocate_for_aggregation] = params.stats_collecting_params.max_size_to_preallocate;

settings[QueryPlanSerializationSetting::enable_producing_buckets_out_of_order_in_aggregation] = params.enable_producing_buckets_out_of_order_in_aggregation;
}

void AggregatingStep::serialize(Serialization & ctx) const
Expand Down Expand Up @@ -846,8 +849,7 @@ std::unique_ptr<IQueryPlanStep> AggregatingStep::deserialize(Deserialization & c
ctx.settings[QueryPlanSerializationSetting::max_entries_for_hash_table_stats],
ctx.settings[QueryPlanSerializationSetting::max_size_to_preallocate_for_aggregation]);

Aggregator::Params params
{
Aggregator::Params params{
keys,
aggregates,
overflow_row,
Expand All @@ -867,8 +869,8 @@ std::unique_ptr<IQueryPlanStep> AggregatingStep::deserialize(Deserialization & c
/* only_merge */ false,
ctx.settings[QueryPlanSerializationSetting::optimize_group_by_constant_keys],
ctx.settings[QueryPlanSerializationSetting::min_hit_rate_to_use_consecutive_keys_optimization],
stats_collecting_params
};
stats_collecting_params,
ctx.settings[QueryPlanSerializationSetting::enable_producing_buckets_out_of_order_in_aggregation]};

SortDescription sort_description_for_merging;

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/QueryPlan/MergingAggregatedStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ void MergingAggregatedStep::transformPipeline(QueryPipelineBuilder & pipeline, c
: max_threads;

auto transform_params = std::make_shared<AggregatingTransformParams>(pipeline.getSharedHeader(), std::move(params), final);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads);
pipeline.addMergingAggregatedMemoryEfficientTransform(transform_params, num_merge_threads, should_produce_results_in_order_of_bucket_number);
}

pipeline.resize(should_produce_results_in_order_of_bucket_number ? 1 : max_threads);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ namespace DB
DECLARE(Bool, collect_hash_table_stats_during_aggregation, true, "Enable collecting hash table statistics to optimize memory allocation", 0) \
DECLARE(UInt64, max_entries_for_hash_table_stats, 10'000, "How many entries hash table statistics collected during aggregation is allowed to have", 0) \
DECLARE(UInt64, max_size_to_preallocate_for_aggregation, 100'000'000, "For how many elements it is allowed to preallocate space in all hash tables in total before aggregation", 0) \
DECLARE(Bool, enable_producing_buckets_out_of_order_in_aggregation, true, "Allow aggregation to produce buckets out of order.", 0) \
DECLARE(Bool, distributed_aggregation_memory_efficient, true, "Is the memory-saving mode of distributed aggregation enabled", 0) \
\
DECLARE(TotalsMode, totals_mode, TotalsMode::AFTER_HAVING_EXCLUSIVE, "How to calculate TOTALS when HAVING is present, as well as when max_rows_to_group_by and group_by_overflow_mode = 'any' are present.", IMPORTANT) \
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Sources/BlocksSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class BlocksSource : public ISource
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = res.info.bucket_num;
info->is_overflows = res.info.is_overflows;
info->out_of_order_buckets = res.info.out_of_order_buckets;

auto chunk = Chunk(res.getColumns(), res.rows());
chunk.getChunkInfos().add(std::move(info));
Expand Down
1 change: 1 addition & 0 deletions src/Processors/Sources/RemoteSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ std::optional<Chunk> RemoteSource::tryGenerate()
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = block.info.bucket_num;
info->is_overflows = block.info.is_overflows;
info->out_of_order_buckets = block.info.out_of_order_buckets;
chunk.getChunkInfos().add(std::move(info));
}

Expand Down
1 change: 1 addition & 0 deletions src/Processors/Sources/SourceFromSingleChunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ SourceFromSingleChunk::SourceFromSingleChunk(SharedHeader data) : ISource(std::m
auto info = std::make_shared<AggregatedChunkInfo>();
info->bucket_num = data->info.bucket_num;
info->is_overflows = data->info.is_overflows;
info->out_of_order_buckets = data->info.out_of_order_buckets;
chunk.getChunkInfos().add(std::move(info));
}
}
Expand Down
Loading
Loading