KAFKA-7190: Retain producer state until transactionalIdExpiration time passes #7388

Merged
hachikuji merged 2 commits into apache:trunk from bob-barrett:KAFKA-7190
Oct 8, 2019

@bob-barrett
Contributor

As described in KIP-360, this patch changes producer state retention so that producer state remains cached even after it is removed from the log. Producer state is now removed only when the transactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests.
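
To make the retention change concrete, here is a minimal sketch of the behavior described above. It is a simplified model, not Kafka's actual `ProducerStateManager`: the names `ProducerEntry`, `ProducerStateCache`, `truncateHead`, and `removeExpiredProducers` are assumptions chosen for illustration, and the expiration rule (idle time at or beyond a configured limit) is a simplification.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of a producer state cache (not Kafka's real classes).
final case class ProducerEntry(producerId: Long, lastOffset: Long, lastTimestampMs: Long)

final class ProducerStateCache(maxProducerIdExpirationMs: Long) {
  private val producers = mutable.Map.empty[Long, ProducerEntry]
  private var logStartOffset = 0L

  def update(entry: ProducerEntry): Unit = producers.put(entry.producerId, entry)

  // Advancing the log start offset only records the new offset. In this sketch,
  // entries whose batches now lie below the start offset are deliberately kept.
  def truncateHead(newLogStartOffset: Long): Unit =
    logStartOffset = math.max(logStartOffset, newLogStartOffset)

  // Time-based expiration is the only path that evicts a producer here.
  def removeExpiredProducers(currentTimeMs: Long): Unit = {
    val expired = producers.values
      .filter(e => currentTimeMs - e.lastTimestampMs >= maxProducerIdExpirationMs)
      .map(_.producerId)
      .toList
    expired.foreach(id => producers.remove(id))
  }

  def contains(producerId: Long): Boolean = producers.contains(producerId)
}
```

In this model, a producer whose records have all been deleted from the log stays known to the cache until the expiration interval elapses, which is the situation the patch targets for short-retention topics.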

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Contributor

@guozhangwang guozhangwang left a comment

@bob-barrett One thing I do not observe is the logic of expiring producer id upon transactional id expiration. Looking at TransactionStateManager#enableTransactionalIdExpiration today we do not notify the brokers when certain transactionIds are being expired.

EDIT: nvm, I think we set the maxPidExpirationMs always to transactionalIdExpiration anyways.

Contributor

@guozhangwang guozhangwang left a comment

Made a pass.

def oldestSnapshotOffset: Option[Long] = oldestSnapshotFile.map(file => offsetFromFile(file))

private def isProducerRetained(producerStateEntry: ProducerStateEntry, logStartOffset: Long): Boolean = {
producerStateEntry.removeBatchesOlderThan(logStartOffset)
Contributor

removeBatchesOlderThan in line 151 is no longer needed.

// safe to clear the unreplicated transactions
unreplicatedTxns.clear()
loadFromSnapshot(logStartOffset, currentTimeMs)
} else {
Contributor

Just to clarify: since we still remove the snapshots above, the in-memory bookkeeping and the persistent snapshot files could be inconsistent (as we discussed in the ticket). It also means that if the broker fails over, we can still potentially see the unknown pid, right?

Contributor

Also, it is worth updating the javadoc in line 601 above to emphasize the difference between the in-memory cache and the persistent snapshots.

Contributor Author

I didn't want to remove the logStartOffset check because I was worried that the snapshots would accumulate. So yes, this means that in the case of a failover or an unclean shutdown, we would lose the state before the expiration time passes.
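
The trade-off discussed in this thread can be sketched as follows. This is a toy model under stated assumptions: `BrokerProducerState` and all of its methods are hypothetical names, snapshots are modeled as in-memory maps rather than files, and recovery loads only the latest surviving snapshot (a real broker would also rebuild state from the remaining log, which is omitted here).

```scala
import scala.collection.mutable

// Hypothetical model; names are illustrative and not part of Kafka's API.
final class BrokerProducerState {
  // In-memory cache: producerId -> last offset written by that producer.
  private val cache = mutable.Map.empty[Long, Long]
  // "Persistent" snapshots: snapshot offset -> copy of the cache at that offset.
  private val snapshots = mutable.SortedMap.empty[Long, Map[Long, Long]]

  def append(producerId: Long, offset: Long): Unit = cache.put(producerId, offset)

  def takeSnapshot(offset: Long): Unit = snapshots.put(offset, cache.toMap)

  // Snapshots below the new log start offset are still deleted so that they
  // do not accumulate. The in-memory cache is left untouched.
  def truncateHead(logStartOffset: Long): Unit = {
    val stale = snapshots.keys.filter(_ < logStartOffset).toList
    stale.foreach(offset => snapshots.remove(offset))
  }

  // Simulates a failover or unclean shutdown: memory is lost and state is
  // reloaded from the latest surviving snapshot, if there is one.
  def restart(): Unit = {
    cache.clear()
    snapshots.lastOption.foreach { case (_, state) => cache ++= state }
  }

  def knows(producerId: Long): Boolean = cache.contains(producerId)
}
```

With a snapshot taken at offset 10 and the log start offset later advanced to 20, the producer remains known while the broker keeps running, but a `restart()` in this model leaves it unknown, which matches the loss of state before the expiration time that is acknowledged above.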

@guozhangwang
Contributor

The code LGTM, minor comments only.

Also cc @lambdaliu, who has looked into this ticket before, for another look as well.

@bob-barrett bob-barrett changed the title from "KAFKA-7190: Retain producer state until transactional id expires" to "KAFKA-7190: Retain producer state until transactionalIdExpiration time passes" on Oct 4, 2019
@guozhangwang
Contributor

Retest this please

@guozhangwang
Contributor

There are no overlapping test failures across three runs, retest this please.

Contributor

@hachikuji hachikuji left a comment

LGTM

@hachikuji hachikuji merged commit c49775b into apache:trunk Oct 8, 2019
hachikuji pushed a commit that referenced this pull request Oct 8, 2019
…e passes (#7388)

As described in KIP-360, this patch changes producer state retention so that producer state remains cached even after it is removed from the log. Producer state will only be removed now when the transactional id expiration time has passed. This is intended to reduce the incidence of UNKNOWN_PRODUCER_ID errors for producers when records are deleted or when a topic has a short retention time. Tested with unit tests.

Reviewers: Jason Gustafson <[email protected]>, Guozhang Wang <[email protected]>
ijuma added a commit to ijuma/kafka that referenced this pull request Apr 28, 2020
…t-for-generated-requests

* apache-github/trunk:
  KAFKA-8932; Add tag for CreateTopicsResponse.TopicConfigErrorCode (KIP-525) (apache#7464)
  KAFKA-8944: Fixed KTable compiler warning. (apache#7393)
  KAFKA-8964: Rename tag client-id for thread-level metrics and below (apache#7429)
  MINOR: remove unused imports in Streams system tests (apache#7468)
  KAFKA-7190; Retain producer state until transactionalIdExpiration time passes (apache#7388)
  KAFKA-8983; AdminClient deleteRecords should not fail all partitions unnecessarily (apache#7449)
  MINOR: Modified Exception handling for KIP-470 (apache#7461)
  KAFKA-7245: Deprecate WindowStore#put(key, value) (apache#7105)
  KAFKA-8179: Part 7, cooperative rebalancing in Streams (apache#7386)
  KAFKA-8985; Add flexible version support to inter-broker APIs (apache#7453)
  MINOR: Bump version to 2.5.0-SNAPSHOT (apache#7455)