KAFKA-14274: [1/7] basic refactoring#14305
KAFKA-14274: [1/7] basic refactoring#14305junrao merged 1 commit intoapache:trunkfrom kirktrue:KAFKA-14274-basic-refactoring-ak
Conversation
There was a problem hiding this comment.
Author’s note: The two new methods in this file were moved here from ConsumerNetworkClient as they will be used in other classes in future commits.
There was a problem hiding this comment.
Author’s note: this method is used in the new Consumer’s constructor in a future commit.
There was a problem hiding this comment.
Author’s note: adding toString() implementations is helpful in debugging.
There was a problem hiding this comment.
Author’s note: adding toString() implementations is helpful in debugging.
There was a problem hiding this comment.
Author’s note: formatting and added debug
There was a problem hiding this comment.
Author’s note: this is the logic from the now-removed requestBackoffExpired method with a log line for debugging.
There was a problem hiding this comment.
Author’s note: requestBackoffExpired is now inline in canSendRequest above.
There was a problem hiding this comment.
Author’s note: added a LogContext argument and formatting.
There was a problem hiding this comment.
Author’s note: added a LogContext argument and formatting.
There was a problem hiding this comment.
Author’s note: mostly encapsulating the instance variables for protection against accidental outside changes.
There was a problem hiding this comment.
Author’s note: As stated previously, the core logic from isUnavailable and maybeThrowAuthFailure were moved to NetworkClientUtils for reuse.
There was a problem hiding this comment.
Author’s note: most of these changes are related to the encapsulation changes that were made in CompletedFetch.
There was a problem hiding this comment.
Author’s note: requestMetadataUpdate was moved to FetchUtils as it will be reused elsewhere in future commits.
There was a problem hiding this comment.
Author’s note: renamed subscriptionState to subscriptions for consistency with the rest of the newly refactored code base.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchUtils.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
i actually wonder if the debug would be useful due to lack of the information of what request it is. maybe this log should stay at the request manager level?
There was a problem hiding this comment.
Hi @philipnee!
That's a good point. This debug logging doesn't have context of the request.
The request manager calls canSendRequest() before creating the request, though. So in the cases where we return false, there won't be a request, so there isn't any additional request information that the request manager would be able to log.
Additionally, there are two different reasons why a request cannot be sent:
- There's already an inflight request
- The backoff hasn't expired
When canSendRequest() returns false back to the request manager, we'd lose the context to explain the reason why the request can't be sent.
I'll look into this a bit more to see what I can do.
Thanks!
There was a problem hiding this comment.
I added a relevant toString() method to the RequestState sub-classes that includes details about the request. The logging now includes the toString()-ed RequestState for context.
|
Hey @kirktrue - Thanks for the PR. Left a couple of comments otherwise it looks good. |
|
Test failures are unrelated: |
|
@junrao let me know your thoughts on this PR. Thanks! |
There was a problem hiding this comment.
Instead of duplicating the code in RequestState, could we pull out the common part as a util and reuse?
There was a problem hiding this comment.
@junrao Are you referring to the toString() code specifically, or something else?
There was a problem hiding this comment.
I can make an internal toStringBase() to RequestState that has the core instance variables concatenated as a string. And then subclasses can first append their own instance variable string and then append the result of toStringBase() or something. Does that make sense?
There was a problem hiding this comment.
Yes, I was referring to the toString method. It seems that every subclass of RequestState directly gets every field in RequestState to build its own string. This creates duplicated code and can be a bit error prone.
There was a problem hiding this comment.
@junrao I created toStringBase() in RequestState:
/**
* This method appends the instance variables together in a simple String of comma-separated key value pairs.
* This allows subclasses to include these values and not have to duplicate each variable, helping to prevent
* any variables from being omitted when new ones are added.
*
* @return String version of instance variables.
*/
protected String toStringBase() {
return "owner='" + owner + '\'' +
", exponentialBackoff=" + exponentialBackoff +
", lastSentMs=" + lastSentMs +
", lastReceivedMs=" + lastReceivedMs +
", numAttempts=" + numAttempts +
", backoffMs=" + backoffMs;
}
@Override
public String toString() {
return "RequestState{" + toStringBase() + '}';
}Subclasses look like this:
@Override
public String toString() {
return "OffsetFetchRequestState{" +
"requestedPartitions=" + requestedPartitions +
", requestedGeneration=" + requestedGeneration +
", future=" + future +
", " + toStringBase() +
'}';
}Does that seem OK?
Introduces some basic clean up and refactoring for forthcoming commits.
clolov
left a comment
There was a problem hiding this comment.
Thanks for the change! I believe the test failures are unrelated
| protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs, | ||
| Deserializer<?> keyDeserializer, | ||
| Deserializer<?> valueDeserializer) { | ||
| public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs, |
There was a problem hiding this comment.
I assume this visibility change is needed for a subsequent commit?
|
Test failures in latest build are unrelated and about half already have Jiras:
|
|
Hey @kirktrue Thanks. |
|
@divijvaidya - thanks for reporting that. |
|
@divijvaidya - I was trying to cherry pick some additional changes into trunk however, i actually don't find it flaky - I think these two should be failing constantly (causing by completing the wrong CompletableFuture). It's strange that this doesn't fail all the time... |
|
Hey @divijvaidya - Looking at the PR: in fact there isn't any implementation level changes, so I'm not sure why these two functions started to become flaky. Does this metric measure the trunk or all draft/pr? I did open a PR that has broken BaseAsyncConsumerTest. |
Thank you for looking into this @philipnee. You can add "trunk" as a tag in the above link to get data on trunk. Trunk seems all green, in which case, seems like it flaky only on PRs. Doesn't seem like this PR is related in that case. The timeline just happens to match up :) I appreciate your effort towards this investigation. Thank you! |
|
@divijvaidya - thank you for looking into this proactively! 🤝 |
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project. Reviewers: Christo Lolov <[email protected]>, Jun Rao <[email protected]>


This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.
See KAFKA-14274 for more background.