KAFKA-7190: Retain producer state until transactionalIdExpiration time passes#7388
KAFKA-7190: Retain producer state until transactionalIdExpiration time passes#7388hachikuji merged 2 commits intoapache:trunkfrom
Conversation
As described in KIP-360, this patch changes producer state retention so that prodcuer state remains cached even after it is removed from the log. Producer state will only be removed now when the trasnactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests.
There was a problem hiding this comment.
@bob-barrett One thing I do not observe is the logic of expiring producer id upon transactional id expiration. Looking at TransactionStateManager#enableTransactionalIdExpiration today we do not notify the brokers when certain transactionIds are being expired.
EDIT: nvm, I think we set the maxPidExpirationMs always to transactionalIdExpiration anyways.
| def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file)) | ||
|
|
||
| private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = { | ||
| producerStateEntry.removeBatchesOlderThan(logStartOffset) |
There was a problem hiding this comment.
removeBatchesOlderThan in line 151 is no longer needed.
| // safe to clear the unreplicated transactions | ||
| unreplicatedTxns.clear() | ||
| loadFromSnapshot(logStartOffset, currentTimeMs) | ||
| } else { |
There was a problem hiding this comment.
Just to clarify: since we still remove the snapshots above, the in-memory bookkeeping and the persistent snapshot files could be inconsistent (as we discussed in the ticket). It also means, if the broker has a failure over then we can still see the unknown pid potentially, right?
There was a problem hiding this comment.
Also it worth updating the javadoc to emphasize the difference of in-memory cache and persistent snapshots in line 601 above.
There was a problem hiding this comment.
I didn't want to remove the logStartOffset check because I was worried that the snapshots would accumulate. So yes, this means that in the case of a failover or an unclean shutdown, we would lose the state before the expiration time passes.
|
The code LGTM, minor comments only. Also cc @lambdaliu who have looked into this ticket before for another look as well. |
|
Retest this please |
|
There are no overlapping test failures across three runs, retest this please. |
…e passes (#7388) As described in KIP-360, this patch changes producer state retention so that producer state remains cached even after it is removed from the log. Producer state will only be removed now when the transactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests. Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]>
…t-for-generated-requests * apache-github/trunk: KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464) KAFKA-8944: Fixed KTable compiler warning. (apache#7393) KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429) MINOR: remove unused imports in Streams system tests (apache#7468) KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388) KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449) MINOR: Modified Exception handling for KIP-470 (apache#7461) KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105) KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386) KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453) MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)
As described in KIP-360, this patch changes producer state retention so that prodcuer state remains cached even after it is removed from the log. Producer state will only be removed now when the trasnactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests.
Committer Checklist (excluded from commit message)