
Add storage RabbitMQ #11069

Merged
alesapin merged 58 commits into ClickHouse:master from kssenii:add-storage-rabbitmq-read-only
Jul 4, 2020

Conversation

@kssenii kssenii (Member) commented May 20, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • New Feature

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Add storage RabbitMQ.

Detailed description / Documentation draft:

CREATE TABLE query parameters:

  • rabbitmq_host_port
  • rabbitmq_routing_key_list
  • rabbitmq_exchange_name
  • rabbitmq_exchange_type
  • rabbitmq_format
  • rabbitmq_row_delimiter
  • rabbitmq_num_consumers
  • rabbitmq_num_queues
  • rabbitmq_transactional_channel
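
Putting the parameters together, a table definition might look like the following sketch (the table name, columns, and values are illustrative, and the SETTINGS-based syntax follows ClickHouse's usual table-engine convention):

```sql
CREATE TABLE rabbitmq_queue
(
    key UInt64,
    value String
)
ENGINE = RabbitMQ
SETTINGS
    rabbitmq_host_port = 'localhost:5672',         -- broker address
    rabbitmq_exchange_name = 'clickhouse-exchange',
    rabbitmq_format = 'JSONEachRow',               -- any supported input format
    rabbitmq_row_delimiter = '\n',
    rabbitmq_num_consumers = 2,                    -- concurrent consumers per table
    rabbitmq_transactional_channel = 1;            -- wrap INSERT publishing in transactions
```

Data would then be read with an ordinary SELECT, typically via a materialized view that pushes incoming rows into a MergeTree table.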

Some explanation:

The routing key of the message and the exchange name are parameters that are set while publishing messages from any RabbitMQ client; therefore, they are specified by the client.

All exchange types known in RabbitMQ are supported: direct, fanout, topic, headers, and consistent-hash exchanges. The preferred exchange type is specified by the client in the exchange_type parameter.

There can be no more than one exchange per table, but one exchange can be shared between multiple tables, which enables routing into multiple tables at the same time.

If the exchange type is set to 'direct', messages go to the table(s) whose routing_key parameter exactly matches the routing key of the message.

If the exchange type is set to 'fanout', received messages are routed to all tables that are bound to it, regardless of routing keys (i.e. to all tables with the same exchange_name).

If the exchange type is set to 'topic', messages are routed to one or many tables based on a match between the message routing key and a pattern set in the routing_key_list parameter. Per the RabbitMQ documentation, routing keys sent to a topic exchange must be lists of words delimited by dots. The routing patterns may contain '*'. For example, if the pattern (specified in routing_key_list) of some tables is '*.logs', then messages whose routing key ends in '.logs' are sent to all those tables. Patterns can be more complex (like 'agreements.*.*.data.') and may also contain '#', which matches zero or more words. For more details see the RabbitMQ documentation.
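
The topic routing described above can be sketched as follows (table and exchange names are hypothetical):

```sql
CREATE TABLE topic_logs (message String)
ENGINE = RabbitMQ
SETTINGS
    rabbitmq_host_port = 'localhost:5672',
    rabbitmq_exchange_name = 'logs-exchange',
    rabbitmq_exchange_type = 'topic',
    rabbitmq_routing_key_list = '*.logs',   -- matches e.g. 'app.logs' or 'db.logs'
    rabbitmq_format = 'JSONEachRow';
```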

If the exchange type is set to 'headers', routing is even more flexible. With a headers exchange, the routing key can be a map (dictionary). For example, if the (key = value) headers of published messages are 'format=logs', 'type=report', 'year=2020' with the setting 'x-match=all', then these messages are routed to all tables whose routing_key_list contains all those matches. If the setting is 'x-match=any', then messages are sent to all tables where at least one value matches its key. The x-match setting is passed in the same list as the headers:
rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020'.
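
As an illustrative sketch, a table using headers routing could be declared like this (names are hypothetical):

```sql
CREATE TABLE report_messages (message String)
ENGINE = RabbitMQ
SETTINGS
    rabbitmq_host_port = 'localhost:5672',
    rabbitmq_exchange_name = 'headers-exchange',
    rabbitmq_exchange_type = 'headers',
    -- x-match=all: a message must carry all three headers to be routed here
    rabbitmq_routing_key_list = 'x-match=all,format=logs,type=report,year=2020',
    rabbitmq_format = 'JSONEachRow';
```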

If the exchange type is set to 'consistent_hash' (a special sharding exchange), messages are distributed evenly between all tables with the same exchange_name. Note that, per the RabbitMQ documentation, a consistent-hash exchange by default distributes messages based on the hash of the routing key. Therefore, the key must be a string integer, randomized for every batch.

Multiple routing keys for each table are supported. They are specified in a list of keys, separated by commas: rabbitmq_routing_key_list = 'key1,key2,key3,key4,key5'. In case of topic-exchange it can be a list of patterns, separated by commas: rabbitmq_routing_key_list = '*.logs,data.*,*.work.data.*.logs'. In case of headers-exchange it can be a list of (key = value) headers: rabbitmq_routing_key_list = 'x-match=any,format=logs,type=report,year=2020'.

If the num_consumers parameter is set, then in the current implementation the exchange specified by the client is bound (by the specified routing keys) to a consistent-hash exchange, which evenly distributes messages between all concurrent consumers within one table. Without this binding it would be impossible to ensure that a message is received no more than once, since binding to a consistent-hash exchange is the only way to enable sharding between queues (~ consumers) with the same routing key.

If the num_queues parameter (number of queues per consumer) is set, messages are distributed between all queues of all consumers. It is worth setting num_queues, since one queue can handle up to 50K messages and it is in general highly recommended to keep queues as short as possible to increase throughput. By default, there is one unique queue for each consumer. There is no limit on the values of num_consumers and num_queues.

Also, since a local (unique for each table) consistent-hash exchange is used for sharding within one table, and since a hash exchange distributes messages based on the hash of an integer key, which in our case can be a string, a complex pattern, or a key=value header, the current implementation changes the hash property of the exchange to message_id. This makes it possible to enable sharding with non-integer routing keys. But remember that, per the RabbitMQ documentation, to use a different hash property the message_id property must be set in the publishing parameters when messages are published from a RabbitMQ client (otherwise all messages are routed to one arbitrarily chosen queue). To enable sharding, this message_id should be unique for each message, or at least unique for every batch of messages.

(!) If there is no need for a specific exchange type, there is a default implementation. It should be used if a table simply needs to receive messages quickly: in this case only the direct exchange type is used, which is the fastest of all, and no consistent-hash exchange is involved in sharding; everything is done by the direct exchange alone. This makes a noticeable difference in speed, because a direct exchange works faster than a consistent-hash exchange. Sharding with a direct exchange is possible thanks to special internal bindings that are made by default. Therefore, with the default implementation, much faster sharding between all concurrent consumers (~ their queues) is done within each table. This implementation is also used in the INSERT query.

To use this default behaviour, the exchange_type parameter must not be used, exchange_name must be different for each table, and there is no need to set the routing_key_list parameter (it will be ignored). When publishing messages, the routing-key message parameter must be a string integer randomized in the range [1, num_consumers] for every batch of messages (the smaller the batch size, the better), or in [1, num_consumers * num_queues] if num_queues is set. (In this case, the exchange to which the client first publishes is a fanout exchange; messages are later routed to the needed local table exchange.)
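
A minimal sketch of this default setup, with no exchange_type (names are illustrative):

```sql
CREATE TABLE fast_queue (message String)
ENGINE = RabbitMQ
SETTINGS
    rabbitmq_host_port = 'localhost:5672',
    rabbitmq_exchange_name = 'fast-queue-exchange',  -- must be unique per table
    rabbitmq_format = 'JSONEachRow',
    rabbitmq_num_consumers = 4;

-- When publishing, the routing key should be a string integer randomized
-- in the range [1, 4] (i.e. [1, num_consumers]) for each batch of messages.
```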

If the transactional_channel parameter is set to 1 (true), publishing inside the INSERT query implementation is wrapped in transactions.

Note:

  • The consistent-hash exchange must be enabled with a RabbitMQ plugin.
  • By RabbitMQ documentation, if there are multiple consumers, then the order of messages is not guaranteed.

@blinkov blinkov added doc-alert pr-feature Pull request with new product feature labels May 20, 2020
@alesapin alesapin self-assigned this May 21, 2020

@alesapin alesapin left a comment


Almost ready! Now we need to fix style check https://clickhouse-test-reports.s3.yandex.net/11069/c3569882bbcc33c047d9d9b9424bf06b9a50a3bf/style_check.html#fail1 and clang-tidy warnings https://clickhouse-builds.s3.yandex.net/11069/c3569882bbcc33c047d9d9b9424bf06b9a50a3bf/build_log_680227519_1590004367.txt. After that I'll try to help with compatibility.

Do you have any ideas why some Kafka tests failed in your PR?

@kssenii kssenii force-pushed the add-storage-rabbitmq-read-only branch from 1314807 to 14c67c6 on May 26, 2020 17:36
@kssenii kssenii changed the title Add storage rabbitmq read-only part Add storage RabbitMQ Jun 1, 2020

@alesapin alesapin left a comment


We discussed that we have to simplify the code of the (Read/Write)Buffer by separating the logic of reading from RabbitMQ (moving it into a separate background task) from the logic of reading into views/select queries.


@alesapin alesapin left a comment


We still have several points to improve and clarify. However, this code is well tested and quite isolated, so I think we can merge it and make some improvements in separate PRs.

@alesapin alesapin merged commit a2b6d58 into ClickHouse:master Jul 4, 2020

qza1800 commented Aug 12, 2020

@kssenii this feature is really fantastic!
Can you please give a sample of how to use the Protobuf format? I can't find a way to set format_schema:
https://clickhouse.tech/docs/en/interfaces/formats/#protobuf
Thank you

@kssenii kssenii (Member, Author) commented Aug 12, 2020

@qza1800 thank you. format_schema is not supported yet; I will add it in #12761.

