Properly wait consumers before shutting down Kafka engine #80795
azat merged 6 commits into ClickHouse:master
Conversation
Previously it was possible to have active consumers after storage shutdown. Not only is this bad because they can still read from the broker, it can also lead to internal problems with ThreadStatus, which will be destroyed from the thread that calls shutdown() for the storage, which is not a recommended usage.
void StorageKafka::cleanConsumers()
{
    /// We need to clear the cppkafka::Consumer separately from KafkaConsumer, since cppkafka::Consumer holds a weak_ptr to the KafkaConsumer (for logging callback)
I cannot find the place where cppkafka::Consumer holds a weak_ptr to the KafkaConsumer. Could you show me where it is?
I can only see some callbacks passed to cppkafka::Consumer in KafkaConsumer::createConsumer.
{
    /// We need to clear the cppkafka::Consumer separately from KafkaConsumer, since cppkafka::Consumer holds a weak_ptr to the KafkaConsumer (for logging callback)
    /// So if we remove the cppkafka::Consumer from the KafkaConsumer destructor, librdkafka will call the logging callback again from the destructor, which will lead to a deadlock
    std::vector<ConsumerPtr> consumers_to_close;
I think we should add a log message before waiting for the cv and after clearing consumers_to_close. It would make troubleshooting easier, as we would know whether all consumers have been returned yet.
Indeed. And there are already such logs in the caller of cleanConsumers
One more issue in Kafka
…e cppkafka::Consumer
cppkafka::Consumer may use the KafkaConsumer via the stats callback, and this will lead to a use-after-free:
Sanitizer trap.
Stack trace: 0x000055ce71048c47 0x000055ce7173072b 0x000055ce5fe1bb50 0x000055ce5fe02229 0x000055ce5fe05367 0x000055ce5fdfb869 0x000055ce95085934 0x000055ce7b9dd3b4 0x000055ce8d0992aa 0x000055ce8d098d29 0x000055ce8d0e63fa 0x000055ce8d0e97bc 0x000055ce8d09c3d0 0x000055ce7b9f5dcb 0x000055ce7b9f0bc9 0x000055ce7f77b4c4 0x000055ce7f77841d 0x000055ce7f776aa1 0x000055ce7f77>
0.0. inlined from ./ci/tmp/build/./src/Common/StackTrace.cpp:389: StackTrace::tryCapture()
0. ./ci/tmp/build/./src/Common/StackTrace.cpp:61: StackTrace::StackTrace() @ 0x000000001d611c47
1. ./ci/tmp/build/./src/Common/SignalHandlers.cpp:212: sanitizerDeathCallback() @ 0x000000001dcf972b
2. __sanitizer::Die() @ 0x000000000c3e4b50
3. __asan::ScopedInErrorReport::~ScopedInErrorReport() @ 0x000000000c3cb229
4. __asan::ReportGenericError(unsigned long, unsigned long, unsigned long, unsigned long, bool, unsigned long, unsigned int, bool) @ 0x000000000c3ce367
5. __asan_memmove @ 0x000000000c3c4869
6.0. inlined from ./contrib/llvm-project/libcxx/include/__string/constexpr_c_functions.h:227: char* std::__constexpr_memmove[abi:ne190107]<char, char const, 0>(char*, char const*, std::__element_count)
6.1. inlined from ./contrib/llvm-project/libcxx/include/__string/char_traits.h:148: std::char_traits<char>::copy[abi:ne190107](char*, char const*, unsigned long)
6. ./contrib/llvm-project/libcxx/include/string:2575: String& String::__assign_no_alias<false>(char const*, unsigned long) @ 0x000000004164e934
7.0. inlined from ./contrib/llvm-project/libcxx/include/string:2665: String::operator=(String const&)
7.1. inlined from ./src/Storages/Kafka/KafkaConsumer.h:125: DB::KafkaConsumer::setRDKafkaStat(String const&)
7.2. inlined from ./ci/tmp/build/./src/Storages/Kafka/KafkaConsumer.cpp:76: operator()
9. ./ci/tmp/build/./contrib/cppkafka/src/configuration.cpp:92: cppkafka::stats_callback_proxy(rd_kafka_s*, char*, unsigned long, void*) @ 0x0000000039661d29
10. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:4065: rd_kafka_poll_cb @ 0x00000000396af3fa
11. ./ci/tmp/build/./contrib/librdkafka/src/rdkafka.c:3459: rd_kafka_consumer_close @ 0x00000000396b27bc
12.0. inlined from ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:296: cppkafka::Consumer::close()
12. ./ci/tmp/build/./contrib/cppkafka/src/consumer.cpp:82: cppkafka::Consumer::~Consumer() @ 0x00000000396653d0
13. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:363: DB::StorageKafka::cleanConsumers() @ 0x0000000027fbedcb
14. ./ci/tmp/build/./src/Storages/Kafka/StorageKafka.cpp:323: DB::StorageKafka::shutdown(bool) @ 0x0000000027fb9bc9
15.0. inlined from ./src/Storages/IStorage.h:571: DB::IStorage::flushAndShutdown(bool)
The test can reproduce the problem reliably only with an ASan/debug build.
feef575
Cherry pick #80795 to 24.8: Properly wait consumers before shutting down Kafka engine
Cherry pick #80795 to 25.3: Properly wait consumers before shutting down Kafka engine
Cherry pick #80795 to 25.4: Properly wait consumers before shutting down Kafka engine
Cherry pick #80795 to 25.5: Properly wait consumers before shutting down Kafka engine
There is likely the same problem as the one fixed by ClickHouse#80795: `cppkafka::Consumer` can hold a `shared_ptr` copy of the `KafkaConsumer2`, and if `librdkafka` logs something during destruction, we will get a deadlock, with a stacktrace similar to this one:

* thread #28, name = 'rdk:b/dummy_htt'
  frame 5: 0x00007ffff7e222a3 libc.so.6`___pthread_join(threadid=<unavailable>, thread_return=<unavailable>) + 19 at pthread_join.c:24
  frame 6: 0x000055556d0a2012 ch`rd_kafka_thrd_join(thr=<unavailable>, res=0x00007fffe59f4354) + 18 at tinycthread.c:692
  frame 7: 0x000055556cfb1c01 ch`rd_kafka_destroy_app(rk=0x00007fffe6ccc000, flags=<unavailable>) + 609 at rdkafka.c:1143
  frame 8: 0x000055556cf9826a ch`cppkafka::Consumer::~Consumer() [inlined] std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::reset[abi:ne190107](this=0x00007fffe6cb11d8, __p=0x0000000000000000) + 330 at unique_ptr.h:292
  frame 9: 0x000055556cf98247 ch`cppkafka::Consumer::~Consumer() [inlined] std::__1::unique_ptr<rd_kafka_s, cppkafka::KafkaHandleBase::HandleDeleter>::~unique_ptr[abi:ne190107](this=0x00007fffe6cb11d8) at unique_ptr.h:261
  frame 10: 0x000055556cf98247 ch`cppkafka::Consumer::~Consumer() [inlined] cppkafka::KafkaHandleBase::~KafkaHandleBase(this=0x00007fffe6cb1018) + 14 at kafka_handle_base.h:66
  frame 11: 0x000055556cf98239 ch`cppkafka::Consumer::~Consumer(this=0x00007fffe6cb1018) + 281 at consumer.cpp:99
  frame 12: 0x0000555566718c70 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::__shared_count::__release_shared[abi:ne190107](this=0x00007fffe6cb1000) + 1008 at shared_ptr.h:156
  frame 13: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::__shared_weak_count::__release_shared[abi:ne190107](this=0x00007fffe6cb1000) at shared_ptr.h:185
  frame 14: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer() [inlined] std::__1::shared_ptr<cppkafka::Consumer>::~shared_ptr[abi:ne190107](this=0x00007ffff4cf7ff0) at shared_ptr.h:668
  frame 15: 0x0000555566718c67 ch`DB::KafkaConsumer::~KafkaConsumer(this=0x00007ffff4cf7f98) + 999 at KafkaConsumer.cpp:175
  ...
  frame #19: 0x00005555667968f2 ch`void void DB::()::updateGlobalConfiguration<DB::StorageKafka>()::'lambda'()::operator()() const + 1105 at KafkaConfigLoader.cpp:409
  ...
  frame 27: 0x000055556cf96779 ch`cppkafka::log_callback_proxy(h=<unavailable>, level=3, facility="ERROR", message="[thrd:localhost:9092/bootstrap]: 1/1 brokers are down") + 89 at configuration.cpp:85
  frame 28: 0x000055556cfb0be2 ch`rd_kafka_log0 [inlined] rd_kafka_log_buf(conf=0x00007fffe6ccc138, rk=0x00007fffe6ccc000, level=3, ctx=0, fac="ERROR", buf="[thrd:localhost:9092/bootstrap]: 1/1 brokers are down") + 546 at rdkafka.c:276
  frame 29: 0x000055556cfb0b02 ch`rd_kafka_log0(conf=0x00007fffe6ccc138, rk=0x00007fffe6ccc000, extra=0x0000000000000000, level=3, ctx=0, fac="ERROR", fmt="%i/%i brokers are down") + 322 at rdkafka.c:320
  frame 30: 0x000055556cf9d6aa ch`rd_kafka_broker_set_state(rkb=0x00007fffe6cbf800, state=1) + 298 at rdkafka_broker.c:348
  frame 31: 0x000055556cf9dbf6 ch`rd_kafka_broker_fail(rkb=0x00007fffe6cbf800, level=3, err=RD_KAFKA_RESP_ERR__TRANSPORT, fmt=<unavailable>) + 374 at rdkafka_broker.c:623
  frame 32: 0x000055556cfa1246 ch`rd_kafka_broker_connect_done(rkb=<unavailable>, errstr=<unavailable>) + 70 at rdkafka_broker.c:2641
  frame 33: 0x000055556d096efb ch`rd_kafka_transport_io_serve [inlined] rd_kafka_transport_connect_done(rktrans=0x00007ffff279c000, errstr="Connect to ipv4#127.0.0.1:9092 failed: Connection refused") + 731 at rdkafka_transport.c:381
  frame 34: 0x000055556d096eeb ch`rd_kafka_transport_io_serve [inlined] rd_kafka_transport_io_event(rktrans=0x00007ffff279c000, events=<unavailable>, socket_errstr=0x0000000000000000) + 153 at rdkafka_transport.c:691
  frame 35: 0x000055556d096e52 ch`rd_kafka_transport_io_serve(rktrans=0x00007ffff279c000, rkq=0x00007fffe7131240, timeout_ms=<unavailable>) + 562 at rdkafka_transport.c:1007
  frame 36: 0x000055556cfab0ef ch`rd_kafka_broker_ops_io_serve(rkb=0x00007fffe6cbf800, abs_timeout=373906299505) + 367 at rdkafka_broker.c:3670
  frame 37: 0x000055556cfa8d6c ch`rd_kafka_broker_serve [inlined] rd_kafka_broker_consumer_serve(rkb=0x00007fffe6cbf800, abs_timeout=373906299505) + 297 at rdkafka_broker.c:4373
  frame 38: 0x000055556cfa8c43 ch`rd_kafka_broker_serve(rkb=0x00007fffe6cbf800, timeout_ms=<unavailable>) + 4675 at rdkafka_broker.c:4515
  frame 39: 0x000055556cfa4120 ch`rd_kafka_broker_thread_main(arg=0x00007fffe6cbf800) + 320 at rdkafka_broker.c:4663
  frame 40: 0x000055556d0a1fd9 ch`_thrd_wrapper_function(aArg=<unavailable>) + 25 at tinycthread.c:576
  frame 41: 0x00007ffff7e20708 libc.so.6`start_thread(arg=<unavailable>) + 455209 at pthread_create.c:448
  frame 42: 0x00007ffff7ea4aac libc.so.6`__clone3 + 44 at clone3.S:78

* thread #26, name = 'rdk:m/dummy_htt', stop reason = signal SIGSTOP
  frame 0: 0x00007ffff7e28be2 libc.so.6`__syscall_cancel_arch + 50 at syscall_cancel.S:56
  frame 1: 0x00007ffff7e1ce33 libc.so.6`__internal_syscall_cancel(a1=<unavailable>, a2=<unavailable>, a3=<unavailable>, a4=<unavailable>, a5=<unavailable>, a6=<unavailable>, nr=202) + 83 at cancellation.c:49
  frame 2: 0x00007ffff7e1d4bc libc.so.6`__futex_abstimed_wait_common [inlined] __futex_abstimed_wait_common64(private=<unavailable>, futex_word=<unavailable>, expected=<unavailable>, op=<unavailable>, abstime=<unavailable>, cancel=<unavailable>) + 28 at futex-internal.c:57
  frame 3: 0x00007ffff7e1d4a0 libc.so.6`__futex_abstimed_wait_common(futex_word=<unavailable>, expected=<unavailable>, clockid=<unavailable>, abstime=<unavailable>, private=<unavailable>, cancel=<unavailable>) + 96 at futex-internal.c:87
  frame 4: 0x00007ffff7e22414 libc.so.6`__pthread_clockjoin_ex(threadid=140737033270976, thread_return=0x00007fffe61e9f70, clockid=0, abstime=0x0000000000000000, block=true) + 324 at pthread_join_common.c:108
  frame 5: 0x00007ffff7e222a3 libc.so.6`___pthread_join(threadid=<unavailable>, thread_return=<unavailable>) + 19 at pthread_join.c:24
  frame 6: 0x000055556d0a1f52 clickhouse`rd_kafka_thrd_join(thr=<unavailable>, res=0x00007fffe61e9fd4) + 18 at tinycthread.c:692
  frame 7: 0x000055556cfb439b clickhouse`rd_kafka_destroy_internal(rk=0x00007fffe6245000) + 1083 at rdkafka.c:1306
  frame 8: 0x000055556cfb3e34 clickhouse`rd_kafka_thread_main(arg=0x00007fffe6245000) + 788 at rdkafka.c:2182
  frame 9: 0x000055556d0a1f19 clickhouse`_thrd_wrapper_function(aArg=<unavailable>) + 25 at tinycthread.c:576
  frame 10: 0x00007ffff7e20708 libc.so.6`start_thread(arg=<unavailable>) + 455209 at pthread_create.c:448
  frame 11: 0x00007ffff7ea4aac libc.so.6`__clone3 + 44 at clone3.S:78
Backport #80795 to 25.5: Properly wait consumers before shutting down Kafka engine
Backport #80795 to 25.4: Properly wait consumers before shutting down Kafka engine
Backport #80795 to 25.3: Properly wait consumers before shutting down Kafka engine
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Properly wait consumers before shutting down Kafka engine (active consumers after shutdown can trigger various debug assertions and may also read data from brokers in the background after the table has been dropped/detached)
See commit messages for details
Fixes: #80674
Fixes: #58310