Properly wait consumers before shutting down Kafka engine #80795

Merged
azat merged 6 commits into ClickHouse:master from azat:kafka-proper-shutdown
May 27, 2025

Conversation


azat (Member) commented May 25, 2025

Changelog category (leave one):

  • Bug Fix (user-visible misbehavior in an official stable release)

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):

Properly wait consumers before shutting down Kafka engine (active consumers after shutdown can trigger various debug assertions and also may read data from brokers in background after table has been dropped/detached)

See commit messages for details

Fixes: #80674
Fixes: #58310

azat added 4 commits May 25, 2025 21:34
Previously it was possible to have active consumers after storage
shutdown. Not only is this bad because they can still read from the
broker, it can also lead to internal problems with ThreadStatus, which
would be destroyed from the thread that calls shutdown() for the
storage, and that is not the recommended usage.
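The fix described above boils down to counting the consumers that are handed out and blocking shutdown until all of them come back. A minimal sketch of that shape (hypothetical names, not the actual ClickHouse code):

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical sketch: the storage hands out consumers to background
// threads and must not finish shutdown() while any is still in use.
class ConsumerPool
{
public:
    void acquire()
    {
        std::lock_guard<std::mutex> lock(mutex);
        ++in_use;
    }

    void release()
    {
        {
            std::lock_guard<std::mutex> lock(mutex);
            --in_use;
        }
        cv.notify_all(); // wake up a pending shutdown()
    }

    bool idle()
    {
        std::lock_guard<std::mutex> lock(mutex);
        return in_use == 0;
    }

    // Called from shutdown(): block until every consumer has been returned,
    // so none of them can keep reading from the broker afterwards.
    void waitForAllReleased()
    {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return in_use == 0; });
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    size_t in_use = 0;
};
```

In this sketch, shutdown() would call waitForAllReleased() before destroying the consumers, so no background thread can touch the broker after the table is dropped or detached.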
azat added the pr-must-backport label (Pull request should be backported intentionally. Use this label with great care!) May 25, 2025

clickhouse-gh bot commented May 25, 2025

Workflow [PR], commit [f7262e4]

clickhouse-gh bot added the pr-bugfix label (Pull request with bugfix, not backported by default) May 25, 2025
Avogar self-assigned this May 26, 2025

void StorageKafka::cleanConsumers()
{
/// We need to clear the cppkafka::Consumer separately from KafkaConsumer, since cppkafka::Consumer holds a weak_ptr to the KafkaConsumer (for the logging callback)
Member

I cannot find the place where cppkafka::Consumer holds a weak_ptr to KafkaConsumer. Could you show me where it is?
I can see some callbacks passed to cppkafka::Consumer in KafkaConsumer::createConsumer.

Member Author

> I cannot find the place where cppkafka::Consumer holds a weak_ptr to KafkaConsumer. Could you show me where it is?

if (auto sink_shared_ptr = sink.lock())
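The `sink.lock()` line above is the guard around the weak_ptr: the logging callback captures only a weak reference, so it cannot extend the consumer's lifetime and degrades to a no-op once the owner is gone. A minimal sketch of the pattern (hypothetical `Sink`/`makeLogCallback` names, not the actual code):

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical names: the callback keeps only a weak_ptr to its target, so
// it neither extends the target's lifetime nor dereferences a dead object.
struct Sink
{
    std::string last_message;
};

std::function<void(const std::string &)> makeLogCallback(const std::shared_ptr<Sink> & owner)
{
    std::weak_ptr<Sink> sink = owner; // weak: no ownership is taken
    return [sink](const std::string & message)
    {
        if (auto sink_shared_ptr = sink.lock()) // the guard quoted above
            sink_shared_ptr->last_message = message;
        // else: the owner is already destroyed, drop the log line
    };
}
```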

{
/// We need to clear the cppkafka::Consumer separately from KafkaConsumer, since cppkafka::Consumer holds a weak_ptr to the KafkaConsumer (for the logging callback)
/// So if we removed the cppkafka::Consumer in the KafkaConsumer destructor, librdkafka could invoke the logging callback again during destruction, which would lead to a deadlock
std::vector<ConsumerPtr> consumers_to_close;
Member

I think we should put a log before waiting for the cv and after clearing consumers_to_close. It would make troubleshooting easier, as we would know whether all consumers have been returned yet.

Member Author

Indeed. And there are already such logs in the caller of cleanConsumers.
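The shape being reviewed in this thread can be sketched as follows (hypothetical names; in the real code the vector holds `cppkafka::Consumer` instances whose destructors may call back into librdkafka):

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <vector>

struct Consumer {}; // stand-in for cppkafka::Consumer
using ConsumerPtr = std::shared_ptr<Consumer>;

// Hypothetical sketch: collect the consumers under the mutex, but run the
// (potentially slow, callback-invoking) destructors outside of it, with log
// points around the expensive part for troubleshooting.
struct Storage
{
    void cleanConsumers()
    {
        std::vector<ConsumerPtr> consumers_to_close;
        {
            std::lock_guard<std::mutex> lock(mutex);
            consumers_to_close.swap(consumers);
        }
        // LOG_TRACE(log, "Closing {} consumers", consumers_to_close.size());
        consumers_to_close.clear(); // destructors may call into librdkafka
        // LOG_TRACE(log, "All consumers closed");
    }

    std::mutex mutex;
    std::vector<ConsumerPtr> consumers;
};
```

Destroying outside the lock matters because the destructors can block (broker I/O, callback delivery) and must not hold up other users of the mutex.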


azat commented May 26, 2025

One more issue in Kafka:

2025.05.25 21:40:23.677721 [ 899 ] {} <Fatal> BaseDaemon: ########## Short fault info ############
2025.05.25 21:40:23.677840 [ 899 ] {} <Fatal> BaseDaemon: (version 25.6.1.1, build id: 9FD48A61A5A5F0872E3193E3AD84F4BCF773051C, git hash: 761af4c14f6f43fbcd2b969dc66a3f735e284f90, architecture: x86_64) (from thread 9) Received signal -3
2025.05.25 21:40:23.677905 [ 899 ] {} <Fatal> BaseDaemon: Signal description: sanitizer trap
2025.05.25 21:40:23.677965 [ 899 ] {} <Fatal> BaseDaemon: Sanitizer trap.
2025.05.25 21:40:23.678019 [ 899 ] {} <Fatal> BaseDaemon: Stack trace: 0x000055ce71048c47 0x000055ce7173072b 0x000055ce5fe1bb50 0x000055ce5fe02229 0x000055ce5fe05367 0x000055ce5fdfb869 0x000055ce95085934 0x000055ce7b9dd3b4 0x000055ce8d0992aa 0x000055ce8d098d29 0x000055ce8d0e63fa 0x000055ce8d0e97bc 0x000055ce8d09c3d0 0x000055ce7b9f5dcb 0x000055ce7b9f0bc9 0x000055ce7f77b4c4 0x000055ce7f77841d 0x000055ce7f776aa1 0x000055ce7f77>
2025.05.25 21:40:23.678070 [ 899 ] {} <Fatal> BaseDaemon: ########################################
2025.05.25 21:40:23.678315 [ 899 ] {} <Fatal> BaseDaemon: (version 25.6.1.1, build id: 9FD48A61A5A5F0872E3193E3AD84F4BCF773051C, git hash: 761af4c14f6f43fbcd2b969dc66a3f735e284f90) (from thread 9) (query_id: 7f0ecc6e-f399-4581-9f92-1704972d3a55) (query: DROP TABLE test.kafka) Received signal sanitizer trap (-3)
2025.05.25 21:40:23.678425 [ 899 ] {} <Fatal> BaseDaemon: Sanitizer trap.
2025.05.25 21:40:23.678510 [ 899 ] {} <Fatal> BaseDaemon: Stack trace: 0x000055ce71048c47 0x000055ce7173072b 0x000055ce5fe1bb50 0x000055ce5fe02229 0x000055ce5fe05367 0x000055ce5fdfb869 0x000055ce95085934 0x000055ce7b9dd3b4 0x000055ce8d0992aa 0x000055ce8d098d29 0x000055ce8d0e63fa 0x000055ce8d0e97bc 0x000055ce8d09c3d0 0x000055ce7b9f5dcb 0x000055ce7b9f0bc9 0x000055ce7f77b4c4 0x000055ce7f77841d 0x000055ce7f776aa1 0x000055ce7f77>
2025.05.25 21:40:23.716550 [ 899 ] {} <Fatal> BaseDaemon: 0.0. inlined from ./ci/tmp/build/./src/Common/StackTrace.cpp:389: StackTrace::tryCapture()
2025.05.25 21:40:23.716696 [ 899 ] {} <Fatal> BaseDaemon: 0. ./ci/tmp/build/./src/Common/StackTrace.cpp:61: StackTrace::StackTrace() @ 0x000000001d611c47
2025.05.25 21:40:23.778614 [ 899 ] {} <Fatal> BaseDaemon: 1. ./ci/tmp/build/./src/Common/SignalHandlers.cpp:212: sanitizerDeathCallback() @ 0x000000001dcf972b
2025.05.25 21:40:23.848476 [ 899 ] {} <Fatal> BaseDaemon: 2. __sanitizer::Die() @ 0x000000000c3e4b50
2025.05.25 21:40:23.887747 [ 899 ] {} <Fatal> BaseDaemon: 3. __asan::ScopedInErrorReport::~ScopedInErrorReport() @ 0x000000000c3cb229
2025.05.25 21:40:23.926353 [ 899 ] {} <Fatal> BaseDaemon: 4. __asan::ReportGenericError(unsigned long, unsigned long, unsigned long, unsigned long, bool, unsigned long, unsigned int, bool) @ 0x000000000c3ce367
2025.05.25 21:40:23.965043 [ 899 ] {} <Fatal> BaseDaemon: 5. __asan_memmove @ 0x000000000c3c4869
2025.05.25 21:40:24.011708 [ 899 ] {} <Fatal> BaseDaemon: 6.0. inlined from ./contrib/llvm-project/libcxx/include/__string/constexpr_c_functions.h:227: char* std::__constexpr_memmove[abi:ne190107]<char, char const, 0>(char*, char const*, std::__element_count)
2025.05.25 21:40:24.011830 [ 899 ] {} <Fatal> BaseDaemon: 6.1. inlined from ./contrib/llvm-project/libcxx/include/__string/char_traits.h:148: std::char_traits<char>::copy[abi:ne190107](char*, char const*, unsigned long)
2025.05.25 21:40:24.011899 [ 899 ] {} <Fatal> BaseDaemon: 6. ./contrib/llvm-project/libcxx/include/string:2575: String& String::__assign_no_alias<false>(char const*, unsigned long) @ 0x000000004164e934
2025.05.25 21:40:24.081113 [ 899 ] {} <Fatal> BaseDaemon: 7.0. inlined from ./contrib/llvm-project/libcxx/include/string:2665: String::operator=(String const&)
2025.05.25 21:40:24.081297 [ 899 ] {} <Fatal> BaseDaemon: 7.1. inlined from ./src/Storages/Kafka/KafkaConsumer.h:125: DB::KafkaConsumer::setRDKafkaStat(String const&)
2025.05.25 21:40:24.081356 [ 899 ] {} <Fatal> BaseDaemon: 7.2. inlined from ./ci/tmp/build/./src/Storages/Kafka/KafkaConsumer.cpp:76: operator()
2025.05.25 21:40:24.081447 [ 899 ] {} <Fatal> BaseDaemon: 7.3. inlined from ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:149: decltype(std::declval<DB::KafkaConsumer::createConsumer(cppkafka::Configuration)::$_0&>()(std::declval<cppkafka::KafkaHandleBase&>(), std::declval<String const&>())) std::__invoke[abi:ne190107]<DB::KafkaConsumer::createConsumer(cppkafka::Configuration)::$_0&, cppkafka::KafkaHandleBase>
2025.05.25 21:40:24.081533 [ 899 ] {} <Fatal> BaseDaemon: 7.4. inlined from ./contrib/llvm-project/libcxx/include/__type_traits/invoke.h:224: void std::__invoke_void_return_wrapper<void, true>::__call[abi:ne190107]<DB::KafkaConsumer::createConsumer(cppkafka::Configuration)::$_0&, cppkafka::KafkaHandleBase&, String const&>(DB::KafkaConsumer::createConsumer(cppkafka::Configuration)::$_0&, cppkafka::KafkaHandleBase&, String co>
2025.05.25 21:40:24.081590 [ 899 ] {} <Fatal> BaseDaemon: 7.5. inlined from ./contrib/llvm-project/libcxx/include/__functional/function.h:210: ?
2025.05.25 21:40:24.081642 [ 899 ] {} <Fatal> BaseDaemon: 7. ./contrib/llvm-project/libcxx/include/__functional/function.h:610: ? @ 0x0000000027fa63b4
2025.05.25 21:40:24.105704 [ 899 ] {} <Fatal> BaseDaemon: 8.0. inlined from ./contrib/llvm-project/libcxx/include/__functional/function.h:716: ?
2025.05.25 21:40:24.105801 [ 899 ] {} <Fatal> BaseDaemon: 8.1. inlined from ./contrib/llvm-project/libcxx/include/__functional/function.h:989: ?
2025.05.25 21:40:24.105869 [ 899 ] {} <Fatal> BaseDaemon: 8. ./contrib/cppkafka/include/cppkafka/detail/callback_invoker.h:84: void cppkafka::CallbackInvoker<std::function<void (cppkafka::KafkaHandleBase&, String const&)>>::operator()<cppkafka::KafkaHandleBase&, String>(cppkafka::KafkaHandleBase&, String&&) const @ 0x00000000396622aa
2025.05.25 21:40:24.139455 [ 899 ] {} <Fatal> BaseDaemon: 9. ./ci/tmp/build/./contrib/cppkafka/src/configuration.cpp:92: cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) @ 0x0000000039661d29
2025.05.25 21:40:24.149244 [ 899 ] {} <Fatal> BaseDaemon: 10. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:4065: rd_kafka_poll_cb @ 0x00000000396af3fa
2025.05.25 21:40:24.157533 [ 899 ] {} <Fatal> BaseDaemon: 11. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:3459: rd_kafka_consumer_close @ 0x00000000396b27bc
2025.05.25 21:40:24.184795 [ 899 ] {} <Fatal> BaseDaemon: 12.0. inlined from ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:296: cppkafka::Consumer::close()
2025.05.25 21:40:24.184887 [ 899 ] {} <Fatal> BaseDaemon: 12. ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:82: cppkafka::Consumer::~Consumer() @ 0x00000000396653d0
2025.05.25 21:40:24.364999 [ 899 ] {} <Fatal> BaseDaemon: 13.0. inlined from ./contrib/llvm-project/libcxx/include/__memory/shared_ptr.h:156: std::__shared_count::__release_shared[abi:ne190107]()
2025.05.25 21:40:24.365095 [ 899 ] {} <Fatal> BaseDaemon: 13.1. inlined from ./contrib/llvm-project/libcxx/include/__memory/shared_ptr.h:185: std::__shared_weak_count::__release_shared[abi:ne190107]()
2025.05.25 21:40:24.365139 [ 899 ] {} <Fatal> BaseDaemon: 13.2. inlined from ./contrib/llvm-project/libcxx/include/__memory/shared_ptr.h:668: ~shared_ptr
2025.05.25 21:40:24.365224 [ 899 ] {} <Fatal> BaseDaemon: 13.3. inlined from ./contrib/llvm-project/libcxx/include/__memory/construct_at.h:67: void std::__destroy_at[abi:ne190107]<std::shared_ptr<cppkafka::Consumer>, 0>(std::shared_ptr<cppkafka::Consumer>*)
2025.05.25 21:40:24.365271 [ 899 ] {} <Fatal> BaseDaemon: 13.4. inlined from ./contrib/llvm-project/libcxx/include/__memory/allocator_traits.h:339: void std::allocator_traits<std::allocator<std::shared_ptr<cppkafka::Consumer>>>::destroy[abi:ne190107]<std::shared_ptr<cppkafka::Consumer>, void, 0>(std::allocator<std::shared_ptr<cppkafka::Consumer>>&, std::shared_ptr<cppkafka::Consumer>*)
2025.05.25 21:40:24.365311 [ 899 ] {} <Fatal> BaseDaemon: 13.5. inlined from ./contrib/llvm-project/libcxx/include/vector:985: std::vector<std::shared_ptr<cppkafka::Consumer>, std::allocator<std::shared_ptr<cppkafka::Consumer>>>::__base_destruct_at_end[abi:ne190107](std::shared_ptr<cppkafka::Consumer>*)
2025.05.25 21:40:24.365350 [ 899 ] {} <Fatal> BaseDaemon: 13.6. inlined from ./contrib/llvm-project/libcxx/include/vector:979: std::vector<std::shared_ptr<cppkafka::Consumer>, std::allocator<std::shared_ptr<cppkafka::Consumer>>>::__clear[abi:ne190107]()
2025.05.25 21:40:24.365384 [ 899 ] {} <Fatal> BaseDaemon: 13.7. inlined from ./contrib/llvm-project/libcxx/include/vector:749: std::vector<std::shared_ptr<cppkafka::Consumer>, std::allocator<std::shared_ptr<cppkafka::Consumer>>>::clear[abi:ne190107]()
2025.05.25 21:40:24.365424 [ 899 ] {} <Fatal> BaseDaemon: 13. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:363: DB::StorageKafka::cleanConsumers() @ 0x0000000027fbedcb
2025.05.25 21:40:24.531732 [ 899 ] {} <Fatal> BaseDaemon: 14. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:323: DB::StorageKafka::shutdown(bool) @ 0x0000000027fb9bc9
2025.05.25 21:40:24.615145 [ 899 ] {} <Fatal> BaseDaemon: 15.0. inlined from ./src/Storages/IStorage.h:571: DB::IStorage::flushAndShutdown(bool)
2025.05.25 21:40:24.615247 [ 899 ] {} <Fatal> BaseDaemon: 15. ./ci/tmp/build/./src/Interpreters/InterpreterDropQuery.cpp:306: DB::InterpreterDropQuery::executeToTableImpl(std::shared_ptr<DB::Context const> const&, DB::ASTDropQuery&, std::shared_ptr<DB::IDatabase>&, StrongTypedef<wide::integer<128ul, unsigned int>, DB::UUIDTag>&) @ 0x000000002bd444c4
2025.05.25 21:40:24.691976 [ 899 ] {} <Fatal> BaseDaemon: 16. ./ci/tmp/build/./src/Interpreters/InterpreterDropQuery.cpp:122: DB::InterpreterDropQuery::executeToTable(DB::ASTDropQuery&) @ 0x000000002bd4141d
2025.05.25 21:40:24.764821 [ 899 ] {} <Fatal> BaseDaemon: 17. ./ci/tmp/build/./src/Interpreters/InterpreterDropQuery.cpp:95: DB::InterpreterDropQuery::executeSingleDropQuery(std::shared_ptr<DB::IAST> const&) @ 0x000000002bd3faa1
2025.05.25 21:40:24.837965 [ 899 ] {} <Fatal> BaseDaemon: 18. ./ci/tmp/build/./src/Interpreters/InterpreterDropQuery.cpp:76: DB::InterpreterDropQuery::execute() @ 0x000000002bd3f530
2025.05.25 21:40:24.982954 [ 899 ] {} <Fatal> BaseDaemon: 19. ./ci/tmp/build/./src/Interpreters/executeQuery.cpp:1522: DB::executeQueryImpl(char const*, char const*, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum, DB::ReadBuffer*, std::shared_ptr<DB::IAST>&) @ 0x000000002c531192
2025.05.25 21:40:25.168009 [ 899 ] {} <Fatal> BaseDaemon: 20. ./ci/tmp/build/./src/Interpreters/executeQuery.cpp:1714: DB::executeQuery(String const&, std::shared_ptr<DB::Context>, DB::QueryFlags, DB::QueryProcessingStage::Enum) @ 0x000000002c526806
2025.05.25 21:40:25.284522 [ 899 ] {} <Fatal> BaseDaemon: 21. ./ci/tmp/build/./src/Server/TCPHandler.cpp:686: DB::TCPHandler::runImpl() @ 0x0000000031819b8e
2025.05.25 21:40:25.521021 [ 899 ] {} <Fatal> BaseDaemon: 22. ./ci/tmp/build/./src/Server/TCPHandler.cpp:2674: DB::TCPHandler::run() @ 0x000000003185aaba
2025.05.25 21:40:25.526077 [ 899 ] {} <Fatal> BaseDaemon: 23. ./ci/tmp/build/./base/poco/Net/src/TCPServerConnection.cpp:40: Poco::Net::TCPServerConnection::start() @ 0x0000000039f610af
2025.05.25 21:40:25.535259 [ 899 ] {} <Fatal> BaseDaemon: 24. ./ci/tmp/build/./base/poco/Net/src/TCPServerDispatcher.cpp:115: Poco::Net::TCPServerDispatcher::run() @ 0x0000000039f61d77
2025.05.25 21:40:25.545738 [ 899 ] {} <Fatal> BaseDaemon: 25. ./ci/tmp/build/./base/poco/Foundation/src/ThreadPool.cpp:205: Poco::PooledThread::run() @ 0x0000000039e664af
2025.05.25 21:40:25.556607 [ 899 ] {} <Fatal> BaseDaemon: 26. ./base/poco/Foundation/src/Thread_POSIX.cpp:335: Poco::ThreadImpl::runnableEntry(void*) @ 0x0000000039e600c8
2025.05.25 21:40:25.596845 [ 899 ] {} <Fatal> BaseDaemon: 27. asan_thread_start(void*) @ 0x000000000c3c3e77
2025.05.25 21:40:25.596970 [ 899 ] {} <Fatal> BaseDaemon: 28. ? @ 0x0000000000094ac3
2025.05.25 21:40:25.597004 [ 899 ] {} <Fatal> BaseDaemon: 29. ? @ 0x0000000000126850
2025.05.25 21:40:25.597039 [ 899 ] {} <Fatal> BaseDaemon: Integrity check of the executable skipped because the reference checksum could not be read.
2025.05.25 21:40:27.479082 [ 899 ] {} <Fatal> BaseDaemon: Changed settings: max_block_size = 38604, max_joined_block_size_rows = 28231, stream_like_engine_allow_direct_select = true, query_plan_join_swap_table = 'auto', function_sleep_max_microseconds_per_block = 0, insert_keeper_max_retries = 0

…e cppkafka::Consumer

cppkafka::Consumer may use the KafkaConsumer via the stats callback, which
leads to a use-after-free:

    Sanitizer trap.
    Stack trace: 0x000055ce71048c47 0x000055ce7173072b 0x000055ce5fe1bb50 0x000055ce5fe02229 0x000055ce5fe05367 0x000055ce5fdfb869 0x000055ce95085934 0x000055ce7b9dd3b4 0x000055ce8d0992aa 0x000055ce8d098d29 0x000055ce8d0e63fa 0x000055ce8d0e97bc 0x000055ce8d09c3d0 0x000055ce7b9f5dcb 0x000055ce7b9f0bc9 0x000055ce7f77b4c4 0x000055ce7f77841d 0x000055ce7f776aa1 0x000055ce7f77>
    0.0. inlined from ./ci/tmp/build/./src/Common/StackTrace.cpp:389: StackTrace::tryCapture()
    0. ./ci/tmp/build/./src/Common/StackTrace.cpp:61: StackTrace::StackTrace() @ 0x000000001d611c47
    1. ./ci/tmp/build/./src/Common/SignalHandlers.cpp:212: sanitizerDeathCallback() @ 0x000000001dcf972b
    2. __sanitizer::Die() @ 0x000000000c3e4b50
    3. __asan::ScopedInErrorReport::~ScopedInErrorReport() @ 0x000000000c3cb229
    4. __asan::ReportGenericError(unsigned long, unsigned long, unsigned long, unsigned long, bool, unsigned long, unsigned int, bool) @ 0x000000000c3ce367
    5. __asan_memmove @ 0x000000000c3c4869
    6.0. inlined from ./contrib/llvm-project/libcxx/include/__string/constexpr_c_functions.h:227: char* std::__constexpr_memmove[abi:ne190107]<char, char const, 0>(char*, char const*, std::__element_count)
    6.1. inlined from ./contrib/llvm-project/libcxx/include/__string/char_traits.h:148: std::char_traits<char>::copy[abi:ne190107](char*, char const*, unsigned long)
    6. ./contrib/llvm-project/libcxx/include/string:2575: String& String::__assign_no_alias<false>(char const*, unsigned long) @ 0x000000004164e934
    7.0. inlined from ./contrib/llvm-project/libcxx/include/string:2665: String::operator=(String const&)
    7.1. inlined from ./src/Storages/Kafka/KafkaConsumer.h:125: DB::KafkaConsumer::setRDKafkaStat(String const&)
    7.2. inlined from ./ci/tmp/build/./src/Storages/Kafka/KafkaConsumer.cpp:76: operator()

    9. ./ci/tmp/build/./contrib/cppkafka/src/configuration.cpp:92: cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) @ 0x0000000039661d29
    10. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:4065: rd_kafka_poll_cb @ 0x00000000396af3fa
    11. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:3459: rd_kafka_consumer_close @ 0x00000000396b27bc
    12.0. inlined from ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:296: cppkafka::Consumer::close()
    12. ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:82: cppkafka::Consumer::~Consumer() @ 0x00000000396653d0

    13. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:363: DB::StorageKafka::cleanConsumers() @ 0x0000000027fbedcb
    14. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:323: DB::StorageKafka::shutdown(bool) @ 0x0000000027fb9bc9
    15.0. inlined from ./src/Storages/IStorage.h:571: DB::IStorage::flushAndShutdown(bool)
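The crash above can be reconstructed generically: a handle whose destructor may still deliver a stats callback (frames 9-12) writes into an object that was freed earlier (frame 7.1, `setRDKafkaStat`). A minimal sketch of the hazard and a safe teardown (hypothetical names, not cppkafka's API):

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical names: a Handle whose destructor may still deliver a stats
// callback, and a StatTarget that the callback writes into.
struct StatTarget
{
    std::string rdkafka_stat;
};

struct Handle
{
    std::function<void(const std::string &)> stats_callback;

    ~Handle()
    {
        // Like cppkafka::Consumer::~Consumer() -> rd_kafka_consumer_close():
        // pending callbacks can still fire while the handle shuts down.
        if (stats_callback)
            stats_callback("{\"final\":true}");
    }
};
```

With a raw pointer capture this is exactly the reported ASan trap: destroying the target first and the handle second leaves the final callback writing into freed memory. Capturing a weak_ptr, or destroying the handle while the target is still alive, avoids it.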

azat commented May 27, 2025

Stateless tests (release, ParallelReplicas, s3 storage) — fail: 1, passed: 6925, skipped: 981

Stateless tests (tsan, s3 storage, 3/3) — Server died, fail: 12, passed: 1320, skipped: 30

Avogar assigned tuanpach and unassigned Avogar May 27, 2025
azat enabled auto-merge May 27, 2025 14:52

azat commented May 27, 2025

Bugfix validation — New test(s) failed to reproduce a bug

The test can reproduce the problem reliably only with an ASan/debug build.

azat added this pull request to the merge queue May 27, 2025
Merged via the queue into ClickHouse:master with commit feef575 May 27, 2025
117 of 123 checks passed
azat deleted the kafka-proper-shutdown branch May 27, 2025 15:07
robot-clickhouse added a commit that referenced this pull request May 27, 2025
Cherry pick #80795 to 24.8: Properly wait consumers before shutting down Kafka engine
robot-clickhouse added a commit that referenced this pull request May 27, 2025
robot-clickhouse added a commit that referenced this pull request May 27, 2025
Cherry pick #80795 to 25.3: Properly wait consumers before shutting down Kafka engine
robot-clickhouse added a commit that referenced this pull request May 27, 2025
robot-clickhouse added a commit that referenced this pull request May 27, 2025
Cherry pick #80795 to 25.4: Properly wait consumers before shutting down Kafka engine
robot-clickhouse added a commit that referenced this pull request May 27, 2025
robot-clickhouse added a commit that referenced this pull request May 27, 2025
Cherry pick #80795 to 25.5: Properly wait consumers before shutting down Kafka engine
robot-clickhouse added a commit that referenced this pull request May 27, 2025
robot-clickhouse-ci-1 added the pr-synced-to-cloud label (The PR is synced to the cloud repo) May 27, 2025
azat added a commit to azat/ClickHouse that referenced this pull request May 27, 2025
There is likely the same problem as was fixed by ClickHouse#80795:
`cppkafka::Consumer` can hold a `shared_ptr` copy of the
`KafkaConsumer2`, and if `librdkafka` logs something while it is being
destroyed, we get a deadlock, with a stacktrace similar to
this one:

    * thread #28, name = 'rdk:b/dummy_htt'
        frame 5: 0x00007ffff7e222a3 libc.so.6`___pthread_join(threadid=<unavailable>, thread_return=<unavailable>) + 19 at pthread_join.c:24
        frame 6: 0x000055556d0a2012 ch`rd_kafka_thrd_join(thr=<unavailable>, res=0x00007fffe59f4354) + 18 at tinycthread.c:692
        frame 7: 0x000055556cfb1c01 ch`rd_kafka_destroy_app(rk=0x00007fffe6ccc000, flags=<unavailable>) + 609 at rdkafka.c:1143
        frame 8: 0x000055556cf9826a ch`cppkafka::Consumer::~Consumer() [inlined] std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::reset[abi:ne190107](this=0x00007fffe6cb11d8, __p=0x0000000000000000) + 330 at unique_ptr.h:292
        frame 9: 0x000055556cf98247 ch`cppkafka::Consumer::~Consumer() [inlined] std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::~unique_ptr[abi:ne190107](this=0x00007fffe6cb11d8) at unique_ptr.h:261
        frame 10: 0x000055556cf98247 ch`cppkafka::Consumer::~Consumer() [inlined] cppkafka::KafkaHandleBase::~KafkaHandleBase(this=0x00007fffe6cb1018) + 14 at kafka_handle_base.h:66
        frame 11: 0x000055556cf98239 ch`cppkafka::Consumer::~Consumer(this=0x00007fffe6cb1018) + 281 at consumer.cpp:99
        frame 12: 0x0000555566718c70 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::__shared_count::__release_shared[abi:ne190107](this=0x00007fffe6cb1000) + 1008 at shared_ptr.h:156
        frame 13: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::__shared_weak_count::__release_shared[abi:ne190107](this=0x00007fffe6cb1000) at shared_ptr.h:185
        frame 14: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::shared_ptr<cppkafka::Consumer>::~shared_ptr[abi:ne190107](this=0x00007ffff4cf7ff0) at shared_ptr.h:668
        frame 15: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer(this=0x00007ffff4cf7f98) + 999 at KafkaConsumer.cpp:175
        ...
        frame #19: 0x00005555667968f2 ch`void void DB::()::updateGlobalConfiguration<DB::StorageKafka>()::'lambda'()::operator()() const + 1105 at KafkaConfigLoader.cpp:409
        ...
        frame 27: 0x000055556cf96779 ch`cppkafka::log_callback_proxy(h=<unavailable>, level=3, facility="ERROR", message="[thrd:localhost:9092/bootstrap]: 1/1 brokers are down") + 89 at configuration.cpp:85
        frame 28: 0x000055556cfb0be2 ch`rd_kafka_log0 [inlined] rd_kafka_log_buf(conf=0x00007fffe6ccc138, rk=0x00007fffe6ccc000, level=3, ctx=0, fac="ERROR", buf="[thrd:localhost:9092/bootstrap]: 1/1 brokers are down") + 546 at rdkafka.c:276
        frame 29: 0x000055556cfb0b02 ch`rd_kafka_log0(conf=0x00007fffe6ccc138, rk=0x00007fffe6ccc000, extra=0x0000000000000000, level=3, ctx=0, fac="ERROR", fmt="%i/%i brokers are down") + 322 at rdkafka.c:320
        frame 30: 0x000055556cf9d6aa ch`rd_kafka_broker_set_state(rkb=0x00007fffe6cbf800, state=1) + 298 at rdkafka_broker.c:348
        frame 31: 0x000055556cf9dbf6 ch`rd_kafka_broker_fail(rkb=0x00007fffe6cbf800, level=3, err=RD_KAFKA_RESP_ERR__TRANSPORT, fmt=<unavailable>) + 374 at rdkafka_broker.c:623
        frame 32: 0x000055556cfa1246 ch`rd_kafka_broker_connect_done(rkb=<unavailable>, errstr=<unavailable>) + 70 at rdkafka_broker.c:2641
        frame 33: 0x000055556d096efb ch`rd_kafka_transport_io_serve [inlined] rd_kafka_transport_connect_done(rktrans=0x00007ffff279c000, errstr="Connect to ipv4#127.0.0.1:9092 failed: Connection refused") + 731 at rdkafka_transport.c:381
        frame 34: 0x000055556d096eeb ch`rd_kafka_transport_io_serve [inlined] rd_kafka_transport_io_event(rktrans=0x00007ffff279c000, events=<unavailable>, socket_errstr=0x0000000000000000) + 153 at rdkafka_transport.c:691
        frame 35: 0x000055556d096e52 ch`rd_kafka_transport_io_serve(rktrans=0x00007ffff279c000, rkq=0x00007fffe7131240, timeout_ms=<unavailable>) + 562 at rdkafka_transport.c:1007
        frame 36: 0x000055556cfab0ef ch`rd_kafka_broker_ops_io_serve(rkb=0x00007fffe6cbf800, abs_timeout=373906299505) + 367 at rdkafka_broker.c:3670
        frame 37: 0x000055556cfa8d6c ch`rd_kafka_broker_serve [inlined] rd_kafka_broker_consumer_serve(rkb=0x00007fffe6cbf800, abs_timeout=373906299505) + 297 at rdkafka_broker.c:4373
        frame 38: 0x000055556cfa8c43 ch`rd_kafka_broker_serve(rkb=0x00007fffe6cbf800, timeout_ms=<unavailable>) + 4675 at rdkafka_broker.c:4515
        frame 39: 0x000055556cfa4120 ch`rd_kafka_broker_thread_main(arg=0x00007fffe6cbf800) + 320 at rdkafka_broker.c:4663
        frame 40: 0x000055556d0a1fd9 ch`_thrd_wrapper_function(aArg=<unavailable>) + 25 at tinycthread.c:576
        frame 41: 0x00007ffff7e20708 libc.so.6`start_thread(arg=<unavailable>) + 455209 at pthread_create.c:448
        frame 42: 0x00007ffff7ea4aac libc.so.6`__clone3 + 44 at clone3.S:78

      thread #26, name = 'rdk:m/dummy_htt', stop reason = signal SIGSTOP
        frame 0: 0x00007ffff7e28be2 libc.so.6`__syscall_cancel_arch + 50 at syscall_cancel.S:56
        frame 1: 0x00007ffff7e1ce33 libc.so.6`__internal_syscall_cancel(a1=<unavailable>, a2=<unavailable>, a3=<unavailable>, a4=<unavailable>, a5=<unavailable>, a6=<unavailable>, nr=202) + 83 at cancellation.c:49
        frame 2: 0x00007ffff7e1d4bc libc.so.6`__futex_abstimed_wait_common [inlined] __futex_abstimed_wait_common64(private=<unavailable>, futex_word=<unavailable>, expected=<unavailable>, op=<unavailable>, abstime=<unavailable>, cancel=<unavailable>) + 28 at futex-internal.c:57
        frame 3: 0x00007ffff7e1d4a0 libc.so.6`__futex_abstimed_wait_common(futex_word=<unavailable>, expected=<unavailable>, clockid=<unavailable>, abstime=<unavailable>, private=<unavailable>, cancel=<unavailable>) + 96 at futex-internal.c:87
        frame 4: 0x00007ffff7e22414 libc.so.6`__pthread_clockjoin_ex(threadid=140737033270976, thread_return=0x00007fffe61e9f70, clockid=0, abstime=0x0000000000000000, block=true) + 324 at pthread_join_common.c:108
        frame 5: 0x00007ffff7e222a3 libc.so.6`___pthread_join(threadid=<unavailable>, thread_return=<unavailable>) + 19 at pthread_join.c:24
        frame 6: 0x000055556d0a1f52 clickhouse`rd_kafka_thrd_join(thr=<unavailable>, res=0x00007fffe61e9fd4) + 18 at tinycthread.c:692
        frame 7: 0x000055556cfb439b clickhouse`rd_kafka_destroy_internal(rk=0x00007fffe6245000) + 1083 at rdkafka.c:1306
        frame 8: 0x000055556cfb3e34 clickhouse`rd_kafka_thread_main(arg=0x00007fffe6245000) + 788 at rdkafka.c:2182
        frame 9: 0x000055556d0a1f19 clickhouse`_thrd_wrapper_function(aArg=<unavailable>) + 25 at tinycthread.c:576
        frame 10: 0x00007ffff7e20708 libc.so.6`start_thread(arg=<unavailable>) + 455209 at pthread_create.c:448
        frame 11: 0x00007ffff7ea4aac libc.so.6`__clone3 + 44 at clone3.S:78
azat added a commit to azat/ClickHouse that referenced this pull request May 27, 2025
robot-clickhouse added the pr-backports-created label (Backport PRs are successfully created, it won't be processed by CI script anymore) May 27, 2025
robot-clickhouse-ci-1 added the pr-backports-created-cloud label (deprecated label, NOOP) May 27, 2025
azat added a commit that referenced this pull request Jun 1, 2025
Backport #80795 to 25.5: Properly wait consumers before shutting down Kafka engine
azat added a commit that referenced this pull request Jun 1, 2025
Backport #80795 to 25.4: Properly wait consumers before shutting down Kafka engine
azat added a commit that referenced this pull request Jun 1, 2025
Backport #80795 to 25.3: Properly wait consumers before shutting down Kafka engine
robot-clickhouse added the pr-must-backport-synced label (The `*-must-backport` labels are synced into the cloud Sync PR) Jul 2, 2025


Development

Successfully merging this pull request may close these issues.

Segfault on attempt to create engine=Kafka table without group name
