Fix SIGSEGV in StorageKafka on DROP TABLE #12075
OK, I think I understand what happened. I think it was not between poll and In other words, I think that your fix will not help. I would try something like this:
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index 3fd28cde5e..fc239d422d 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -424,9 +424,13 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
/// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
/// If we failed to poll any message once - don't try again.
/// Otherwise, the |poll_timeout| expectations get flawn.
+
+ if (!allowed)
+ return false;
+
resetIfStopped();
- if (!allowed || !hasMorePolledMessages())
+ if (!hasMorePolledMessages())
return false;
// XXX: very fishy place with const casting.
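The reordering proposed in the diff can be illustrated with a toy model. The sketch below is not the ClickHouse class: `ToyKafkaBuffer`, its fields, the two `next*` methods, and `survivingAfterDisallowedCall` are hypothetical stand-ins, and it assumes that `resetIfStopped()` discards the polled messages once a stop flag is set. It only shows that, with the `allowed` check first, a call made while reading is not allowed leaves the polled messages untouched.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for ReadBufferFromKafkaConsumer.
struct ToyKafkaBuffer
{
    std::vector<std::string> messages; // polled messages
    bool allowed = false;              // set when the caller permits reading the next message
    bool stopped = false;              // set when the table is being dropped

    // Assumption: a stop request discards everything that was polled.
    void resetIfStopped()
    {
        if (stopped)
            messages.clear();
    }

    bool hasMorePolledMessages() const { return !messages.empty(); }

    // Original order: reset first, then check both conditions.
    bool nextOriginalOrder()
    {
        resetIfStopped();
        if (!allowed || !hasMorePolledMessages())
            return false;
        allowed = false;
        return true;
    }

    // Order from the diff: bail out on !allowed before touching the messages.
    bool nextProposedOrder()
    {
        if (!allowed)
            return false;
        resetIfStopped();
        if (!hasMorePolledMessages())
            return false;
        allowed = false;
        return true;
    }
};

// Returns how many messages survive one call made after a stop request.
inline std::size_t survivingAfterCall(bool use_proposed_order, bool allowed)
{
    ToyKafkaBuffer buf;
    buf.messages = {"m1", "m2"};
    buf.stopped = true;    // DROP TABLE arrived
    buf.allowed = allowed; // false: the current message is still being parsed
    if (use_proposed_order)
        buf.nextProposedOrder();
    else
        buf.nextOriginalOrder();
    return buf.messages.size();
}
```

In this model the proposed order still clears the messages when `allowed` is true, so the change narrows the window rather than closing it.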
Force-pushed from 084dabe to 278061f
Yes
But you are right it is still possible to get empty messages
Yes, this should work, but it looks messy (too many flags in that reader). How about not checking the stop flag in the (NOTE: that
+1. Let's do it. I had the same thoughts.
It is not. Check that line:
It will go to the first branch https://github.com/azat/ClickHouse/blob/278061f0b20234787b631397a6fdf1e1243e3180/src/Storages/Kafka/KafkaBlockInputStream.cpp#L198 (remember
Strange, and it seems to make no sense (the next if branch is dead in that case). The intent was different; I can recheck that tomorrow, or you can just try switching the order of the if/else to check polledDataUnusable() first.
Yes, it looks like a logical error (the behavior of isStalled() changed during refactoring, and that part of the code was not updated accordingly). I've pushed a commit to your branch, see 821317e
Force-pushed from ef522c8 to d9f0729
Ok, squashed
Should be backported to 20.5 (#11870)
Force-pushed from d9f0729 to db618bb
After ClickHouse#11599 it is possible that the messages of the ReadBufferFromKafkaConsumer will be cleaned up right in the read_kafka_message callback (from KafkaBlockInputStream) if the stop flag is set (i.e. DROP TABLE is waiting for the consumer). If read_kafka_message has already processed some rows, it will not return 0, and the loop after it will try to get the current topic from the buffer, which relies on the underlying messages, and this leads to a SIGSEGV:
12:14:56.173262 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Debug> executeQuery: (from 0.0.0.0:0, user: ) DROP TABLE IF EXISTS data.queue
12:14:56.173285 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Trace> StorageKafka (newly_queue): Waiting for cleanup
12:14:56.180016 [ 55390 ] {} <Trace> BaseDaemon: Received signal 11
12:14:56.180267 [ 4914 ] {} <Fatal> BaseDaemon: ########################################
12:14:56.181879 [ 4914 ] {} <Fatal> BaseDaemon: (version 20.6.1.1, build id: 4CE0298F08583658) (from thread 55468) (no query) Received signal Segmentation fault (11)
12:14:56.181900 [ 4914 ] {} <Fatal> BaseDaemon: Address: 0x8 Access: read. Address not mapped to object.
12:14:56.181909 [ 4914 ] {} <Fatal> BaseDaemon: Stack trace:
12:14:56.184676 [ 4914 ] {} <Fatal> BaseDaemon: 3. /ch/contrib/cppkafka/include/cppkafka/message.h:111: DB::KafkaBlockInputStream::readImpl() @ 0xe343f1c in /usr/lib/debug/usr/bin/clickhouse
12:14:56.185553 [ 4914 ] {} <Fatal> BaseDaemon: 4. /ch/contrib/libcxx/include/vector:1003: DB::IBlockInputStream::read() @ 0xd9d95bd in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188238 [ 4914 ] {} <Fatal> BaseDaemon: 5. /ch/src/DataStreams/copyData.cpp:26: DB::copyData() @ 0xd9f712a in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188780 [ 4914 ] {} <Fatal> BaseDaemon: 6. /ch/contrib/libcxx/include/vector:1532: DB::StorageKafka::streamToViews() @ 0xe335e73 in /usr/lib/debug/usr/bin/clickhouse
12:14:56.189331 [ 4914 ] {} <Fatal> BaseDaemon: 7. /ch/src/Storages/Kafka/StorageKafka.cpp:491: DB::StorageKafka::threadFunc() @ 0xe336738 in /usr/lib/debug/usr/bin/clickhouse
Thread 55421 (shows that it is still waiting for deactivation):
5 std::__1::lock_guard<>::lock_guard () at ../contrib/libcxx/include/__mutex_base:90
6 DB::BackgroundSchedulePoolTaskInfo::deactivate (this=0x7fc7e4465f20) at ../src/Core/BackgroundSchedulePool.cpp:59
7 DB::StorageKafka::shutdown (this=0x7fc7e45e4600) at ../contrib/libcxx/include/memory:3821
And, just in case, the thread where read_kafka_message is called:
0 DB::ReadBufferFromKafkaConsumer::nextImpl (this=0x7fd4901d4118) at ../contrib/libcxx/include/atomic:1491
1 DB::ReadBuffer::next (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:59
2 DB::ReadBuffer::eof (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:81
3 DB::skipWhitespaceIfAny (buf=...) at ../src/IO/ReadHelpers.h:945
4 DB::JSONEachRowRowInputFormat::readRow (ext=..., columns=..., this=0x7fd499a7a020) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:222
5 DB::JSONEachRowRowInputFormat::readRow (this=0x7fd499a7a020, columns=..., ext=...) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:218
6 DB::IRowInputFormat::generate (this=0x7fd499a7a020) at ../src/Processors/Formats/IRowInputFormat.cpp:64
7 DB::ISource::work (this=0x7fd499a7a020) at ../src/Processors/ISource.cpp:48
8 DB::KafkaBlockInputStream::<lambda()>::operator()(void) const () at ../contrib/libcxx/include/memory:3826
9 DB::KafkaBlockInputStream::readImpl (this=0x7fd46e718820) at ../contrib/libcxx/include/new:340
Cc: @filimonov
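The failure mode described in this commit message can be sketched with a toy model. All names here (`ToyMessage`, `ToyBuffer`, `readOneMessage`, `currentTopicOrNull`) are hypothetical and do not come from ClickHouse; the sketch assumes only what the message states: the callback reports rows it has already parsed, the messages are discarded in the meantime, and the caller then asks for the current topic. A guarded accessor returns a null pointer instead of dereferencing a message that no longer exists.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

struct ToyMessage
{
    std::string topic;
    std::string payload;
};

struct ToyBuffer
{
    std::vector<ToyMessage> messages;
    std::size_t current = 0;
    bool stopped = false;

    // Assumption: the stop flag causes polled messages to be dropped mid-read.
    void cleanupIfStopped()
    {
        if (stopped)
        {
            messages.clear();
            current = 0;
        }
    }

    bool hasCurrent() const { return current < messages.size(); }
};

// Hypothetical callback: some rows are parsed, then the stop flag may fire.
inline std::size_t readOneMessage(ToyBuffer & buf, bool stop_during_read)
{
    std::size_t rows = 2; // pretend two rows were parsed already
    if (stop_during_read)
    {
        buf.stopped = true;
        buf.cleanupIfStopped(); // messages vanish although rows > 0
    }
    return rows;
}

// Guarded accessor: nullptr when there is no current message to read from.
inline const std::string * currentTopicOrNull(const ToyBuffer & buf)
{
    if (!buf.hasCurrent())
        return nullptr;
    return &buf.messages[buf.current].topic;
}
```

A caller in this model would check the returned pointer after every `readOneMessage` call, because a non-zero row count does not imply that the current message is still available.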
This will help with tracking down possible issues when you need to know whether the buffer was released.
Before this patch isStalled() was checked before polledDataUnusable(), and after DROP TABLE isStalled() == true (although this looks tricky).
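The ordering issue can be shown with a small sketch. The struct `ToyState`, the enum `Outcome`, and both `classify*` functions are hypothetical; the only facts taken from the thread are that `isStalled()` was checked before `polledDataUnusable()` and that `isStalled()` is true after DROP TABLE. What each branch actually does in ClickHouse is not modelled here.

```cpp
#include <cassert>

// Hypothetical snapshot of the two predicates discussed above.
struct ToyState
{
    bool stalled;       // stands in for isStalled()
    bool data_unusable; // stands in for polledDataUnusable()
};

enum class Outcome { Stalled, DataUnusable, Normal };

// Old order: the stalled check shadows the unusable-data check.
inline Outcome classifyStalledFirst(const ToyState & s)
{
    if (s.stalled)
        return Outcome::Stalled;
    else if (s.data_unusable)
        return Outcome::DataUnusable;
    return Outcome::Normal;
}

// Swapped order: unusable data is detected even when the buffer is also stalled.
inline Outcome classifyUnusableFirst(const ToyState & s)
{
    if (s.data_unusable)
        return Outcome::DataUnusable;
    else if (s.stalled)
        return Outcome::Stalled;
    return Outcome::Normal;
}
```

When both predicates are true, the two functions disagree, which is the case the swapped order is meant to handle.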
As stated by @filimonov it is not relevant (after ClickHouse#11599)
Force-pushed from db618bb to bd5e5e9
…f1a4c9292d3d1ccd339d20cd15b49 Cherry pick #12075 to 20.5: Fix SIGSEGV in StorageKafka on DROP TABLE
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Fix SIGSEGV in StorageKafka on DROP TABLE
Detailed description / Documentation draft:
After #11599 it is possible that the messages of the
ReadBufferFromKafkaConsumer will be cleaned up right in the
read_kafka_message callback (from KafkaBlockInputStream) if the stop
flag is set (i.e. DROP TABLE is waiting for the consumer). If
read_kafka_message has already processed some rows, it will not return 0,
and the loop after it will try to get the current topic from the buffer,
which relies on the underlying messages, and this leads to a SIGSEGV:
Thread 55421 (shows that it is still waiting for deactivation):
And, just in case, the thread where read_kafka_message is called:
Cc: @filimonov
Details
HEADs: