KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily#7449
KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily#7449hachikuji merged 4 commits intoapache:trunkfrom
Conversation
| ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); | ||
| assertTrue("Unexpected exception cause " + exception.getCause(), | ||
| exceptionClass.isAssignableFrom(exception.getCause().getClass())); | ||
| return (T) exception.getCause(); |
There was a problem hiding this comment.
I think you can avoid the warning (and suppression) if you use exceptionClass.cast.
There was a problem hiding this comment.
Woah. Didn't know about that.
| public static <T extends Throwable> T assertFutureThrows(KafkaFuture<?> future, Class<T> exceptionClass) { | ||
| ExecutionException exception = assertThrows(ExecutionException.class, () -> future.get()); | ||
| assertTrue("Unexpected exception cause " + exception.getCause(), | ||
| exceptionClass.isAssignableFrom(exception.getCause().getClass())); |
| return tps; | ||
| } | ||
|
|
||
| public static <T extends Throwable> T assertFutureThrows(KafkaFuture<?> future, Class<T> exceptionClass) { |
There was a problem hiding this comment.
Nit: shall we call the parameter exceptionCauseClass and add brief javadoc saying that this checks the cause of the execution exception?
| @Override | ||
| public void close() { | ||
| this.adminClient.close(); | ||
| this.adminClient.close(Duration.ZERO); |
There was a problem hiding this comment.
The default implementation of close() is to wait forever. What's the reasoning for using Duration.ZERO here?
There was a problem hiding this comment.
Indefinite blocking seems like a bad combination when time is often mocked.
There was a problem hiding this comment.
That said, I don't think I actually need this change for this patch, so I can revert it.
| final long nowDelete = time.milliseconds(); | ||
|
|
||
| final Map<TopicPartition, Long> partitionDeleteOffsets = entry.getValue(); | ||
| final long currentTimeMs = time.milliseconds(); |
There was a problem hiding this comment.
An existing issue, but would it be better to do this outside the loop?
| } | ||
| } | ||
|
|
||
| for (final Map.Entry<Node, Map<TopicPartition, Long>> entry: leaders.entrySet()) { |
There was a problem hiding this comment.
Nit: existing issue, but can we add a space before :?
…unnecessarily (#7449) The deleteRecords API in the AdminClient groups records to be sent by the partition leaders. If one of these requests fails, we currently fail all futures, including those tied to requests sent to other leaders. It would be better to fail only those partitions included in the failed request. Reviewers: Ismael Juma <[email protected]>
…t-for-generated-requests * apache-github/trunk: KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464) KAFKA-8944: Fixed KTable compiler warning. (apache#7393) KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429) MINOR: remove unused imports in Streams system tests (apache#7468) KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388) KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449) MINOR: Modified Exception handling for KIP-470 (apache#7461) KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105) KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386) KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453) MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)
The deleteRecords API in the AdminClient groups records to be sent by the partition leaders. If one of these requests fails, we currently fail all futures, including those tied to requests sent to other leaders. It would be better to fail only those partitions included in the failed request.
Committer Checklist (excluded from commit message)