KAFKA-14274: [1/7] basic refactoring #14305

Merged
junrao merged 1 commit into apache:trunk from kirktrue:KAFKA-14274-basic-refactoring-ak on Sep 7, 2023

Conversation

@kirktrue
Contributor

This change introduces some basic cleanup and refactoring in preparation for forthcoming commits related to the revised fetch code for the consumer threading refactor project.

See KAFKA-14274 for more background.

Contributor Author

@kirktrue kirktrue Aug 29, 2023

Author’s note: The two new methods in this file were moved here from ConsumerNetworkClient as they will be used in other classes in future commits.

Contributor Author

Author’s note: this method is used in the new Consumer’s constructor in a future commit.

Contributor Author

Author’s note: adding toString() implementations is helpful in debugging.

Contributor Author

Author’s note: adding toString() implementations is helpful in debugging.

Contributor Author

Author’s note: formatting changes and added debug logging.

Contributor Author

Author’s note: this is the logic from the now-removed requestBackoffExpired method with a log line for debugging.

Contributor Author

Author’s note: requestBackoffExpired is now inline in canSendRequest above.

Contributor Author

Author’s note: added a LogContext argument and formatting.

Contributor Author

Author’s note: added a LogContext argument and formatting.

@philipnee philipnee added the labels consumer, KIP-848 (The Next Generation of the Consumer Rebalance Protocol), and ctr (Consumer Threading Refactor, KIP-848) Aug 29, 2023
Contributor Author

Author’s note: mostly encapsulating the instance variables for protection against accidental outside changes.

Contributor Author

Author’s note: As stated previously, the core logic from isUnavailable and maybeThrowAuthFailure was moved to NetworkClientUtils for reuse.
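
As a rough illustration of the kind of move described in this note, the sketch below expresses the two checks as static helpers that take the client as a parameter, so any class holding a client can call them. The `Client` interface, the `ClientChecks` class, the `FakeClient`, and the use of plain string node IDs are all hypothetical stand-ins, not the actual `KafkaClient` or `NetworkClientUtils` signatures. The availability rule shown (connection failed and a reconnect delay still pending) is likewise an assumption made for the example.

```java
import java.util.HashSet;
import java.util.Set;

public class ClientChecksSketch {

    /** Hypothetical, minimal view of a network client (not Kafka's KafkaClient). */
    interface Client {
        boolean connectionFailed(String nodeId);
        long connectionDelayMs(String nodeId, long nowMs);
        RuntimeException authenticationException(String nodeId);
    }

    /** Stateless helpers: callers pass in the client instead of owning the logic. */
    static final class ClientChecks {
        private ClientChecks() { }

        /** Assumed rule: unavailable = connection failed and a reconnect delay is pending. */
        static boolean isUnavailable(Client client, String nodeId, long nowMs) {
            return client.connectionFailed(nodeId) && client.connectionDelayMs(nodeId, nowMs) > 0;
        }

        /** Rethrows the authentication failure recorded for the node, if any. */
        static void maybeThrowAuthFailure(Client client, String nodeId) {
            RuntimeException exception = client.authenticationException(nodeId);
            if (exception != null)
                throw exception;
        }
    }

    /** Fake client used only to exercise the helpers. */
    static final class FakeClient implements Client {
        final Set<String> failedNodes = new HashSet<>();
        final Set<String> authFailedNodes = new HashSet<>();

        @Override
        public boolean connectionFailed(String nodeId) {
            return failedNodes.contains(nodeId);
        }

        @Override
        public long connectionDelayMs(String nodeId, long nowMs) {
            // Pretend every failed node must wait 50 ms before reconnecting.
            return failedNodes.contains(nodeId) ? 50L : 0L;
        }

        @Override
        public RuntimeException authenticationException(String nodeId) {
            return authFailedNodes.contains(nodeId)
                    ? new IllegalStateException("Authentication failed for node " + nodeId)
                    : null;
        }
    }

    public static void main(String[] args) {
        FakeClient client = new FakeClient();
        client.failedNodes.add("node-1");
        System.out.println("node-1 unavailable: " + ClientChecks.isUnavailable(client, "node-1", 0L));
        System.out.println("node-2 unavailable: " + ClientChecks.isUnavailable(client, "node-2", 0L));
        // No authentication failure is recorded for node-2, so nothing is thrown here.
        ClientChecks.maybeThrowAuthFailure(client, "node-2");
    }
}
```

Because the helpers hold no state of their own, a second caller can reuse them without depending on the class they were originally written in, which appears to be the motivation given in the note.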

Contributor Author

Author’s note: most of these changes are related to the encapsulation changes that were made in CompletedFetch.

Contributor Author

Author’s note: requestMetadataUpdate was moved to FetchUtils as it will be reused elsewhere in future commits.

Contributor Author

Author’s note: renamed subscriptionState to subscriptions for consistency with the rest of the newly refactored code base.

@kirktrue kirktrue closed this Aug 29, 2023
@kirktrue kirktrue reopened this Aug 29, 2023
@kirktrue kirktrue closed this Aug 29, 2023
@kirktrue kirktrue reopened this Aug 29, 2023
@kirktrue
Contributor Author

cc @lianetm @philipnee

@kirktrue kirktrue changed the title from "KAFKA-14274 #1: basic refactoring" to "KAFKA-14274: [1/7] basic refactoring" Aug 29, 2023
Contributor

I actually wonder whether this debug log would be useful, since it lacks information about which request it refers to. Maybe this log should stay at the request manager level?

Contributor Author

Hi @philipnee!

That's a good point. This debug logging doesn't have context of the request.

The request manager calls canSendRequest() before creating the request, though. So in the cases where we return false, there won't be a request, so there isn't any additional request information that the request manager would be able to log.

Additionally, there are two different reasons why a request cannot be sent:

  1. There's already an inflight request
  2. The backoff hasn't expired

When canSendRequest() returns false back to the request manager, we'd lose the context to explain the reason why the request can't be sent.

I'll look into this a bit more to see what I can do.

Thanks!

Contributor Author

I added a relevant toString() method to the RequestState sub-classes that includes details about the request. The logging now includes the toString()-ed RequestState for context.
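
The two refusal reasons discussed in this thread, and the idea of logging the toString()-ed state for context, can be sketched roughly as follows. This is a simplified, hypothetical `RequestState`, not the class from the PR: it uses a fixed backoff measured from the last response instead of an exponential one, and a plain list stands in for the debug logger. The field names, the `onSendAttempt` and `onResponse` methods, and the message wording are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestStateSketch {

    /** Hypothetical, simplified request tracker; names and fields are assumptions. */
    static class RequestState {
        private final String owner;
        private final long backoffMs;
        private long lastSentMs = -1;
        private long lastReceivedMs = -1;
        /** Stand-in for a debug logger so the refusal reasons can be inspected. */
        final List<String> debugLog = new ArrayList<>();

        RequestState(String owner, long backoffMs) {
            this.owner = owner;
            this.backoffMs = backoffMs;
        }

        void onSendAttempt(long nowMs) {
            lastSentMs = nowMs;
        }

        void onResponse(long nowMs) {
            lastReceivedMs = nowMs;
        }

        boolean canSendRequest(long nowMs) {
            if (lastSentMs == -1)
                return true; // nothing has been sent yet

            // Reason 1: a request was sent and no response has arrived since.
            if (lastReceivedMs < lastSentMs) {
                debugLog.add("An inflight request already exists for " + this);
                return false;
            }

            // Reason 2: the backoff period since the last response has not elapsed.
            long remainingMs = backoffMs - (nowMs - lastReceivedMs);
            if (remainingMs > 0) {
                debugLog.add(remainingMs + " ms remain before another request can be sent for " + this);
                return false;
            }
            return true;
        }

        @Override
        public String toString() {
            return "RequestState{owner='" + owner + "', lastSentMs=" + lastSentMs
                    + ", lastReceivedMs=" + lastReceivedMs + ", backoffMs=" + backoffMs + '}';
        }
    }

    public static void main(String[] args) {
        RequestState state = new RequestState("demo", 100L);
        state.onSendAttempt(0L);
        state.canSendRequest(10L); // refused: a request is in flight
        state.onResponse(20L);
        state.canSendRequest(50L); // refused: still backing off
        state.debugLog.forEach(System.out::println);
    }
}
```

Logging inside the check keeps the specific reason next to the decision, while including `this` in the message supplies the request context that the caller would otherwise have to add.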

Contributor

ditto ^^

@philipnee
Contributor

Hey @kirktrue - Thanks for the PR. Left a couple of comments; otherwise it looks good.

@kirktrue kirktrue closed this Aug 30, 2023
@kirktrue kirktrue reopened this Aug 30, 2023
@kirktrue
Contributor Author

Test failures are unrelated:

kafka.api.ConsumerBounceTest.testConsumptionWithBrokerFailures()
kafka.server.DynamicBrokerReconfigurationTest.testThreadPoolResize()
o.a.k.connect.mirror.integration.IdentityReplicationIntegrationTest.testReplicateSourceDefault()
o.a.k.controller.QuorumControllerTest.testBalancePartitionLeaders()
o.a.k.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerTest.testNewPartitionUpdates()
o.a.k.streams.integration.EOSUncleanShutdownIntegrationTest.shouldWorkWithUncleanShutdownWipeOutStateStore()
o.a.k.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorLargeNumConsumers
o.a.k.trogdor.coordinator.CoordinatorTest.testTaskRequestWithOldStartMsGetsUpdated()

@kirktrue
Contributor Author

kirktrue commented Sep 1, 2023

@junrao let me know your thoughts on this PR. Thanks!

Contributor

@junrao junrao left a comment

@kirktrue : Thanks for the PR. Just one comment.

Contributor

Instead of duplicating the code in RequestState, could we pull out the common part as a util and reuse?

Contributor Author

@junrao Are you referring to the toString() code specifically, or something else?

Contributor Author

I can add an internal toStringBase() to RequestState that concatenates the core instance variables into a string. Subclasses can then append their own instance variables first, followed by the result of toStringBase(), or something similar. Does that make sense?

Contributor

Yes, I was referring to the toString method. It seems that every subclass of RequestState directly reads every field in RequestState to build its own string. This creates duplicated code and can be a bit error-prone.

Contributor Author

@junrao I created toStringBase() in RequestState:

/**
 * This method appends the instance variables together in a simple String of comma-separated key value pairs.
 * This allows subclasses to include these values and not have to duplicate each variable, helping to prevent
 * any variables from being omitted when new ones are added.
 *
 * @return String version of instance variables.
 */
protected String toStringBase() {
    return "owner='" + owner + '\'' +
            ", exponentialBackoff=" + exponentialBackoff +
            ", lastSentMs=" + lastSentMs +
            ", lastReceivedMs=" + lastReceivedMs +
            ", numAttempts=" + numAttempts +
            ", backoffMs=" + backoffMs;
}

@Override
public String toString() {
    return "RequestState{" + toStringBase() + '}';
}

Subclasses look like this:

@Override
public String toString() {
    return "OffsetFetchRequestState{" +
            "requestedPartitions=" + requestedPartitions +
            ", requestedGeneration=" + requestedGeneration +
            ", future=" + future +
            ", " + toStringBase() +
            '}';
}

Does that seem OK?

Introduces some basic clean up and refactoring for forthcoming commits.
Contributor

@junrao junrao left a comment

@kirktrue : Thanks for the updated PR. LGTM. Are the test failures related?

Contributor

@clolov clolov left a comment

Thanks for the change! I believe the test failures are unrelated

- protected static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
-                                                                  Deserializer<?> keyDeserializer,
-                                                                  Deserializer<?> valueDeserializer) {
+ public static Map<String, Object> appendDeserializerToConfig(Map<String, Object> configs,
Contributor

I assume this visibility change is needed for a subsequent commit?

Contributor Author

Yes.

@kirktrue
Contributor Author

kirktrue commented Sep 7, 2023

Test failures in latest build are unrelated and about half already have Jiras:

@junrao junrao merged commit a2de7d3 into apache:trunk Sep 7, 2023
@divijvaidya
Member

Hey @kirktrue
BaseAsyncConsumerTest has started becoming flaky [1] since this commit was introduced on Sept 8th. The flakiness may or may not be related, since I haven't investigated in detail, but can you please check that this commit didn't cause it?

Thanks.

[Screenshot 2023-09-11 at 12:06:49]

[Screenshot 2023-09-11 at 12:08:02]

[1] https://ge.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe/Berlin&tests.container=kafka.api.BaseAsyncConsumerTest&tests.test=testCommitSync()

@philipnee
Contributor

@divijvaidya - thanks for reporting that.

@philipnee
Contributor

@divijvaidya - I was trying to cherry-pick some additional changes into trunk; however, I actually don't find it flaky. I think these two should be failing consistently (caused by completing the wrong CompletableFuture). It's strange that they don't fail all the time...

@philipnee
Contributor

Hey @divijvaidya - Looking at the PR: in fact, there aren't any implementation-level changes, so I'm not sure why these two functions started to become flaky. Does this metric measure the trunk or all draft/pr? I did open a PR that has broken BaseAsyncConsumerTest.

@divijvaidya
Member

Does this metric measure the trunk or all draft/pr?

Thank you for looking into this @philipnee. You can add "trunk" as a tag in the above link to get data on trunk. Trunk seems all green, so it seems to be flaky only on PRs. In that case, this PR doesn't seem to be related. The timeline just happens to match up :)

I appreciate your effort towards this investigation. Thank you!

@philipnee
Contributor

@divijvaidya - thank you for looking into this proactively! 🤝

@kirktrue kirktrue deleted the KAFKA-14274-basic-refactoring-ak branch October 25, 2023 00:17
AnatolyPopov pushed a commit to aiven/kafka that referenced this pull request Feb 16, 2024
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project.

Reviewers: Christo Lolov, Jun Rao
