Create consumers for Kafka tables on fly (but keep them for some period since last used) #57829
This is an automated comment for commit ebad1bf with a description of existing statuses. It is updated for the latest CI run. ❌
Can a similar problem happen in the case when readings from the topic occur …? Ideally, it would be better to have some backpressure mechanism, or we can just forbid … Additionally, it seems that 1 minute to remove a consumer from the pool is too fast. I have seen … It is also worth mentioning that with that PR, after deleting a consumer, … Maybe we can try deleting only the internal object (cppkafka::Consumer) while …
Poll frequency is small enough - 500ms. Also note that these messages have higher priority AFAIR, so even in case of errors while reading from the topic they will be handled first.
This setting means something different: it is how much time polling from Kafka can be done without yielding.
This 1min timeout is the TTL for inactive consumers. If a data transfer is in progress, the consumer is in use and won't be deleted, and even when the transfer finishes it won't be deleted right away either; since the default poll frequency is 500ms, this should be OK. The only reason for keeping the TTL multiple times bigger than 500ms is to preserve the consumers for SELECTs.
Indeed.
Yes, this makes sense, will update the patch.
Creating / removing a consumer is quite expensive, as it triggers a consumer group rebalance (which is a 'stop the world' thing in Kafka). It's actually a bad thing (BTW - maybe we can create some warning about that? Or inform the user somehow? Or just make the size of that pool dynamic?), and here delays can sometimes get bigger (no free thread in the pool, nobody can execute StorageKafka::threadFunc), so the consumer objects will be created / removed causing rebalances and everything will become even worse (BTW - it seems like in that way the …
Indeed, this is also a problem. And there are also some other issues, like … So yes, all of this should be addressed first.
src/Storages/Kafka/KafkaConsumer.h
Not a very obvious choice of the moment for setting the last_used time.
Why not inside KafkaConsumer::consume? (it even has the last poll time already)
last_poll_time is not the same, it can differ from last_used_usec, because after the poll has finished other things could be done:
- reading from this source could take some time
- commit
- writing this data to a MV could take some time
And updating it in notInUse looks like the most logical place, since KafkaConsumer should know nothing about the TTL (but to avoid tracking these timestamps in a separate map, this time has been added to the KafkaConsumer)
Maybe the option of converting the consumers in StorageKafka into a class with pop / push methods, also maintaining that in_use flag & time of last usage there, would be less confusing (since the KafkaConsumer class doesn't do anything with that anyway). But OK.
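The reviewer's suggestion above could look roughly like the sketch below: the pool itself owns the in_use flag and the last-used timestamp, handed out via pop/push, so KafkaConsumer stays unaware of the TTL bookkeeping. `ConsumerPool`, `Entry`, and `KafkaConsumerStub` are hypothetical names for illustration, not the actual StorageKafka code.

```cpp
#include <chrono>
#include <memory>
#include <vector>

using Clock = std::chrono::steady_clock;

struct KafkaConsumerStub {}; // stands in for the real KafkaConsumer

class ConsumerPool
{
public:
    void add(std::shared_ptr<KafkaConsumerStub> consumer)
    {
        entries.push_back({std::move(consumer), false, Clock::now()});
    }

    // Hand out a free consumer (if any) and mark it in use.
    std::shared_ptr<KafkaConsumerStub> pop()
    {
        for (auto & e : entries)
        {
            if (!e.in_use)
            {
                e.in_use = true;
                return e.consumer;
            }
        }
        return nullptr; // no free consumer available
    }

    // Return a consumer and record when it was last used, for TTL cleanup.
    void push(const std::shared_ptr<KafkaConsumerStub> & consumer)
    {
        for (auto & e : entries)
        {
            if (e.consumer == consumer)
            {
                e.in_use = false;
                e.last_used = Clock::now();
                return;
            }
        }
    }

    size_t inUse() const
    {
        size_t n = 0;
        for (const auto & e : entries)
            n += e.in_use;
        return n;
    }

private:
    struct Entry
    {
        std::shared_ptr<KafkaConsumerStub> consumer;
        bool in_use;
        Clock::time_point last_used;
    };
    std::vector<Entry> entries;
};
```

The design point is the one made in the comment: all usage bookkeeping lives in the pool's pop/push, so the consumer class itself carries no TTL-related state.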
CI is unrelated:
Already fixed in upstream
Not enough info, even in the core dump (sigh); core dumps should be fixed.
….size()) Signed-off-by: Azat Khuzhin <[email protected]>
Fixes: ClickHouse#42777 Signed-off-by: Azat Khuzhin <[email protected]>
…e last used) The pool of consumers created a problem for librdkafka internal statistics: you need to read from the queue always, while in ClickHouse consumers were created regardless of whether there were any readers (attached materialized views or direct SELECTs). Otherwise, these statistics messages got queued and never released, which: - creates a live memory leak - and also makes destroy very slow, due to librdkafka internals (it moves entries from this queue into another linked list, but with sorting, which is incredibly slow for linked lists). So the idea is simple: let's create consumers only when they are required, and destroy them after some timeout (right now it is 60 seconds) if nobody uses them; that way this problem should be gone. This should also reduce the number of internal librdkafka threads when nobody reads from Kafka tables. Signed-off-by: Azat Khuzhin <[email protected]>
This will make system.kafka_consumers more useful: prior to this patch the consumer object was removed after the TTL, but now all information is preserved. Signed-off-by: Azat Khuzhin <[email protected]>
Since the pool may exceed the number of threads, while we need to run this thread always to avoid leaking memory. And this should not be a problem, since librdkafka has multiple threads (5!) for each consumer anyway. Signed-off-by: Azat Khuzhin <[email protected]>
Actually, now we can create the consumer object in the ctor; no need to do this in startup(), since the consumer no longer connects to Kafka there. Signed-off-by: Azat Khuzhin <[email protected]>
https://s3.amazonaws.com/clickhouse-test-reports/0/745d9bb47f3425e28e5660ed7c730038ffece4ee/integration_tests__asan__analyzer__[6_6]/integration_run_parallel4_0.log looks like it might be related
Thank you! We overlooked that. @azat, could you please resubmit?
See #58310
Changelog category (leave one):
Changelog entry (a user-readable short description of the changes that goes to CHANGELOG.md):
Create consumers for Kafka tables on the fly (but keep them for some period, `kafka_consumers_pool_ttl_ms`, since last used). This should fix the problem with statistics for `system.kafka_consumers` (which are not consumed when nobody reads from the Kafka table, leading to a live memory leak and slow table detach), and this PR also enables stats for `system.kafka_consumers` by default again.

The pool of consumers created a problem for librdkafka internal statistics:
you need to read from the queue always, while in ClickHouse consumers
were created regardless of whether there were any readers (attached
materialized views or direct SELECTs).
Otherwise, these statistics messages got queued and never released,
which:
- creates a live memory leak
- and also makes destroy very slow, due to librdkafka internals (it
moves entries from this queue into another linked list, but with
sorting, which is incredibly slow for linked lists)
So the idea is simple: let's create consumers only when they are
required, and destroy them after some timeout (right now it is 60
seconds) if nobody uses them; that way this problem should be gone.
This should also reduce the number of internal librdkafka threads when
nobody reads from Kafka tables.
Requires: #57822 (not strictly required, but better to merge after it, since in that case I will enable statistics by default here again; or maybe it should be done separately...). Fixes: #50999 (cc @ilejn)
Note: this should not be backported