Conversation
> ---
> toc_priority: 6
> toc_title: RabbitMQ
> ---
>
> # RabbitMQ Engine
>
> This engine works with [RabbitMQ](https://www.rabbitmq.com).

Suggested change: replace "This engine works with [RabbitMQ](https://www.rabbitmq.com)." with "This engine allows integrating ClickHouse with [RabbitMQ](https://www.rabbitmq.com)."
> <details markdown="1">
> <summary>Deprecated Method for Creating a Table</summary>

Let's just drop it, no need to advertise how to use new features with legacy syntax.
> ## Configuration {#configuration}

Needs some details on where to put it. Also, let's move this section higher, before the examples.
> rabbitmq_routing_key_list = 'format=logs,type=report,year=2020',
> rabbitmq_format = 'JSONEachRow',
> rabbitmq_num_consumers = 5;

Is there a way to access some message metadata (virtual columns)? Usually people need that to ensure no duplicates were consumed.

How should it work in a cluster?

How can I rewind or replay messages? How can I fast-forward or skip messages?
> Is there a way to access some message metadata (virtual columns)?

Yes, I currently expose only the exchange name of received messages, because I did not know what this metadata was actually needed for. Now that you have written it, I understand.

By default, the available properties of a received message are the message body itself, the deliveryTag, and the redelivered flag. The deliveryTag is scoped per channel (== consumer), so on its own it cannot be used as a unique identifier to check whether a message is a duplicate, but a unique identifier can be constructed as a combination of deliveryTag and channel_id (for example, something like "9-1401", where the first number is the channel_id and the second is the deliveryTag).

There are also other message properties, such as timestamp, messageID, userID, clusterID, etc., but they are only available if the client defines them and sets them in the message properties when publishing. There are special API methods to check whether a given property was set by the client, so any of these properties can also be added to the metadata when available.
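The deduplication scheme described above can be sketched in plain Python. This is a conceptual illustration only, not AMQP client code; `channel_id` and `delivery_tag` are assumed to come from the consumer's channel:

```python
def message_uid(channel_id: int, delivery_tag: int) -> str:
    """Build a per-connection unique message id from channel id and delivery tag.

    deliveryTag alone is only unique per channel, so both are combined,
    e.g. channel 9, tag 1401 -> "9-1401".
    """
    return f"{channel_id}-{delivery_tag}"


def deduplicate(messages, seen=None):
    """Drop messages whose (channel_id, delivery_tag) pair was already consumed."""
    seen = set() if seen is None else seen
    fresh = []
    for channel_id, delivery_tag, body in messages:
        uid = message_uid(channel_id, delivery_tag)
        if uid not in seen:
            seen.add(uid)
            fresh.append(body)
    return fresh


# The same deliveryTag on different channels is NOT a duplicate.
batch = [(9, 1401, "a"), (9, 1401, "a-again"), (10, 1401, "b")]
print(deduplicate(batch))  # ['a', 'b']
```

The key point is that the dedup key is the pair, not the tag alone; a consumer restart resets delivery tags, so across restarts a client-supplied messageID property would be needed instead.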
> How should it work in a cluster?

In a RabbitMQ cluster, definitions of exchanges, queue bindings, and all state are replicated across the cluster by default, but queues with messages are not. I did not consider what should be done in this case. Currently, queues are declared as exclusive (unique to the connection) and replication is not handled.

To be replicated, the queues must not be classic queues but highly available ones: "mirrored queues" or "quorum queues" (the only options for highly available queues in RabbitMQ). Both define master and slave queues on the nodes and should rebalance the master after a failure.

Also, queue naming (on which consumer subscription is based) is currently left to the library, which means the queue names are randomly generated. But to be able to subscribe to a specific mirrored queue after a node failure, we need to know those queue names. So this should also be changed to make consuming from mirrored queues possible. Perhaps a rabbitmq_queue_base table parameter could be added, from which all queue names are constructed uniformly, based on channel_ids (queues are unique to channels, and channel_ids always come in the same sequential order).

If you tell me what behaviour is expected in a cluster, I can try to implement it. Same for the metadata.
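The proposed naming scheme could look roughly like the sketch below. Note that `rabbitmq_queue_base` is only a proposal from this discussion, and the sequential channel-id assumption is taken from the answer above, not from any implementation:

```python
def queue_names(queue_base: str, num_consumers: int):
    """Derive deterministic queue names from a shared base name.

    Hypothetical scheme: one queue per consumer channel, with channel ids
    assumed to be assigned sequentially (1..N), so that every node or
    table replica can reconstruct the same names after a failover.
    """
    return [f"{queue_base}_{channel_id}" for channel_id in range(1, num_consumers + 1)]


print(queue_names("logs", 3))  # ['logs_1', 'logs_2', 'logs_3']
```

With stable names like these, durable or mirrored queues could be re-subscribed to after a connection loss, which randomly generated library names make impossible.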
> How can I rewind or replay messages?

You mean within the code? In RabbitMQ there is a 'push' model of receiving messages (not 'pull' as in Kafka). It means that if a consumer is subscribed to a queue when a message arrives, the message is pushed to the consumer at once, and no record of it remains in the queue after it is pushed.

> How can I fast-forward or skip messages?

Why, where and when?
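The push semantics described above, and why there is no offset to rewind to, can be modelled with a tiny in-memory sketch. This is an illustration of the concept only, not AMQP client code:

```python
from collections import deque

class PushQueue:
    """Toy model of push delivery: once a subscriber exists, messages are
    handed to it immediately and removed from the queue, so there is no
    stored offset to rewind to (unlike Kafka's pull model)."""

    def __init__(self):
        self._pending = deque()
        self._callback = None

    def publish(self, message):
        self._pending.append(message)
        self._drain()

    def subscribe(self, callback):
        self._callback = callback
        self._drain()

    def _drain(self):
        # Deliver everything as soon as a subscriber is present.
        while self._callback and self._pending:
            self._callback(self._pending.popleft())


received = []
q = PushQueue()
q.publish("m1")               # buffered: no subscriber yet
q.subscribe(received.append)  # "m1" is pushed immediately
q.publish("m2")               # pushed straight through
print(received)               # ['m1', 'm2']
print(len(q._pending))        # 0 -- nothing left to replay
```

After delivery the queue is empty, which is why replay has to be implemented by the publisher (republishing), not by the consumer.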
> How can I rewind or replay messages?
>
> You mean within the code? In RabbitMQ there is a 'push' model of receiving messages (not 'pull' as in Kafka). It means that if a consumer is subscribed to a queue when a message arrives, the message is pushed to the consumer at once, and no record of it remains in the queue after it is pushed.

Hm, ok. I thought it had durability. What are the delivery guarantees there, then? It claims to offer at-least-once, but this rather sounds like at-most-once.

What will happen if flushing data to the target table fails? Say you collect a block of (let's say) 100K messages and try to flush it, but there is no disk space?

> How can I fast-forward or skip messages?
>
> Why, where and when?

Your previous answer covers that partially. What will happen if you cannot read or parse some message, and an exception happens during parsing?
> Hm, ok. I thought it had durability. What are the delivery guarantees there, then? It claims to offer at-least-once, but this rather sounds like at-most-once.

Delivery guarantees can be achieved with:

- Publisher confirms: RabbitMQ notifies publishers whether it got and processed their messages. If processing failed, the message is returned to the publisher in a ReturnedMessage callback, where it can be republished.
- Consumer acknowledgements: consumers notify RabbitMQ when they have processed a message. An acknowledgement is sent back by the consumer to tell RabbitMQ that a particular message has been received and processed and that RabbitMQ is free to delete it. If no acknowledgement is received, RabbitMQ concludes that the message was not fully processed and requeues it.
- Message durability: marking a message as persistent (== it will survive a broker restart) and changing the delivery mode. This is done when messages are published, so it lies with the publisher. But the queues must then be declared as durable (to survive connection loss), and currently they are not, because, again, if the connection is lost it cannot be restored for the current table (the library does not allow it). To be able to consume from those queues if they were declared durable, the queue names must be known and reproducible; I explained how that could be done in one of the previous answers. So to make this work, the queues need to be made durable and the queue names need to be constructed uniformly from the same given base name (currently the names are generated by the library, so they cannot be known to other tables).

Using acknowledgements together with message durability guarantees at-least-once delivery. Without them, only at-most-once delivery is guaranteed. I did not add all of this in the end because 1) performance became quite noticeably worse with them enabled (and even without them it is not very good), though perhaps it should be added as an option behind a special parameter; and 2) I did not know how to test it all properly. At least, I don't see how it can be put into an integration test, as it requires killing the connection.
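The acknowledgement mechanism described above can be illustrated with a small in-memory model (a conceptual sketch, not pika/AMQP code): a message is deleted only after an explicit ack, so a failed flush before the ack leads to redelivery (at-least-once), whereas deleting on delivery gives at-most-once.

```python
from collections import deque

class AckQueue:
    """Toy at-least-once queue: delivered messages stay in an 'unacked'
    buffer until acknowledged; requeueing puts them back with the
    redelivered flag set."""

    def __init__(self):
        self._ready = deque()
        self._unacked = {}
        self._next_tag = 0

    def publish(self, body):
        self._ready.append((body, False))

    def deliver(self):
        """Push the next message to the consumer; it is not deleted yet."""
        body, redelivered = self._ready.popleft()
        self._next_tag += 1
        self._unacked[self._next_tag] = body
        return self._next_tag, body, redelivered

    def ack(self, tag):
        """Consumer processed the message: the broker may now delete it."""
        del self._unacked[tag]

    def requeue(self, tag):
        """No ack received (e.g. the consumer died): redeliver later."""
        self._ready.append((self._unacked.pop(tag), True))


q = AckQueue()
q.publish("block-of-rows")
tag, body, redelivered = q.deliver()
q.requeue(tag)                       # flush to the target table failed
tag, body, redelivered = q.deliver()
print(redelivered)                   # True -- the message came back
q.ack(tag)                           # flush succeeded this time
```

Dropping the `_unacked` buffer and deleting on `deliver()` turns this into the at-most-once behaviour the current implementation has.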
> What will happen if flushing data to the target table fails? So you collect a block of (let's say) 100K messages, try to flush it, but there is no disk space?

Partially covered above: without those acknowledgements, the messages are lost. Also, I have just read in the RabbitMQ docs that when free disk space drops below a configured limit, an alarm is triggered and all producers are blocked (their connections). The goal is to avoid filling up the entire disk, which would cause all write operations on the node to fail and could lead to RabbitMQ termination. There are no losses here if publisher confirms are enabled by the publisher. And probably connection.blocked and connection.unblocked handlers should be registered.

> What will happen if you cannot read or parse some message? An exception happens during parsing?

If RabbitMQ fails to process the message, all is fine if publisher confirms are set; if not, it is a loss. The same applies on the consumer side. Currently there are no consumer acknowledgements, and publisher confirms are, again, the responsibility of the publisher; they are not added for the insert query now, but they were added at first and then removed (because they slowed performance down). This can be found somewhere in the commit history.