Create consumers for Kafka tables on the fly with TTL (resubmit) #58310
Merged
alexey-milovidov merged 15 commits into ClickHouse:master · Dec 30, 2023
Conversation
….size()) Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 123d63e)
Fixes: ClickHouse#42777 Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 51d4f58)
…e last used) The pool of consumers created a problem for librdkafka's internal statistics: the statistics queue must always be read from, while in ClickHouse consumers were created regardless of whether there were any readers (attached materialized views or direct SELECTs). Otherwise, the statistics messages got queued and never released, which: - creates a live memory leak - and also makes destroy very slow, due to librdkafka internals (it moves entries from this queue into another linked list, with sorting, which is incredibly slow for linked lists). So the idea is simple: let's create a pool of consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables. Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit e7592c1)
Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit db74549)
This will make system.kafka_consumers more useful: prior to this patch, the consumer object would be removed after the TTL, but now all information is preserved. Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 2ff0bfb)
Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit b19b70b)
Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit a7453f7)
Since the pool may exceed the number of threads, while we need to run this thread always to avoid leaking memory. And this should not be a problem, since librdkafka has multiple threads (5!) for each consumer anyway. Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 06a9e9a)
Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 1f03a21)
Actually, now we can create the consumer object in the ctor; there is no need to do this in startup(), since the consumer no longer connects to Kafka at creation. Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit 0321820)
Signed-off-by: Azat Khuzhin <[email protected]> (cherry picked from commit ebad1bf)
CI found [1]:
Exception: Sanitizer assert found for instance =================================================================
==1==ERROR: AddressSanitizer: heap-use-after-free on address 0x5250006a4100 at pc 0x55d4ed46d2e2 bp 0x7f7e33b40190 sp 0x7f7e33b3f950
WRITE of size 5390 at 0x5250006a4100 thread T2 (TCPHandler)
8 0x55d50eba9497 in DB::KafkaConsumer::setRDKafkaStat(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>> const&) build_docker/./src/Storages/Kafka/KafkaConsumer.h:117:22
12 0x55d51e0eebfe in cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) build_docker/./contrib/cppkafka/src/configuration.cpp:92:5
13 0x55d51e151e3d in rd_kafka_poll_cb build_docker/./contrib/librdkafka/src/rdkafka.c:3790:7
14 0x55d51e15531b in rd_kafka_consumer_close build_docker/./contrib/librdkafka/src/rdkafka.c:3200:31
15 0x55d51e0f3241 in cppkafka::Consumer::close() build_docker/./contrib/cppkafka/src/consumer.cpp:293:33
16 0x55d51e0f3241 in cppkafka::Consumer::~Consumer() build_docker/./contrib/cppkafka/src/consumer.cpp:82:9
20 0x55d50eb8d12e in DB::KafkaConsumer::~KafkaConsumer() build_docker/./src/Storages/Kafka/KafkaConsumer.cpp:179:1
0x5250006a4100 is located 0 bytes inside of 8736-byte region [0x5250006a4100,0x5250006a6320)
freed by thread T2 (TCPHandler) here:
0 0x55d4ed4a26b2 in operator delete(void*, unsigned long) (/usr/bin/clickhouse+0xa94b6b2) (BuildId: 74ec4a14a5109c41de109e82d56d8d863845144d)
1 0x55d50eb8ca55 in void std::__1::__libcpp_operator_delete[abi:v15000]<void*, unsigned long>(void*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:256:3
2 0x55d50eb8ca55 in void std::__1::__do_deallocate_handle_size[abi:v15000]<>(void*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:282:10
3 0x55d50eb8ca55 in std::__1::__libcpp_deallocate[abi:v15000](void*, unsigned long, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/new:296:14
4 0x55d50eb8ca55 in std::__1::allocator<char>::deallocate[abi:v15000](char*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/__memory/allocator.h:128:13
5 0x55d50eb8ca55 in std::__1::allocator_traits<std::__1::allocator<char>>::deallocate[abi:v15000](std::__1::allocator<char>&, char*, unsigned long) build_docker/./contrib/llvm-project/libcxx/include/__memory/allocator_traits.h:282:13
6 0x55d50eb8ca55 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char>>::~basic_string() build_docker/./contrib/llvm-project/libcxx/include/string:2334:9
7 0x55d50eb8ca55 in DB::KafkaConsumer::~KafkaConsumer() build_docker/./src/Storages/Kafka/KafkaConsumer.cpp:179:1
[1]: https://s3.amazonaws.com/clickhouse-test-reports/0/745d9bb47f3425e28e5660ed7c730038ffece4ee/integration_tests__asan__analyzer__%5B6_6%5D/integration_run_parallel4_0.log
Signed-off-by: Azat Khuzhin <[email protected]>
af6f594 to ecf7188
This is an automated comment for commit 853fdfe with a description of existing statuses; it is updated for the latest CI run. ❌
alexey-milovidov approved these changes · Dec 28, 2023
We've discussed this with @filimonov and he pointed out that everything else (except for rdkafka_stat/rdkafka_stat_mutex) is done via member order, so let's do it in the same style. Signed-off-by: Azat Khuzhin <[email protected]>
filimonov reviewed · Dec 29, 2023
Signed-off-by: Azat Khuzhin <[email protected]>
The call chain of the Kafka consumer is very tricky, so for the sake of common sense let's just clear the messages when moving out the consumer (and in the dtor, but this is just to keep those two code paths in sync). (Also reported by @filimonov) Signed-off-by: Azat Khuzhin <[email protected]>
filimonov approved these changes · Dec 29, 2023
AVMusorin added a commit to AVMusorin/ClickHouse that referenced this pull request · Jan 28, 2025
Previously, variables with `_usec` in their names were incorrectly storing values in seconds. These variables were renamed and `_usec` was removed from their names. After this commit, all time columns store seconds except the `last_used` column; only for `last_used` does it make sense to store microseconds, to support `StorageKafka::cleanConsumers()`. See also ClickHouse#58310
AVMusorin added a commit to AVMusorin/ClickHouse that referenced this pull request · Feb 10, 2025
Previously, variables with `_usec` in their names were incorrectly storing values in seconds. These variables were renamed and `_usec` was removed from their names. After this commit, all time columns store seconds except the `last_used` column; only for `last_used` does it make sense to store microseconds, to support `StorageKafka::cleanConsumers()`. See also ClickHouse#58310
AVMusorin added a commit to AVMusorin/ClickHouse that referenced this pull request · Feb 10, 2025
Previously, variables with `_usec` in their names were incorrectly storing values in seconds. These variables were renamed and `_usec` was removed from their names. After this commit, all time columns store seconds except the `last_used` column; only for `last_used` does it make sense to store microseconds, to support `StorageKafka::cleanConsumers()`. See also ClickHouse#58310
This was referenced May 25, 2025
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Create consumers for Kafka tables on the fly (but keep them for some period, `kafka_consumers_pool_ttl_ms`, since last used). This should fix the problem with statistics for `system.kafka_consumers` (which are not consumed when nobody reads from the Kafka table, leading to a live memory leak and a slow table detach), and this PR also enables stats for `system.kafka_consumers` by default again.

The pool of consumers created a problem for librdkafka's internal statistics: the statistics queue must always be read from, while in ClickHouse consumers were created regardless of whether there were any readers (attached materialized views or direct SELECTs). Otherwise, the statistics messages got queued and never released, which:

- creates a live memory leak
- and also makes destroy very slow, due to librdkafka internals (it moves entries from this queue into another linked list, with sorting, which is incredibly slow for linked lists)

So the idea is simple: let's create a pool of consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables.
Fixes: #50999 (cc @ilejn)
Resubmits: #57829 (the last patch from this PR fixes the use-after-free reported in #57829 (comment))