KAFKA-1893: Allow regex subscriptions in the new consumer #128
SinghAsDev wants to merge 4 commits into apache:trunk
Conversation
It would be nice to keep this asynchronous. Instead of calling listTopics, which blocks, could we initiate a new metadata fetch directly and add a listener to handle the completion?
Hey @hachikuji, I was also initially thinking of making it asynchronous, but I could not convince myself that we would gain much from it. schedulePatternSubscriptionTask is already a delayed task that the user does not have to wait on. Moreover, initiating a new metadata fetch directly means handling retries ourselves, so more params for max retries, retry interval, etc. I was under the impression that we made listTopics handle all of that so we would not have to worry about it again during the regex implementation. I might be missing something.
@SinghAsDev Since KafkaConsumer has only one thread, even scheduled tasks have to be executed in that thread, which means the user has to wait for them. Since you can't really control when the tasks will be executed, in the worst case it could turn a non-blocking call into a blocking one. And I don't see why error handling can't be done asynchronously. For updating regex subscriptions, I don't think it would be a big deal even if we just ignored failures and waited for the next metadata update, though it would be easy to implement retries with backoff (I think we already do this for heartbeats).
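For context, the retry-with-backoff scheme mentioned here (in the spirit of the heartbeat retry logic) can be sketched as follows; the class name, method name, and constants are illustrative, not actual consumer internals:

```java
public class BackoffRetrySketch {
    // Exponential backoff with a cap: each failed attempt doubles the
    // delay until it hits maxMs. The shift is clamped to avoid overflow.
    static long nextBackoffMs(int attempt, long baseMs, long maxMs) {
        long delay = baseMs * (1L << Math.min(attempt, 16));
        return Math.min(delay, maxMs);
    }

    public static void main(String[] args) {
        for (int attempt = 0; attempt < 5; attempt++)
            System.out.println("attempt " + attempt + " -> "
                    + nextBackoffMs(attempt, 100, 1000) + " ms");
        // prints 100, 200, 400, 800, 1000 ms
    }
}
```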
True. For error handling, I was saying that it might require more configs, not that it adds complexity. However, we should be able to reuse the existing configs. Will update the patch. Thanks!
kafka-trunk-git-pr #120 SUCCESS
Good comments from Grant. Other than that this change looks good.
@hachikuji @granthenke I have addressed your review comments. Let me know if I am still missing something. Thanks!
kafka-trunk-git-pr #133 SUCCESS
@SinghAsDev Are there any error cases to check here? Disconnects for example?
Do you mean that in case of errors I should log them and reschedule the task, as in onFailure?
@SinghAsDev Haha, I can understand why this is a little confusing, but the network layer considers the request a "success" if it gets a response. However, that doesn't mean there wasn't an error code in that response. It might be nice to handle this at the network layer, but there isn't a generic way to check for errors: each response object has the error code at a different location in its schema, so the only thing we can do is pass the response back and let the application determine whether there was an error. The one case where we might be able to infer errors generically is by checking ClientResponse.wasDisconnected(), but I don't think we do that either (I've forgotten whether there's a good reason for that).
@hachikuji updated. Let me know if I got that right :)
@SinghAsDev I think that works! I wasn't sure if we needed to handle errors in the MetadataResponse itself, but it looks like the topics are only added to the Cluster if they had no error in the response.
kafka-trunk-git-pr #140 SUCCESS
LGTM. @guozhangwang, do you want to have a look?
Do we need a new config for subscription interval? Could we piggy-back the logic on regular metadata refresh?
Makes sense. Removed.
@SinghAsDev Another problem that @guozhangwang and I briefly talked about was whether we should augment the Metadata object to support getting metadata for all topics (with a hook to get a notification when there is a change).* Your approach is exactly the one I had in mind, but it does result in a little more overhead, since metadata refreshes are done both in KafkaConsumer and in NetworkClient. The nice thing about separating them is that it lets you refresh the full topic list less frequently: if your regex only covers a small set of the topics, "normal" metadata refreshes stay cheap and can be done more often. However, it seems questionable whether this capability is actually useful in practice, so reducing the metadata overhead might be the bigger win. What do you think?

*Note that it's still useful to be able to bypass NetworkClient to support the listTopics API (which doesn't affect subscriptions).
@SinghAsDev you are not incrementing pos! It should be topicsToSubscribe[pos++] = topic, right?
In fact, I would use a list, as below:

List<String> topicsToSubscribe = new ArrayList<>(partitions.size());
// ... add each matching topic to topicsToSubscribe ...
subscribeTopics(topicsToSubscribe.toArray(new String[0]), true);

But I am fine with the array too.
Aha, nice catch. Will fix. Thanks!
kafka-trunk-git-pr #160 FAILURE
@hachikuji that makes sense; I actually suggested the same for the listTopics patch. Is my understanding correct that what you are suggesting is similar to my suggestion on the listTopics JIRA?
@SinghAsDev I think the listTopics approach is still fine and doesn't need to be changed. The key difference is that listTopics doesn't affect subscriptions, so it should not have any impact on the Metadata shared throughout the consumer.
@SinghAsDev Sorry for being late on this. Regarding the subscribe semantics, I am convinced that 1) we do not need an extra blacklist pattern in the API; users can just express that in the regex, and 2) we do not need to do incremental subscription; replacing the subscription should be fine. Regarding the metadata refresh: the current patch uses a separate scheduled task with a different topic set outside Metadata for regex subscription, while if we piggy-back it on the metadata refresh we can reduce the code complexity. The cost of having the metadata refresh always ask for all topics seems OK to me, since in practice users would probably want to set the "regex refreshing interval" to be the same as the "metadata refreshing interval" anyway. What do you think? On the other hand, we do not necessarily need to make the listTopics API also change the Metadata state object as @hachikuji suggested, since I feel it is in many cases a one-time thing. I can review this PR again and check it in once it gets rebased / updated.
@guozhangwang @hachikuji makes sense. Will update. However, for this to happen we need to somehow make NetworkClient aware that, when a pattern subscription is being used, it should fetch metadata for all topics. There are a few ways I could think of for doing this.
I am more inclined towards Option 1. What do you guys suggest?
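The enumerated options were not captured here, but given the later mention of setting a Metadata "allTopics" flag, a flag-based approach can be sketched as follows. All names here are illustrative, not the committed implementation:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class MetadataSketch {
    private final Set<String> topics = new HashSet<>();
    private boolean needMetadataForAllTopics = false;

    // Set when subscribe(pattern) is called; cleared on unsubscribe.
    public synchronized void needMetadataForAllTopics(boolean needed) {
        this.needMetadataForAllTopics = needed;
    }

    public synchronized void addTopic(String topic) {
        topics.add(topic);
    }

    // NetworkClient would consult this when building a MetadataRequest;
    // an empty topic list asks the broker for metadata on all topics.
    public synchronized List<String> topicsToFetch() {
        if (needMetadataForAllTopics)
            return Collections.emptyList();
        return new ArrayList<>(topics);
    }

    public static void main(String[] args) {
        MetadataSketch metadata = new MetadataSketch();
        metadata.addTopic("my-topic");
        System.out.println(metadata.topicsToFetch()); // [my-topic]
        metadata.needMetadataForAllTopics(true);
        System.out.println(metadata.topicsToFetch()); // []
    }
}
```

The empty-list convention leans on the protocol behavior noted later in this thread, where a metadata request with an empty topic list returns the full topic list.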
@SinghAsDev Option 1 definitely sounds nicer to me.
@SinghAsDev Btw, I was planning to use a MetadataListener in KAFKA-2464 to hook into metadata updates: https://github.com/apache/kafka/pull/165/files#diff-62bba39339405475f71241a182ef9819R229. Seems like that might be sufficient for this case too.
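A rough sketch of how such a metadata-update listener could recompute a regex subscription; the listener interface shape below is assumed for illustration, not the one from KAFKA-2464:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class RegexSubscriptionSketch {
    // Hypothetical stand-in for the metadata-update hook discussed above.
    interface MetadataListener {
        void onMetadataUpdate(Set<String> clusterTopics);
    }

    // Builds a listener that replaces (rather than incrementally extends)
    // the subscription on every metadata update, as agreed in this thread.
    static MetadataListener regexListener(Pattern pattern, Set<String> subscription) {
        return clusterTopics -> {
            subscription.clear();
            for (String topic : clusterTopics)
                if (pattern.matcher(topic).matches())
                    subscription.add(topic);
        };
    }

    public static void main(String[] args) {
        Set<String> subscription = new HashSet<>();
        MetadataListener listener =
                regexListener(Pattern.compile("test-.*"), subscription);
        listener.onMetadataUpdate(
                new HashSet<>(Arrays.asList("test-1", "other", "test-2")));
        System.out.println(subscription.size()); // 2
    }
}
```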
@hachikuji yea, it will be needed. Should I wait for your patch to go in?
@SinghAsDev Nah, yours is probably easier to get through, so go ahead. I'll rebase accordingly.
+1 on option 1 also.
kafka-trunk-git-pr #233 FAILURE
@hachikuji @guozhangwang I have updated the PR with the discussed approach. Let me know how it looks. I will add some unit tests for the new methods added to Metadata tonight.
Ahh, looks like I will have to rebase again. Will do that along with adding unit tests for Metadata.
I wonder if we should clear the topic collection when this is set. If we did that, then the change in NetworkClient would be unnecessary. Is there any reason not to?
This will be called only when a pattern is subscribed. At that point, yes, what you are saying makes sense. However, for subsequent metadata updates the change in NetworkClient is still needed, as Metadata will contain the topics that matched the pattern and were added in the last update. Does that make sense?
Yeah, I guess that makes sense. It kind of feels weird to be tracking a topic list in Metadata that we're not going to use, however. My feeling is that maybe we shouldn't update Metadata for topics subscribed from a regex; instead we just update it once in subscribe(pattern). Does that make sense?
I am a bit lost here. Even if we clear topics here, topics will be added by KafkaConsumer.onMetadataUpdate, and any metadata update after that will fetch metadata for only those topics if NetworkClient's changes are removed. I think I am missing something here :)
Probably my fault. :) I was trying to suggest not having KafkaConsumer.onMetadataUpdate modify the Metadata topic list, just the subscription topic list. So basically, in KafkaConsumer.subscribe(pattern) we set the Metadata allTopics flag, and we don't modify Metadata again (unless the user changes their subscription). Does that make any sense?
This might not work as well as I thought since the metadata is also updated in partitionsFor(). I actually think it would be better for partitionsFor() to use the same approach as listTopics(), but that is out of scope for this issue.
Haha, I actually made the changes and realized the same. It does make things look cleaner, though. Also, one more issue is that when subscribed via a pattern, Metadata will always store metadata for all topics, so a fetch will return metadata for all topics, not just the topics actually matched by the pattern subscription.
For now, I think we can just leave it as is. If you think we should refactor partitionsFor(), let me know; we should probably create a separate JIRA for it though.
Refactoring partitionsFor does seem worthwhile to me since then Metadata is only modified by subscription (and metadata for subscribed topics is all we care about in a steady state). Maybe we can leave this patch as it is and plan to fix it in that JIRA.
LGTM. About unsubscribing, I think it is useful to keep the …
Force-pushed 8abf03e to 8c0def4.
@hachikuji @guozhangwang changed the behavior of unsubscribe. Up for your reviews again :)
kafka-trunk-git-pr #385 SUCCESS
To be clear, we'll continue fetching metadata for all the topics that were added to Metadata directly. Is that right? If instead we cleared those topics, would the NetworkClient fetch metadata for all topics until we've set a new subscription?
I think this is a bug in the protocol: if the metadata request contains an empty topic list, it returns the full topic list. This should be fixed in another ticket.
Two more general comments: 1) we could remove the duplicate ConsumerRebalanceListener / RebalanceListener and then remove the Consumer parameter in the callback functions; 2) we could probably create a member Metadata.Listener inside KafkaConsumer instead of having KafkaConsumer implement this interface along with Consumer, since it is only used in subscribe(Pattern).
Just to not drag this ticket out further, I suggest we do these in later tickets. @hachikuji does that sound good to you?
@guozhangwang @hachikuji thanks for the reviews and for helping get this committed. If there is no JIRA already, I can create the JIRA and PR for fixing the issues @guozhangwang mentioned. Let me know. Thanks!
@SinghAsDev I just created a ticket for removing the consumer instance from the callback. You can create a ticket for the other issue if you want.
@hachikuji I have created KAFKA-2533, will create PR shortly. |