
RabbitMQ improvements #12761

Merged
alesapin merged 40 commits into ClickHouse:master from kssenii:rabbitmq-improvements on Sep 8, 2020

Conversation

@kssenii
Member

@kssenii kssenii commented Jul 25, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Improvements in StorageRabbitMQ: added connection and channel failure handling, proper commits, handling of insert failures, better exchanges, queue durability and the ability to resume a queue, and new queue settings. Fixed tests.

@kssenii
Member Author

kssenii commented Jul 25, 2020

I was reading some related articles and realized that I had made a logical mistake in the insert query part, so I thought I should fix it.
But then I thought: why not also add some improvements...

@robot-clickhouse robot-clickhouse added the pr-improvement Pull request with some product improvements label Jul 25, 2020
@alesapin alesapin self-assigned this Jul 27, 2020
Member

@alesapin alesapin left a comment

I know nothing about RabbitMQ :( Will try to review better next time.

local_exchange_declared = false;
LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message);
});
std::atomic<bool> bindings_created = false, bindings_error = false;
Member

Why do they have to be atomic?

Member Author

Because otherwise you will later catch an error on the wait loop: "this loop is infinite; none of its condition variables (bindings_created, bindings_error) are updated in the loop body". The flags are set from callbacks running on another thread, not inside the loop body itself.

@kssenii kssenii force-pushed the rabbitmq-improvements branch from 834101f to 1db3f9e Compare July 31, 2020 20:34
@kssenii kssenii force-pushed the rabbitmq-improvements branch from 1db3f9e to d5b1332 Compare August 3, 2020 14:47
@kssenii kssenii force-pushed the rabbitmq-improvements branch from ff4b693 to 139a19d Compare August 8, 2020 18:41
Member

@alesapin alesapin left a comment

I need a new review type: Request clarifications.


auto new_context = std::make_shared<Context>(context);
if (!schema_name.empty())
new_context->setSetting("format_schema", schema_name);
Member

Why do we need to share the context in each read? Actually, here you just create another shared_ptr; all changes will be reflected in the original object.

looping_task->deactivate();
}

if ((update_channels = restoreConnection(true)))
Member

Quite unclear; let's do it on separate lines, because this flag is used later.

std::atomic<bool> stub = {false};
copyData(*in, *block_io.out, &stub);

/* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
Member

Maybe we can have a separate connection for it? The current logic is quite complicated: it needs additional flags to stop the loop, to deactivate tasks, and so on.

Member Author

We can't have a separate connection. There is only one unique event loop for each connection, and the loop handles the onReceived() callbacks for the channel that made the consume() call. Committing messages with channel->ack() therefore has to happen on that same channel, which means the channel and the loop must share the same connection: a loop declared with a different connection would not be able to access that channel's callbacks.

}

UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
Member

This is an incompatible change, because the order of the arguments changed.

parsed_address, global_context, login_password, routing_keys[0], local_exchange_name,
log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
producer_id.fetch_add(1), unique_strbase, persistent, wait_confirm, log,
Member

Is producer_id just a unique number?

Member Author

Kind of: a serial number used to represent a channel id.

connection->close();

size_t cnt_retries = 0;
while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
Member

What does this bit-shift mean?

Member Author

Just to decrease the number of retries.

Member

Confusing :)

while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
{
event_handler->iterateLoop();
std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP >> 3));
Member

Same here.

Member Author

Is it bad to reuse constants in another context while modifying them for the needed case? Maybe I should just change connect_sleep to wait_sleep so it is not case-specific.

/// Commit
for (auto & stream : streams)
{
if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
Member

What will happen if we have already successfully inserted data into all views, but one of the streams got an error sending the ack? Is data duplication possible?

Member Author

@kssenii kssenii Aug 28, 2020

Hm, false is returned by the sendAck function in only two cases:

  1. The connection failed. In this case all channels are closed and unable to send an ack. An ack is also based on delivery tags, which are unique to a channel, so if channels fail, those delivery tags become invalid and there is no way to send a specific ack from a different channel. In fact, once the server realizes it has messages in a queue waiting for a confirm from a channel that suddenly closed, it immediately makes those messages accessible to other consumers. So yes, in this case duplicates are inevitable.
  2. The size of the sent frame (the library's internal request interface) exceeds the max frame size, which is an internal library error. This is more common for message frames and unlikely to happen for an ack frame, I suppose. But I can add one more check: if sendAck failed and the connection is still usable, retry sending the ack instead of breaking from the loop; although if the frame size was exceeded the first time, I believe nothing will change the second time. Also, in this case it is OK if the ack failed, because the next attempt to send an ack on the same channel also commits all previously uncommitted messages, so there will be no duplicates. Anyway, I do not think this will ever happen for an ack frame.

Contributor

Sorry for butting in, but this explanation would make an ideal comment for this code. This also goes for most things you ever have to explain to someone about the code.

{
throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
}
if (setupConnection(false))
Member

But what happens in the bad case? Maybe we should throw an exception?

Member Author

Now it keeps attempting to reconnect in writingFunc and sets up the producer's channel the moment it manages to do so, but yes, it would be better to throw if it is unable to connect while in the buffer constructor.

String channel_id;
ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
UInt64 delivery_tag = 0;
std::atomic<bool> wait_all = true;
Member

Need comments for each flag.


@kssenii kssenii force-pushed the rabbitmq-improvements branch from a977457 to 7b0713b Compare September 1, 2020 08:29
Member

@alesapin alesapin left a comment

Not finished, will try tomorrow.

@kssenii kssenii force-pushed the rabbitmq-improvements branch 3 times, most recently from dd1c27b to e1ef558 Compare September 3, 2020 10:52
@alesapin
Member

alesapin commented Sep 4, 2020

test_host_ip_change/test.py::test_ip_change_update_dns_cache is a flaky test.

Member

@alesapin alesapin left a comment

Despite the code becoming more complicated, a lot of useful fixes and improvements were added. Now we have different failovers for interaction with RabbitMQ, virtual columns, new settings, fixes for race conditions, and so on. Also, the tests should become more stable. I think we can merge it, because the code is quite isolated and the tests are good.

@alesapin
Member

alesapin commented Sep 8, 2020

test_distributed_over_live_view is flaky.

@alesapin
Member

alesapin commented Sep 8, 2020

The OOM in the GCC build is not related to these changes.

@alesapin alesapin merged commit 4364bff into ClickHouse:master Sep 8, 2020