Fix SIGSEGV in StorageKafka on DROP TABLE#12075

Merged
akuzm merged 5 commits into ClickHouse:master from azat:StorageKafka-SIGSEGV-fix on Jul 6, 2020

Conversation


@azat azat commented Jul 1, 2020

Changelog category (leave one):

  • Bug Fix

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Fix SIGSEGV in StorageKafka on DROP TABLE

Detailed description / Documentation draft:
After #11599 it is possible that the messages of the
ReadBufferFromKafkaConsumer will be cleaned up right in the
read_kafka_message callback (from KafkaBlockInputStream) if the stop
flag is set (i.e. DROP TABLE is waiting for the consumer). If
read_kafka_message has already processed some rows, it will not return 0,
and the loop that follows will try to get the current topic from the
buffer, which uses the underlying messages, and this results in a SIGSEGV:

12:14:56.173262 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Debug> executeQuery: (from 0.0.0.0:0, user: ) DROP TABLE IF EXISTS data.queue
12:14:56.173285 [ 55421 ] {f7930856-d478-4e41-af56-24ce7b693e95} <Trace> StorageKafka (newly_queue): Waiting for cleanup
12:14:56.180016 [ 55390 ] {} <Trace> BaseDaemon: Received signal 11
12:14:56.180267 [ 4914 ] {} <Fatal> BaseDaemon: ########################################
12:14:56.181879 [ 4914 ] {} <Fatal> BaseDaemon: (version 20.6.1.1, build id: 4CE0298F08583658) (from thread 55468) (no query) Received signal Segmentation fault (11)
12:14:56.181900 [ 4914 ] {} <Fatal> BaseDaemon: Address: 0x8 Access: read. Address not mapped to object.
12:14:56.181909 [ 4914 ] {} <Fatal> BaseDaemon: Stack trace:
12:14:56.184676 [ 4914 ] {} <Fatal> BaseDaemon: 3. /ch/contrib/cppkafka/include/cppkafka/message.h:111: DB::KafkaBlockInputStream::readImpl() @ 0xe343f1c in /usr/lib/debug/usr/bin/clickhouse
12:14:56.185553 [ 4914 ] {} <Fatal> BaseDaemon: 4. /ch/contrib/libcxx/include/vector:1003: DB::IBlockInputStream::read() @ 0xd9d95bd in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188238 [ 4914 ] {} <Fatal> BaseDaemon: 5. /ch/src/DataStreams/copyData.cpp:26: DB::copyData() @ 0xd9f712a in /usr/lib/debug/usr/bin/clickhouse
12:14:56.188780 [ 4914 ] {} <Fatal> BaseDaemon: 6. /ch/contrib/libcxx/include/vector:1532: DB::StorageKafka::streamToViews() @ 0xe335e73 in /usr/lib/debug/usr/bin/clickhouse
12:14:56.189331 [ 4914 ] {} <Fatal> BaseDaemon: 7. /ch/src/Storages/Kafka/StorageKafka.cpp:491: DB::StorageKafka::threadFunc() @ 0xe336738 in /usr/lib/debug/usr/bin/clickhouse

Thread 55421 (shows that it is still waiting for deactivation):

5  std::__1::lock_guard<>::lock_guard () at ../contrib/libcxx/include/__mutex_base:90
6  DB::BackgroundSchedulePoolTaskInfo::deactivate (this=0x7fc7e4465f20) at ../src/Core/BackgroundSchedulePool.cpp:59
7  DB::StorageKafka::shutdown (this=0x7fc7e45e4600) at ../contrib/libcxx/include/memory:3821

And, just in case, the thread where read_kafka_message is called:

0  DB::ReadBufferFromKafkaConsumer::nextImpl (this=0x7fd4901d4118) at ../contrib/libcxx/include/atomic:1491
1  DB::ReadBuffer::next (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:59
2  DB::ReadBuffer::eof (this=0x7fd4901d4118) at ../src/IO/ReadBuffer.h:81
3  DB::skipWhitespaceIfAny (buf=...) at ../src/IO/ReadHelpers.h:945
4  DB::JSONEachRowRowInputFormat::readRow (ext=..., columns=..., this=0x7fd499a7a020) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:222
5  DB::JSONEachRowRowInputFormat::readRow (this=0x7fd499a7a020, columns=..., ext=...) at ../src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp:218
6  DB::IRowInputFormat::generate (this=0x7fd499a7a020) at ../src/Processors/Formats/IRowInputFormat.cpp:64
7  DB::ISource::work (this=0x7fd499a7a020) at ../src/Processors/ISource.cpp:48
8  DB::KafkaBlockInputStream::<lambda()>::operator()(void) const () at ../contrib/libcxx/include/memory:3826
9  DB::KafkaBlockInputStream::readImpl (this=0x7fd46e718820) at ../contrib/libcxx/include/new:340

Cc: @filimonov

Details

HEADs:

  • 084dabefefdffdfa2125c4bf95bced7275e446e3
  • 278061f0b20234787b631397a6fdf1e1243e3180
  • db618bbe3fbebebfe979ffa4ffb59e79374524b8

@azat azat marked this pull request as draft July 1, 2020 17:48
@blinkov blinkov added the pr-bugfix Pull request with bugfix, not backported by default label Jul 1, 2020
@filimonov

Ok, I think I understand what happened. I think it was not between poll and read_kafka_message (in that case the next() call would just return false, 0 rows would be parsed, and it would never enter if (new_rows)); instead, the stop must have happened inside read_kafka_message itself. The first call of next() returns the message and it is parsed; after that it calls next() one more time (to ensure that it is at EOF), and at that moment next() cleans up the messages inside resetIfStopped(). After that we have a situation where some rows were parsed, but the message was already destructed.

In other words, I think that your fix will not help. I would try something like this:

diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index 3fd28cde5e..fc239d422d 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -424,9 +424,13 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
     ///       If we failed to poll any message once - don't try again.
     ///       Otherwise, the |poll_timeout| expectations get flawn.
+
+    if (!allowed)
+        return false;
+
     resetIfStopped();
 
-    if (!allowed || !hasMorePolledMessages())
+    if (!hasMorePolledMessages())
         return false;
 
     // XXX: very fishy place with const casting.

@azat azat force-pushed the StorageKafka-SIGSEGV-fix branch from 084dabe to 278061f Compare July 1, 2020 20:42

azat commented Jul 1, 2020

> Ok, I think I understand what happened. I think it was not between poll and read_kafka_message (in that case the next() call would just return false, 0 rows would be parsed, and it would never enter if (new_rows)); instead, the stop must have happened inside read_kafka_message itself. The first call of next() returns the message and it is parsed; after that it calls next() one more time (to ensure that it is at EOF), and at that moment next() cleans up the messages inside resetIfStopped(). After that we have a situation where some rows were parsed, but the message was already destructed.

Yes

> "messages of the ReadBufferFromKafkaConsumer will be cleaned up right in the read_kafka_message callback (from KafkaBlockInputStream) if the stop flag is set"

> I think that your fix will not help

But you are right, it is still possible to get empty messages.

> I would try something like this:

Yes, this should work, but it looks messy (too many flags in that reader); how about not checking the stop flag in ReadBufferFromKafkaConsumer::nextImpl at all?

(NOTE: consumerStopped() is still required, since otherwise it will retry the poll.)


filimonov commented Jul 1, 2020

> Yes, this should work, but it looks messy (too many flags in that reader); how about not checking the stop flag in ReadBufferFromKafkaConsumer::nextImpl at all?

+1. Let's do it. I had the same thoughts.

> (NOTE: consumerStopped() is still required, since otherwise it will retry the poll.)

It is not. Check this line:
https://github.com/azat/ClickHouse/blob/278061f0b20234787b631397a6fdf1e1243e3180/src/Storages/Kafka/KafkaBlockInputStream.cpp#L202


azat commented Jul 1, 2020

> It is not. Check this line:

It will go to the first branch https://github.com/azat/ClickHouse/blob/278061f0b20234787b631397a6fdf1e1243e3180/src/Storages/Kafka/KafkaBlockInputStream.cpp#L198 (remember stalled_status == CONSUMER_STOPPED, so buffer->isStalled() == true), although I am not sure this is OK in itself.


filimonov commented Jul 1, 2020

Strange, and it seems to make no sense (the next if branch is dead code in that case).

The intent was different. I can recheck that tomorrow, or you can just try to switch the order of the if/else to check polledDataUnusable first.

@filimonov

Yes, it seems like a logical error (the behavior of isStalled() changed during refactorings, and that part of the code was not fixed accordingly).

I've pushed a commit to your branch, see 821317e.

@azat azat force-pushed the StorageKafka-SIGSEGV-fix branch from ef522c8 to d9f0729 Compare July 2, 2020 07:37

azat commented Jul 2, 2020

> Yes, it seems like a logical error

OK, squashed.

@azat azat marked this pull request as ready for review July 2, 2020 07:38

@filimonov filimonov left a comment


LGTM


azat commented Jul 2, 2020

Should be backported to 20.5 (#11870 )

@akuzm akuzm self-assigned this Jul 2, 2020
@azat azat force-pushed the StorageKafka-SIGSEGV-fix branch from d9f0729 to db618bb Compare July 2, 2020 19:54
azat added 5 commits July 3, 2020 09:22
This will help with tracking possible issues, when you need to know whether the buffer was released or not.
Before this patch isStalled() was checked before polledDataUnusable(),
and after DROP TABLE isStalled() == true (although this looks tricky).
As stated by @filimonov, it is not relevant (after ClickHouse#11599).
@azat azat force-pushed the StorageKafka-SIGSEGV-fix branch from db618bb to bd5e5e9 Compare July 3, 2020 06:24
@akuzm akuzm merged commit d648628 into ClickHouse:master Jul 6, 2020
@azat azat deleted the StorageKafka-SIGSEGV-fix branch July 6, 2020 11:20