Better settings for Kafka#11388

Merged
abyss7 merged 9 commits intoClickHouse:masterfrom
filimonov:kafka_better_settings
Jun 6, 2020

Conversation

@filimonov
Contributor

@filimonov filimonov commented Jun 2, 2020

I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

Changelog category (leave one):

  • Improvement

Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Support for all format settings in Kafka, expose some settings at the table level, adjust the defaults for better performance.

Detailed description / Documentation draft:
Now it is possible to use any format-parsing setting when creating a Kafka table.
Sample:

CREATE TABLE test.kafka (a UInt64, b String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = 'issue4116',
kafka_group_name = 'issue4116',
kafka_format = 'CSV',
kafka_row_delimiter = '\n',
format_csv_delimiter = '|';

The following settings were changed or added:

  • kafka_max_block_size: number of rows collected across poll(s) before flushing data from Kafka. The default changed to min_insert_block_size / kafka_num_consumers (for a single consumer that is 1048576); before, max_block_size (65536) was used, which was suboptimal because it led to too-frequent commits and too-small insert blocks in the target table. See Fixed reschedule issue in Kafka #11149 (comment), Kafka fixes part2 #8917 (comment).
  • kafka_poll_max_batch_size: maximum number of messages polled in a single Kafka poll (the default is now min(max_block_size, kafka_max_block_size), normally 65536). It can now be configured separately; before it was tied to kafka_max_block_size. It is better to do smaller polls, to avoid bigger allocations and to give librdkafka a chance to refill its queue while we process the polled block.
  • kafka_poll_timeout_ms: timeout for a single poll from Kafka (the default is taken from stream_poll_timeout_ms = 500 ms). New setting; it can now be configured per table (before, stream_poll_timeout_ms was always used).
  • kafka_flush_interval_ms: timeout for flushing data from Kafka (the default is taken from stream_flush_interval_ms = 7500 ms). New setting; it can now be configured per table (before, stream_flush_interval_ms was always used).

To understand how these settings relate, see the following pseudocode illustrating how ClickHouse consumes data from Kafka:

insert_block = [];
timer.start();
while (true)
{
   messages = kafka.batch_poll(kafka_poll_max_batch_size, kafka_poll_timeout_ms);  
   insert_block.add_rows(parse(messages));
   if (insert_block.rows >= kafka_max_block_size)
      break;
   if (timer.time_since_start >= kafka_flush_interval_ms)
      break;
}
put_to_target_tables(insert_block);
kafka.commit();
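The loop above can be sketched as a runnable simulation (Python; `poll_batch` is a hypothetical stand-in for `kafka.batch_poll`, not a real API):

```python
import time

def consume_cycle(poll_batch, kafka_max_block_size,
                  kafka_flush_interval_ms, kafka_poll_max_batch_size):
    """Build one insert block: stop when the block is full or the flush timer fires."""
    insert_block = []
    start = time.monotonic()
    while True:
        # Each poll returns at most kafka_poll_max_batch_size rows.
        insert_block.extend(poll_batch(kafka_poll_max_batch_size))
        if len(insert_block) >= kafka_max_block_size:
            break  # block is full -> flush to target tables and commit
        if (time.monotonic() - start) * 1000 >= kafka_flush_interval_ms:
            break  # flush interval elapsed -> flush whatever we have
    return insert_block

# A source yielding 100 rows per poll fills a 250-row block in three polls.
block = consume_cycle(lambda n: ["row"] * min(n, 100),
                      kafka_max_block_size=250,
                      kafka_flush_interval_ms=7500,
                      kafka_poll_max_batch_size=100)
```

Note how kafka_max_block_size is a lower bound on the flushed block size (the last poll may overshoot it by up to one batch), while kafka_flush_interval_ms bounds how long a partially filled block can wait.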

Librdkafka settings adjusted:

  • client.software.name: now filled as "ClickHouse"
  • client.software.version: now filled with the ClickHouse version, for example "v20.5.1.1-prestable"
  • queued.min.messages: the default (100000) is increased to kafka_max_block_size (but never decreased), which prevents the librdkafka queue from draining quickly while a single insert block is being built. This improves performance significantly, but may lead to higher memory consumption.
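The queued.min.messages adjustment amounts to a one-line rule (a sketch of the behavior described above, not the actual C++ code):

```python
LIBRDKAFKA_DEFAULT_QUEUED_MIN_MESSAGES = 100_000  # librdkafka's stock default

def effective_queued_min_messages(kafka_max_block_size):
    # Raise the librdkafka default up to kafka_max_block_size, but never lower it.
    return max(LIBRDKAFKA_DEFAULT_QUEUED_MIN_MESSAGES, kafka_max_block_size)
```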

Extra:

  • cleaned up the code around Kafka settings a bit,
  • got rid of excessive Context copying.

Closes #11308
Closes #4116
Closes #8056

@blinkov blinkov added the pr-improvement Pull request with some product improvements label Jun 2, 2020
@qoega qoega added the doc-alert label Jun 3, 2020
@filimonov filimonov force-pushed the kafka_better_settings branch from 3973762 to 94261b9 on June 3, 2020 17:07
@filimonov filimonov marked this pull request as ready for review June 3, 2020 17:08
@filimonov filimonov changed the title [wip] Better settings for Kafka Better settings for Kafka Jun 3, 2020
@filimonov
Contributor Author

filimonov commented Jun 4, 2020

It seems like queued.min.messages = kafka_max_block_size allows achieving very good performance.

Also it seems that issue #11216 affects performance (when we drain the librdkafka internal queue, it starts reporting a 'stalled' status).

@abyss7 abyss7 self-assigned this Jun 4, 2020
@filimonov
Contributor Author

filimonov commented Jun 4, 2020

Benchmark results: consumption speed (measured with https://github.com/filimonov/ch-kafka-consume-perftest):

This PR (v20.5.1.3632, 26d93fd):

  • all settings default: ~466000 rows/sec
  • kafka_num_consumers=4: ~1243000 rows/sec

Base master commit:

  • all settings default: ~180000 rows/sec
  • kafka_max_block_size=1048576: ~342000 rows/sec
  • kafka_max_block_size=1048576, queued.min.messages=1048576: ~436000 rows/sec
  • kafka_max_block_size=1048576, queued.min.messages=1048576, kafka_num_consumers=4: ~1157000 rows/sec

With default settings: 2.58x faster. That was the main goal of changing the defaults.
This PR with default settings vs. tuned master: 5-40% faster (depending on the tuning used).
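For reference, the headline ratios can be recomputed from the figures above (plain arithmetic; numbers copied from the benchmark):

```python
# rows/sec figures quoted in the benchmark above
master_default = 180_000
master_tuned = 436_000   # kafka_max_block_size + queued.min.messages tuned
pr_default = 466_000

default_speedup = pr_default / master_default  # roughly 2.6x with all defaults
tuned_gain = pr_default / master_tuned - 1     # ~7% vs a tuned single-consumer master
```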

@abyss7
Contributor

abyss7 commented Jun 6, 2020

Yandex check failure is unrelated
