Conversation
I was reading some related articles and realized that I made a logical mistake in the insert query part, so I thought I should fix it.
alesapin left a comment
I know nothing about RabbitMQ :( Will try to review better next time.
    local_exchange_declared = false;
    LOG_ERROR(log, "Failed to declare local direct-exchange. Reason: {}", message);
    });
    std::atomic<bool> bindings_created = false, bindings_error = false;
Because otherwise you'll catch an error later in the loop: this loop is infinite, since none of its condition variables (bindings_created, bindings_error) are updated in the loop body.
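The fix being asked for hinges on a pattern worth making explicit. Below is a minimal sketch (hypothetical names, not the PR's exact code) of why such a wait loop terminates only if something inside it can still flip the flags: each iteration must pump the event loop so the onSuccess/onError callbacks run and update the atomics.

```cpp
#include <atomic>
#include <functional>

// Sketch of the wait pattern under review (illustrative, not the PR's code).
// The loop exits only because iterate_loop() drives the event loop, which in
// turn runs the callbacks that set these flags; if nothing in the body can
// update them, the loop is infinite.
std::atomic<bool> bindings_created{false};
std::atomic<bool> bindings_error{false};

void waitForBindings(const std::function<void()> & iterate_loop)
{
    while (!bindings_created && !bindings_error)
        iterate_loop();  // pumps callbacks such as onSuccess/onError
}
```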
alesapin left a comment
I need a new review type: Request clarifications.
    auto new_context = std::make_shared<Context>(context);
    if (!schema_name.empty())
        new_context->setSetting("format_schema", schema_name);
Why do we need to share the context in each read? Actually, here you just created another shared_ptr: all changes will be reflected in the original object.
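For what it's worth, the distinction at stake can be shown with a toy type (not ClickHouse's Context): handing out another shared_ptr shares one object, so setting a field is visible through the original handle, while copying the pointee yields an independent object.

```cpp
#include <memory>
#include <string>

// Toy stand-in for a context object (illustrative only).
struct ToyContext { std::string format_schema; };

// Another handle to the *same* object: mutations are visible everywhere.
std::shared_ptr<ToyContext> shareSame(const std::shared_ptr<ToyContext> & ctx)
{
    return ctx;
}

// A copy of the pointee: mutations stay local to the new object.
std::shared_ptr<ToyContext> copyObject(const std::shared_ptr<ToyContext> & ctx)
{
    return std::make_shared<ToyContext>(*ctx);
}
```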
    looping_task->deactivate();
    }

    if ((update_channels = restoreConnection(true)))
Quite unclear; let's do it in separate lines, because this flag is used later.
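What "separate lines" would look like, roughly (restoreConnection is stubbed out here for illustration; the real signature is in the PR):

```cpp
// Stub standing in for the PR's restoreConnection(bool); illustrative only.
bool restoreConnection(bool /* reconnecting */) { return true; }

bool updateChannelsExample()
{
    // Assignment on its own line makes it obvious the flag outlives the branch.
    bool update_channels = restoreConnection(true);
    if (update_channels)
    {
        // ... re-declare channels here ...
    }
    // update_channels is read again later, which is why it deserves its own line.
    return update_channels;
}
```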
    std::atomic<bool> stub = {false};
    copyData(*in, *block_io.out, &stub);

    /* Need to stop loop even if connection is ok, because sending ack() with loop running in another thread will lead to a lot of data
Maybe we can have a separate connection for it? The current logic is quite complicated: it needs additional flags to stop the loop, to deactivate tasks, and so on.
Can't have a separate connection, because: there is only one unique event loop for each connection; the loop handles the onReceived() callbacks for the channel which made the consume() call; therefore committing messages with channel->ack() needs to happen on that same channel. This means that the channel and the loop must share the same connection, as a loop declared with a different connection will not be able to access that channel's callbacks.
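A minimal model of the delivery-tag part of this argument (a toy, not AMQP-CPP's real API): delivery tags are scoped per channel, so an ack for a tag issued by one channel is meaningless on any other channel.

```cpp
#include <cstdint>
#include <set>

// Toy channel model: each channel numbers its own deliveries, and ack()
// only succeeds for tags that this channel itself issued.
struct ToyChannel
{
    uint64_t next_tag = 0;
    std::set<uint64_t> unacked;

    uint64_t deliver()
    {
        unacked.insert(++next_tag);
        return next_tag;
    }

    bool ack(uint64_t tag) { return unacked.erase(tag) > 0; }
};
```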
    }

    UInt64 num_consumers = rabbitmq_settings.rabbitmq_num_consumers;
    String exchange_type = rabbitmq_settings.rabbitmq_exchange_type.value;
Incompatible change because the order of arguments changed.
    parsed_address, global_context, login_password, routing_keys[0], local_exchange_name,
    log, num_consumers * num_queues, bind_by_id, use_transactional_channel,
    parsed_address, global_context, login_password, routing_keys, exchange_name, exchange_type,
    producer_id.fetch_add(1), unique_strbase, persistent, wait_confirm, log,
Is producer_id just a unique number?
Kind of: it's a serial number used to represent a channel id.
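The fetch_add(1) idiom in question, sketched with illustrative names (not the PR's exact code): an atomic counter hands out one serial number per producer, which is then baked into a channel id.

```cpp
#include <atomic>
#include <cstddef>
#include <string>

// Illustrative sketch: an atomic counter whose fetch_add(1) gives each
// producer buffer a unique serial number, used to build a channel id.
std::atomic<size_t> producer_serial{0};

std::string nextChannelId(const std::string & base)
{
    return base + "_" + std::to_string(producer_serial.fetch_add(1));
}
```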
    connection->close();

    size_t cnt_retries = 0;
    while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
Just to decrease the number of retries.
    while (!connection->closed() && ++cnt_retries != (RETRIES_MAX >> 1))
    {
        event_handler->iterateLoop();
        std::this_thread::sleep_for(std::chrono::milliseconds(CONNECT_SLEEP >> 3));
Is it bad to reuse constants in another context while modifying them for the needed case? Maybe I should just change connect_sleep to wait_sleep so it's not case-specific.
    /// Commit
    for (auto & stream : streams)
    {
        if (!stream->as<RabbitMQBlockInputStream>()->sendAck())
What will happen if we have already successfully inserted data into all views, but one of the streams got an error sending its ack? Is data duplication possible?
Hm, false is returned by the sendAck function in only two cases:
- The connection failed. In this case all channels will be closed and will be unable to send an ack. Also, ack is made based on delivery tags, which are unique to a channel, so if channels fail, those delivery tags become invalid and there is no way to send a specific ack from a different channel. Actually, once the server realises that it has messages in a queue waiting for confirm from a channel which suddenly closed, it will immediately make those messages accessible to other consumers. So yes, in this case duplicates are inevitable.
- The size of the sent frame (the library's internal request interface) exceeds the max frame, i.e. an internal library error. This is more common for message frames, but not likely to happen to an ack frame, I suppose. So I do not believe it is likely to happen.

But I can add one more check: if sendAck failed and the connection is still usable, retry sending the ack and do not break from the loop in this case; although if the frame size was exceeded the first time, then I believe nothing will change the second time. Also in this case it is ok if we failed to send an ack, because the next attempt to send an ack on the same channel will also commit all previously not-committed messages, so there will be no duplicates. Anyway, I do not think that for an ack frame this will ever happen.
Sorry for butting in, but this explanation would make an ideal comment for this code. This also goes for most things you ever have to explain to someone about the code.
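The retry idea from the thread, sketched with hypothetical callables (not the PR's code): retry the ack while the connection is still usable, relying on the fact that a later successful ack on the same channel also commits everything outstanding, so retrying cannot introduce duplicate acks.

```cpp
#include <cstddef>
#include <functional>

// Hypothetical sketch: keep retrying send_ack while the connection is
// usable; a later ack on the same channel commits all previously
// unconfirmed messages, so retrying is safe.
bool ackWithRetry(const std::function<bool()> & send_ack,
                  const std::function<bool()> & connection_usable,
                  size_t max_attempts)
{
    for (size_t attempt = 0; attempt < max_attempts; ++attempt)
    {
        if (send_ack())
            return true;
        if (!connection_usable())
            return false; // channel is gone; its delivery tags are now invalid
    }
    return false;
}
```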
    {
        throw Exception("Cannot set up connection for producer", ErrorCodes::CANNOT_CONNECT_RABBITMQ);
    }

    if (setupConnection(false))
But what happens in the bad case? Maybe we should throw an exception?
Now it will continue attempting to reconnect in writingFunc and will set up the channel for the producer the moment it manages to do so. But yes, it would be better to throw if unable to connect while in the buffer constructor.
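The agreed direction ("better to throw in the buffer constructor"), sketched with a toy type rather than the PR's real buffer class:

```cpp
#include <stdexcept>

// Toy stand-in for the producer buffer (illustrative only): fail fast in the
// constructor when the initial connection cannot be set up, instead of
// silently deferring to the background writing task.
struct ToyProducerBuffer
{
    explicit ToyProducerBuffer(bool connection_ok)
    {
        if (!connection_ok)
            throw std::runtime_error("Cannot set up connection for producer");
    }
};
```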
    String channel_id;
    ConcurrentBoundedQueue<std::pair<UInt64, String>> payloads, returned;
    UInt64 delivery_tag = 0;
    std::atomic<bool> wait_all = true;
alesapin left a comment
Not finished, will try tomorrow.
alesapin left a comment
Although the code became more complicated, a lot of useful fixes and improvements were added. Now we have different failovers on interaction with RabbitMQ, virtual columns, new settings, fixes for race conditions, and so on. Also, tests should become more stable. I think we can merge it, because the code is quite isolated and the tests are good.
OOM in the GCC build is not related to these changes.
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Improvements in StorageRabbitMQ: added connection and channel failure handling, proper commits, insert failure handling, better exchanges, queue durability and the ability to resume reading from queues, and new queue settings. Fixed tests.