
KAFKA-1893: Allow regex subscriptions in the new consumer #128

Closed
SinghAsDev wants to merge 4 commits into apache:trunk from SinghAsDev:KAFKA-1893

Conversation

@SinghAsDev
Contributor

No description provided.

Contributor

It would be nice to keep this asynchronous. Instead of calling listTopics, which blocks, could we initiate a new metadata fetch directly and add a listener to handle the completion?

Contributor Author

Hey @hachikuji , even I was initially thinking of making it asynchronous, but I could not convince myself that we would gain much from it. schedulePatternSubscriptionTask is already a delayed task that users do not have to wait on. Moreover, initiating a new metadata fetch directly needs to handle retries, which means more params for max retries, interval, etc. I was under the impression that we made listTopics handle all that so that we would not have to worry about it again during the regex implementation. I might be missing something.

Contributor

@SinghAsDev Since KafkaConsumer has only one thread, even scheduled tasks have to be executed in that thread, which means the user has to wait for them. Since you can't really control when the tasks will be executed, in the worst case this could turn a non-blocking call into a blocking one. And I don't see why error handling can't be done asynchronously. For updating regex subscriptions, I wouldn't think it a big deal even if we just ignored failures and waited for the next metadata update, though it would be easy to implement retries with backoff (I think we do this for heartbeats already).

Contributor Author

True. For error handling, my concern was that it might require more configs, not the added complexity. However, we should be able to reuse the existing configs. Will update the patch. Thanks!

@asfbot

asfbot commented Aug 10, 2015

kafka-trunk-git-pr #120 SUCCESS
This pull request looks good

@benstopford
Contributor

Good comments from Grant. Other than that this change looks good.

@SinghAsDev
Contributor Author

@hachikuji @granthenke I have addressed your review comments. Let me know if I am still missing something. Thanks!

@asfbot

asfbot commented Aug 12, 2015

kafka-trunk-git-pr #133 SUCCESS
This pull request looks good

Contributor

@SinghAsDev Are there any error cases to check here? Disconnects for example?

Contributor Author

Do you mean that in case of errors I should log them and reschedule the task, as in the onFailure case?

Contributor

@SinghAsDev Haha, I can understand why this is a little confusing, but the network layer considers the request a "success" if it gets a response. However, that doesn't mean there wasn't an error code in that response. It might be nice to handle this at the network layer, but it seems to me there isn't a generic way to check for errors: each response object has the error code at a different location in its schema, so the only thing we can do is pass the response back and let the application determine whether there was an error. The one case where we might be able to infer errors generically is by checking ClientResponse.wasDisconnected(), but I don't think we do this either (I've forgotten whether there's a good reason for that).

Contributor Author

@hachikuji updated. Let me know if I got that right :)

Contributor

@SinghAsDev I think that works! I wasn't sure if we needed to handle errors in the MetadataResponse itself, but it looks like the topics are only added to the Cluster if they had no error in the response.

@asfbot

asfbot commented Aug 13, 2015

kafka-trunk-git-pr #140 SUCCESS
This pull request looks good

@hachikuji
Contributor

LGTM

@guozhangwang Do you want to have a look?

Contributor

Do we need a new config for subscription interval? Could we piggy-back the logic on regular metadata refresh?

Contributor Author

Makes sense. Removed.

@hachikuji
Contributor

@SinghAsDev Another problem that @guozhangwang and I briefly talked about was whether we should augment the Metadata object to support getting metadata for all topics (with a hook to get a notification when there is a change)*. Your approach is exactly the one I had in mind, but it does result in a little more overhead since metadata refreshes are done both in KafkaConsumer and in NetworkClient. The nice thing about separating them is that it lets you refresh the full metadata list less frequently. If your regex only covers a small set of the topics, then that makes "normal" metadata refreshes cheaper and they can be done more often. However, it seems a little questionable whether this capability is actually useful in practice, so reducing the metadata overhead might be the bigger win. What do you think?

*Note that it's still useful to be able to bypass NetworkClient to support the listTopics API (which doesn't affect subscriptions).

Contributor

@SinghAsDev you are not incrementing pos! It should be topicsToSubscribe[pos++] = topic, right?

In fact, I would use a list as below:

```java
List<String> topicsToSubscribe = new ArrayList<>(partitions.size());
// ... add matching topics ...
subscribeTopics(topicsToSubscribe.toArray(new String[0]), true);
```

But I am fine with the array too.
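The bug and the suggested list-based fix can be demonstrated with a standalone sketch (the class and method names here are made up for illustration; only the copy logic mirrors the review comment):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TopicCopy {
    // Buggy version: pos is never incremented, so every topic overwrites
    // slot 0 and the remaining slots stay null.
    static String[] copyBuggy(List<String> topics) {
        String[] out = new String[topics.size()];
        int pos = 0;
        for (String topic : topics)
            out[pos] = topic; // bug: should be out[pos++] = topic
        return out;
    }

    // Fixed version using a List, as suggested in the review, which avoids
    // the index bookkeeping entirely.
    static String[] copyFixed(List<String> topics) {
        List<String> topicsToSubscribe = new ArrayList<>(topics.size());
        for (String topic : topics)
            topicsToSubscribe.add(topic);
        return topicsToSubscribe.toArray(new String[0]);
    }

    public static void main(String[] args) {
        List<String> topics = List.of("foo", "bar");
        System.out.println(Arrays.toString(copyBuggy(topics))); // [bar, null]
        System.out.println(Arrays.toString(copyFixed(topics))); // [foo, bar]
    }
}
```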

Contributor Author

Aha, nice catch. Will fix. Thanks!

@asfbot

asfbot commented Aug 18, 2015

kafka-trunk-git-pr #160 FAILURE
Looks like there's a problem with this pull request

@SinghAsDev
Contributor Author

@hachikuji that makes sense, and I actually suggested the same for the listTopics patch. Am I right that what you are suggesting is similar to my suggestion on the listTopics JIRA?

@hachikuji
Contributor

@SinghAsDev I think the listTopics approach is still fine and doesn't need to be changed. The key difference is that listTopics doesn't affect subscriptions, so it should not have any impact on the Metadata shared throughout the consumer.

@guozhangwang
Contributor

@SinghAsDev Sorry for being late on this. Regarding the subscribe semantics, I am convinced that 1) we do not need an extra blacklist pattern in the API but can just let users specify that in the regex, and 2) we do not need to do incremental subscription; replacing the subscription should be fine.
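Point 1 above can be illustrated with plain java.util.regex: a blacklist needs no extra API parameter because negative lookahead can express exclusions inside the subscription regex itself (the topic names here are hypothetical):

```java
import java.util.regex.Pattern;

public class RegexBlacklist {
    public static void main(String[] args) {
        // Match every topic starting with "metrics-" except "metrics-internal".
        Pattern p = Pattern.compile("metrics-(?!internal).*");
        System.out.println(p.matcher("metrics-cpu").matches());      // true
        System.out.println(p.matcher("metrics-internal").matches()); // false
    }
}
```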

Regarding the metadata refresh, what I was thinking is that the current patch uses a separate scheduled task with a different topic set, outside Metadata, for regex subscription. If we can piggy-back on the metadata refresh we can potentially reduce the code complexity, and the cost of the metadata refresh always asking for all topics seems OK to me, since in practice users would probably want to set the "regex refreshing interval" to be the same as the "metadata refreshing interval" anyway. What do you think?

On the other hand, we do not necessarily need to make the listTopics API also change the Metadata state object, as @hachikuji suggested, since I feel it is in many cases a one-time thing.

I could review this PR again and check it in once it gets rebased / updated.

@SinghAsDev
Contributor Author

@guozhangwang @hachikuji makes sense. Will update. However, for this to happen we need to somehow make NetworkClient aware that, when pattern subscription is in use, it should fetch metadata for all topics. There are a few ways I can think of to do this.

  1. Have the Metadata object maintain a flag that indicates whether we need metadata for all topics or just for the ones maintained in Metadata.
  2. Have NetworkClient maintain a flag which is set by KafkaConsumer during subscribe(pattern) and cleared during unsubscribe(pattern). This would require changing the KafkaClient interface, though.

I am more inclined towards Option 1. What do you guys suggest?
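Option 1 can be sketched as a standalone class (the names here are illustrative, not the actual Kafka classes): a Metadata-like object carries a flag telling the network layer whether the next metadata request should ask for all topics, or only the tracked ones.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

public class MetadataSketch {
    private final Set<String> topics = new HashSet<>();
    private boolean needMetadataForAllTopics = false;

    // Set during subscribe(pattern), cleared during unsubscribe().
    public synchronized void needMetadataForAllTopics(boolean need) {
        this.needMetadataForAllTopics = need;
    }

    public synchronized void addTopic(String topic) {
        topics.add(topic);
    }

    // What a NetworkClient-like caller would put in the metadata request;
    // an empty topic list is interpreted by the broker as "all topics".
    public synchronized Set<String> topicsForRequest() {
        return needMetadataForAllTopics ? Collections.emptySet() : new HashSet<>(topics);
    }

    public static void main(String[] args) {
        MetadataSketch metadata = new MetadataSketch();
        metadata.addTopic("foo");
        System.out.println(metadata.topicsForRequest()); // [foo]
        metadata.needMetadataForAllTopics(true);         // pattern subscription active
        System.out.println(metadata.topicsForRequest()); // []
    }
}
```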

@hachikuji
Contributor

@SinghAsDev Option 1 definitely sounds nicer to me.

@hachikuji
Contributor

@SinghAsDev Btw, I was planning to use a MetadataListener in KAFKA-2464 to hook into metadata updates: https://github.com/apache/kafka/pull/165/files#diff-62bba39339405475f71241a182ef9819R229. Seems like that might be sufficient for this case too.
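A minimal sketch of how such a listener hook could drive regex subscription (the interface and helper below are hypothetical, not the actual KAFKA-2464 API): on every metadata update, the cluster's topic list is re-matched against the subscribed pattern.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class PatternListenerSketch {
    interface Listener {
        void onMetadataUpdate(List<String> clusterTopics);
    }

    // Build a listener that filters the cluster's topics by the pattern and
    // hands the matches to a subscribe callback.
    static Listener forPattern(Pattern pattern, Consumer<List<String>> subscribe) {
        return clusterTopics -> subscribe.accept(
            clusterTopics.stream()
                         .filter(t -> pattern.matcher(t).matches())
                         .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        Listener l = forPattern(Pattern.compile("test-.*"), System.out::println);
        l.onMetadataUpdate(List.of("test-1", "other", "test-2")); // [test-1, test-2]
    }
}
```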

@SinghAsDev
Contributor Author

@hachikuji yea, it will be needed. Should I wait for your patch to go in?

@hachikuji
Contributor

@SinghAsDev Nah, yours is probably easier to get through, so go ahead. I'll rebase accordingly.

@guozhangwang
Contributor

+1 on option 1 also.

@asfbot

asfbot commented Aug 27, 2015

kafka-trunk-git-pr #233 FAILURE
Looks like there's a problem with this pull request

@SinghAsDev
Contributor Author

@hachikuji @guozhangwang I have updated the PR with the discussed approach. Let me know how it looks. I will add some unit tests for the new methods added to metadata tonight.

@SinghAsDev
Contributor Author

Ahh.. looks like I will again have to rebase. Will do that along with adding unit tests for Metadata.

Contributor

I wonder if we should clear the topic collection when this is set. If we did that, then the change in NetworkClient would be unnecessary. Is there any reason not to?

Contributor Author

This will be called only when a pattern is subscribed. At that point, yes, what you are saying makes sense. However, for subsequent metadata updates the changes in NetworkClient are still needed, as metadata will contain the topics that matched the pattern and were added in the last update. Does that make sense?

Contributor

Yeah, I guess that makes sense. It kind of feels weird to be tracking a topic list in Metadata that we're not going to use, however. My feeling is that maybe we shouldn't update Metadata for topics subscribed from a regex; instead we just update Metadata once in subscribe(pattern). Does that make sense?

Contributor Author

I am a bit lost here. Even if we clear topics here, topics will be added by KafkaConsumer.onMetadataUpdate and any metadata update after that will fetch metadata for only those topics if NetworkClient's changes are removed. I think I am missing something here :)

Contributor

Probably my fault. :) I was trying to suggest not having KafkaConsumer.onMetadataUpdate modify the Metadata topic list, just the subscription topic list. So basically, in KafkaConsumer.subscribe(pattern) we set the Metadata allTopics flag, and we don't modify Metadata again (unless the user changes their subscription). Does that make any sense?

Contributor

This might not work as well as I thought since the metadata is also updated in partitionsFor(). I actually think it would be better for partitionsFor() to use the same approach as listTopics(), but that is out of scope for this issue.

Contributor Author

Haha.. I actually made the changes and realized the same. It does make it look cleaner, though. Also, one more issue is that when subscribed via a pattern, Metadata will always store metadata for all topics, and a fetch will return metadata for all topics, not just the topics actually matched by the pattern subscription.

For now, I think we can just leave it as it is. If you think we should refactor partitionsFor(), let me know; we should probably create a separate JIRA for it, though.

Contributor

Refactoring partitionsFor does seem worthwhile to me since then Metadata is only modified by subscription (and metadata for subscribed topics is all we care about in a steady state). Maybe we can leave this patch as it is and plan to fix it in that JIRA.

Contributor Author

Agreed. Created KAFKA-2506.

@guozhangwang
Contributor

LGTM.

About unsubscribing, I think it is useful to keep unsubscribe() unsubscribing any topic / partition / pattern instead of adding an unsubscribe(Pattern), since we will only have one pattern subscribed at a time.
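The semantics settled on here can be sketched as follows (illustrative names, not the real SubscriptionState): a single unsubscribe() clears whichever subscription is active, so no unsubscribe(Pattern) overload is needed.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Pattern;

public class SubscriptionSketch {
    private Pattern subscribedPattern;
    private final Set<String> subscribedTopics = new HashSet<>();

    public void subscribe(Pattern pattern) {
        unsubscribe();                    // only one subscription at a time
        this.subscribedPattern = pattern;
    }

    public void subscribe(Set<String> topics) {
        unsubscribe();
        subscribedTopics.addAll(topics);
    }

    // Clears topic and pattern subscriptions alike.
    public void unsubscribe() {
        subscribedPattern = null;
        subscribedTopics.clear();
    }

    public boolean hasPatternSubscription() {
        return subscribedPattern != null;
    }

    public static void main(String[] args) {
        SubscriptionSketch s = new SubscriptionSketch();
        s.subscribe(Pattern.compile("test-.*"));
        System.out.println(s.hasPatternSubscription()); // true
        s.unsubscribe();
        System.out.println(s.hasPatternSubscription()); // false
    }
}
```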

@SinghAsDev
Contributor Author

@hachikuji @guozhangwang changed the behavior of unsubscribe. Up for your reviews again :)

@asfbot

asfbot commented Sep 10, 2015

kafka-trunk-git-pr #385 SUCCESS
This pull request looks good

Contributor

To be clear, we'll continue fetching metadata for all the topics that were added to Metadata directly. Is that right? If instead we cleared those topics, would the NetworkClient fetch metadata for all topics until we've set a new subscription?

Contributor

I think it is a bug in the protocol that a metadata request containing an empty topic list returns the full topic list. This should be fixed in another ticket.

Another two general comments: 1) we could remove the duplicate ConsumerRebalanceListener / RebalanceListener, and then remove the Consumer parameter from the callback functions; 2) we could probably create a member Metadata.Listener inside KafkaConsumer instead of having KafkaConsumer implement this interface along with Consumer, since it is only used in subscribe(Pattern).

To avoid dragging this ticket out further, I suggest we do these in later tickets. @hachikuji Does that sound good to you?

Contributor

@guozhangwang Yep, sounds good to me.

Contributor Author

@guozhangwang @hachikuji thanks for the reviews and for helping get this committed. If there is no JIRA already, I can create the JIRA and PR to fix the issues mentioned by @guozhangwang . Let me know. Thanks!

@asfgit asfgit closed this in fd12396 Sep 10, 2015
@hachikuji
Contributor

@SinghAsDev I just created a ticket for removing the consumer instance from the callback. You can create a ticket for the other issue if you want.

@SinghAsDev
Contributor Author

@hachikuji I have created KAFKA-2533, will create PR shortly.

@guozhangwang guozhangwang mentioned this pull request Sep 18, 2015
hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 20, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 23, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Feb 23, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 2, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 3, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 9, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 10, 2017
apurvam pushed a commit to apurvam/kafka that referenced this pull request Mar 15, 2017
ijuma pushed a commit to ijuma/kafka that referenced this pull request Mar 22, 2017
hachikuji added a commit to hachikuji/kafka that referenced this pull request Mar 22, 2017
jsancio pushed a commit to jsancio/kafka that referenced this pull request Aug 6, 2019
Initial implementation of the Tier Fetcher. Supports fetching a single partition at a time from tiered storage.
wyuka pushed a commit to wyuka/kafka that referenced this pull request Mar 4, 2022
…er/consumer to network client (apache#286)

[LI-HOTFX] Allow passing client software name and version from producer/consumer to network client (apache#128)

TICKET = N/A
LI_DESCRIPTION = Add config to pass customized client name from producer/consumer to ApiVersionsRequest.

Starting from kafka 2.4, brokers are able to collect clients' version and name, see KIP-511 for more details.
With kafka 2.4 and linkedin-kafka-clients 10, we should make this name unique so that in future when collecting user metrics, we can distinguish supported clients from those using unsupported clients
EXIT_CRITERIA = N/A

Co-authored-by: Ke Hu <[email protected]>
(cherry picked from commit 011ebf8)
fixing merge conflicts and tests
wyuka pushed a commit to wyuka/kafka that referenced this pull request Mar 28, 2022
…cer/consumer to network client (apache#286)

wyuka pushed a commit to wyuka/kafka that referenced this pull request Jun 16, 2022
…cer/consumer to network client (apache#286)

patrik-marton pushed a commit to patrik-marton/kafka that referenced this pull request Mar 11, 2025
CSMDS-564: Fix GHA runner label for unit tests (apache#74)
CSMDS-885: Jenkins unit tester action - patching api4jenkins with MavenModuleSetBuild (apache#129)
CSMDS-882: Updating workflow runner to redhat8-builds (apache#128)
davide-armand pushed a commit to aiven/kafka that referenced this pull request Dec 1, 2025