Conversation
…o add-storage-rabbitmq-read-only
alesapin
left a comment
There was a problem hiding this comment.
Almost ready! Now we need to fix style check https://clickhouse-test-reports.s3.yandex.net/11069/c3569882bbcc33c047d9d9b9424bf06b9a50a3bf/style_check.html#fail1 and clang-tidy warnings https://clickhouse-builds.s3.yandex.net/11069/c3569882bbcc33c047d9d9b9424bf06b9a50a3bf/build_log_680227519_1590004367.txt. After that I'll try to help with compatibility.
Do you have any ideas why some kafka tests failed in your PR?
1314807 to
14c67c6
Compare
…o add-storage-rabbitmq-read-only
…o add-storage-rabbitmq-read-only
alesapin
left a comment
There was a problem hiding this comment.
We discussed that we have to simplify the code of (Read/Write)Buffer with separation of the logic of reading from RabbitMQ (and moving it into separate background task) and reading into views/select queries.
…lickHouse into add-storage-rabbitmq-read-only
alesapin
left a comment
There was a problem hiding this comment.
We still have several points to improve and clarify. However this code is tested well and quite isolated, so I think we can merge it and perform some improvements in separate PRs.
|
@kssenii this feature is really fantastic |
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Add storage RabbitMQ.
Detailed description / Documentation draft:
CRATE TABLE query parameters:
Some explanation:
The routing key of the message and exchange name are parameters that are set while publishing messages from any RabbitMQ client. Therefore, they are speicified by the client.
All exchange types, known in RabbitMQ, are supported: direct-exchange, fanout-exchange, topic-exchange, headers-exchange, consistent-hash-exchange. The preferred exchange type is specified by the client in exchange_type parameter.
There can be no more than one exchange per table. Also one exchange can be shared between multiple tables - it enables routing into multiple tables at the same time.
If exchange type is set to 'direct', then messages go to table(s) with a routing_key parameter that exactly matches the routing key of the message.
If exchange type is set to 'fanout', it routes received messages to all tables that are bound to it regardless of routing keys. (To table(s), where the exchange_name is the same.)
If exchange type is set to 'topic', then messages are routed to one or many tables based on a matching between a message routing key and a pattern, which is set in routing_key_list parameter. By RabbitMQ documentation, if messages are sent to topic-exchange, then a routing key must be a list of words, delimited by a dot. The routing patterns may contain “*”. For example, if a pattern (specified in routing_key_list) of some tables is '*.logs', then messages with a routing key with suffix '.logs' will be sent to all those tables. Patterns can be complicated (like 'agreements.*.*.data.'). Patterns can also contain '#' - indicates a match of zero or more words. For more details see RabbitMQ documentation.
If exchange type is set to 'headers', the routing is even more flexible. If using headers-exchange, routing key can be a map (dictionary). For example, if (key = value) headers of published messages are 'format=logs', 'type=report', 'year=2020' with setting 'x-match=all', then these messages will be routed to all tables, where routing_key_list contains all those matches. If a setting is 'x-match=any', then those messages are send to all tables where at least one dictionary value matches its key. The setting is passed in the same list as headers:
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020'.
If exchange type is set to 'consistent_hash', which is a special sharding exchange, - there will be an even distribution of messages between all tables, where the exchange_name is the same. Note that, by RabbitMQ documentation, if using consistent-hash-exchange, by default it will distribute messages based on a hash value of a routing key. Therefore, the key must be a string integer, randomised for every batch.
Multiple routing keys for each table are supported. They are specified in a list of keys, separated by commas: rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5'. In case of topic-exchange it can be a list of patterns, separated by commas: rabbitmq_routing_key_list = '*.logs,data.*,*.work.data.*.logs'. In case of headers-exchange it can be a list of (key = value) headers: rabbitmq_routing_key_list = 'x-match=any,format=logs,type=report,year=2020'.
If parameter num_consumers is set, then in current implementation the exchange, specified by the client, is bound to a consistent-hash exchange (by the specified routing keys) and this exchange will make an even distribution between all (concurrent) consumers within one table, because, without this binding to consistent-hash exchange, it is impossible to ensure that one message is received no more than once, as it is the only way to enable sharding between queues (~ consumers)) with the same routing key.
If parameter num_queues (number of queues per consumer) is set, then distribution of messages is done between all queues of all consumers. It is worth setting num_queues parameter as one queue can handle up to 50K messages and it is in general highly recommended to keep queues as short as possible to increase throughput. By default, there is one unique queue for each consumer. Also there is no limit in size of parameters num_consumers, num_queues.
Also, as a local (unique for each table) consistent-hash-exchange is used for sharding within one table and as there still remains a rule that hash-exchange distributes messages based on a hash value of an integer-key, which in our case can be a string, a complicated pattern or a key=value header, then in current implementation hash-property of the exchange is changed to message_id. (This will make it possible to enable sharding with non-integer routing keys). But remember, that to be able to use a different hash-property, by RabbitMQ documentation, when messages are published from some RabbitMQ client, then the message_id property must be set in publishing parameters (otherwise all messages are routed to one arbitrary chosen queue). And to enable sharding - this message_id should be unique for each message or at least unique for every batch of messages.
(!) If there is no need for a specific exchange type, then there is a default implementation. It should be used, if there is just a need for specific table to receive messages quickly. Because in this case only direct-exchange type is used, which is the quickest type of all, and for sharding consistent-hash exchange is not used - everything is done only by direct-exchange. This makes a certain difference in speed as direct-exchange works quicker than consistent-hash-exchange. Sharding with direct-exchange is possible because of the special internal bindings, which are made by default. Therefore, with default implementation, a much faster sharding between all concurrent consumers (~ their queues) is done within each table. This implementation is also used in INSERT query.
To use this default behaviour, exchange_type parameter must not be used, exchange_name must be different for each table, there is no need to set routing_key_list parameter (as it will be ignored). Also when publishing messages - the routing key message parameter must be a string integer, randomized in range [1, num_consumers] for every batch of messages (the smaller the batch size, the better). And if num_queues is set, then in range [1, num_consumers * num_queues]. (In this case, the exchange type, where messages are first published by the client is fanout-exchange, later it will be routed to the needed local table's exchange.)
If transactional_channel parameter is set to 1 (true), then publishing inside INSERT queries implementation will be wrapped in transactions.
Note: