Cherry pick #80795 to 25.4: Properly wait consumers before shutting down Kafka engine#80909
Merged
robot-clickhouse merged 7 commits intobackport/25.4/80795from May 27, 2025
Merged
Conversation
Previously it was possibility to have active consumers after storage shutdown, and not only it is bad because it can read from the broker, it also can lead to internal problems with ThreadStatus, which will be destroyed from the thread that calls shutdown() for storage, which is not recommended usage.
…e cppkafka::Consumer
cppkafka::Consumer may use the KafkaConsumer via stat callback, and this
will lead to use-after-free:
Sanitizer trap.
Stack trace: 0x000055ce71048c47 0x000055ce7173072b 0x000055ce5fe1bb50 0x000055ce5fe02229 0x000055ce5fe05367 0x000055ce5fdfb869 0x000055ce95085934 0x000055ce7b9dd3b4 0x000055ce8d0992aa 0x000055ce8d098d29 0x000055ce8d0e63fa 0x000055ce8d0e97bc 0x000055ce8d09c3d0 0x000055ce7b9f5dcb 0x000055ce7b9f0bc9 0x000055ce7f77b4c4 0x000055ce7f77841d 0x000055ce7f776aa1 0x000055ce7f77>
0.0. inlined from ./ci/tmp/build/./src/Common/StackTrace.cpp:389: StackTrace::tryCapture()
0. ./ci/tmp/build/./src/Common/StackTrace.cpp:61: StackTrace::StackTrace() @ 0x000000001d611c47
1. ./ci/tmp/build/./src/Common/SignalHandlers.cpp:212: sanitizerDeathCallback() @ 0x000000001dcf972b
2. __sanitizer::Die() @ 0x000000000c3e4b50
3. __asan::ScopedInErrorReport::~ScopedInErrorReport() @ 0x000000000c3cb229
4. __asan::ReportGenericError(unsigned long, unsigned long, unsigned long, unsigned long, bool, unsigned long, unsigned int, bool) @ 0x000000000c3ce367
5. __asan_memmove @ 0x000000000c3c4869
6.0. inlined from ./contrib/llvm-project/libcxx/include/__string/constexpr_c_functions.h:227: char* std::__constexpr_memmove[abi:ne190107]<char, char const, 0>(char*, char const*, std::__element_count)
6.1. inlined from ./contrib/llvm-project/libcxx/include/__string/char_traits.h:148: std::char_traits<char>::copy[abi:ne190107](char*, char const*, unsigned long)
6. ./contrib/llvm-project/libcxx/include/string:2575: String& String::__assign_no_alias<false>(char const*, unsigned long) @ 0x000000004164e934
7.0. inlined from ./contrib/llvm-project/libcxx/include/string:2665: String::operator=(String const&)
7.1. inlined from ./src/Storages/Kafka/KafkaConsumer.h:125: DB::KafkaConsumer::setRDKafkaStat(String const&)
7.2. inlined from ./ci/tmp/build/./src/Storages/Kafka/KafkaConsumer.cpp:76: operator()
9. ./ci/tmp/build/./contrib/cppkafka/src/configuration.cpp:92: cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) @ 0x0000000039661d29
10. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:4065: rd_kafka_poll_cb @ 0x00000000396af3fa
11. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:3459: rd_kafka_consumer_close @ 0x00000000396b27bc
12.0. inlined from ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:296: cppkafka::Consumer::close()
12. ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:82: cppkafka::Consumer::~Consumer() @ 0x00000000396653d0
13. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:363: DB::StorageKafka::cleanConsumers() @ 0x0000000027fbedcb
14. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:323: DB::StorageKafka::shutdown(bool) @ 0x0000000027fb9bc9
15.0. inlined from ./src/Storages/IStorage.h:571: DB::IStorage::flushAndShutdown(bool)
Properly wait consumers before shutting down Kafka engine
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Original pull-request #80795
This pull-request is a first step of an automated backporting.
It contains changes similar to calling
git cherry-picklocally.If you intend to continue backporting the changes, then resolve all conflicts if any.
Otherwise, if you do not want to backport them, then just close this pull-request.
The check results does not matter at this step - you can safely ignore them.
Note
This pull-request will be merged automatically. Please, do not merge it manually (but if you accidentally did, nothing bad will happen).
Troubleshooting
If the PR was manually reopened after being closed
If this PR is stuck (i.e. not automatically merged after one day), check #80795 for
pr-backports-createdlabel and delete it.Manually merging will do nothing. The
pr-backports-createdlabel prevents the original PR #80795 from being processed.If the conflicts were resolved in a wrong way
If this cherry-pick PR is completely screwed by a wrong conflicts resolution, and you want to recreate it:
pr-cherrypicklabel from the PRYou also need to check the original PR #80795 for
pr-backports-created, and delete if it's presented thereThe PR source
The PR is created in the CI job