Skip to content

KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily#7449

Merged
hachikuji merged 4 commits intoapache:trunkfrom
hachikuji:KAFKA-8983
Oct 8, 2019
Merged

KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily#7449
hachikuji merged 4 commits intoapache:trunkfrom
hachikuji:KAFKA-8983

Conversation

@hachikuji
Copy link
Contributor

The deleteRecords API in the AdminClient groups records to be sent by the partition leaders. If one of these requests fails, we currently fail all futures, including those tied to requests sent to other leaders. It would be better to fail only those partitions included in the failed request.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get());
assertTrue("Unexpected exception cause " + exception.getCause(),
exceptionClass.isAssignableFrom(exception.getCause().getClass()));
return (T) exception.getCause();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can avoid the warning (and suppression) if you use exceptionClass.cast.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Woah. Didn't know about that.

public static <T extends Throwable> T assertFutureThrows(KafkaFuture<?> future, Class<T> exceptionClass) {
ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get());
assertTrue("Unexpected exception cause " + exception.getCause(),
exceptionClass.isAssignableFrom(exception.getCause().getClass()));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

exceptionClass.isInstance?

Copy link
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, just a few nits.

return tps;
}

public static <T extends Throwable> T assertFutureThrows(KafkaFuture<?> future, Class<T> exceptionClass) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: shall we call the parameter exceptionCauseClass and add brief javadoc saying that this checks the cause of the execution exception?

@Override
public void close() {
this.adminClient.close();
this.adminClient.close(Duration.ZERO);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default implementation of close() is to wait forever. What's the reasoning for using Duration.ZERO here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indefinite blocking seems like a bad combination when time is often mocked.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, I don't think I actually need this change for this patch, so I can revert it.

final long nowDelete = time.milliseconds();

final Map<TopicPartition, Long> partitionDeleteOffsets = entry.getValue();
final long currentTimeMs = time.milliseconds();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

An existing issue, but would it be better to do this outside the loop?

}
}

for (final Map.Entry<Node, Map<TopicPartition, Long>> entry: leaders.entrySet()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: existing issue, but can we add a space before :?

Copy link
Member

@ijuma ijuma left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@hachikuji hachikuji merged commit a949334 into apache:trunk Oct 8, 2019
hachikuji added a commit that referenced this pull request Oct 8, 2019
…unnecessarily (#7449)

The deleteRecords API in the AdminClient groups records to be sent by the partition leaders. If one of these requests fails, we currently fail all futures, including those tied to requests sent to other leaders. It would be better to fail only those partitions included in the failed request.

Reviewers: Ismael Juma <[email protected]>
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk:
  KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464)
  KAFKA-8944: Fixed KTable compiler warning. (apache#7393)
  KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)
  MINOR: remove unused imports in Streams system tests (apache#7468)
  KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388)
  KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449)
  MINOR: Modified Exception handling for KIP-470 (apache#7461)
  KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105)
  KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386)
  KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453)
  MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants