KAFKA-2143: fix replica offset truncate to beginning during leader migration.#129
becketqin wants to merge 2 commits into apache:trunk from becketqin/KAFKA-2143
Conversation
Seems like it fixes the symptom (don't truncate the log when the follower is confused) instead of the original cause (a possible race condition).
kafka-trunk-git-pr #121 SUCCESS
@lazyval, ideally the exception returned by the leader should contain more information. However, that would be a big wire protocol change and is perhaps not necessary; the easier fix is to handle the exception correctly. So the problem is how to deal with the race condition. The follower is not confused in this case; it just needs one more check to understand the situation.
@becketqin, the original issue https://issues.apache.org/jira/browse/KAFKA-2143 states that the replica gets ahead of the leader, but you applied changes to the branch for the case when the replica is far behind the leader.
The doc is a bit confusing and I think it is important to be clear here. Does "follower" refer to the old leader or a third replica? (i.e. if broker A is the old leader, B is the new leader, and C is a third replica, does "follower" refer to A or C?)
Yeah... I think it is perhaps not easy to explain. Actually the follower can be either A or C, because both of them can have a high watermark greater than B's log end offset.
The rationale behind this handling of offset out of range is to figure out whether the offset the follower is trying to fetch (call it OST) is too big or too small. The way we check this is to query the log end offset (LEO) on the leader: if OST is greater than LEO it is too big, otherwise it is too small.
However, some time passes between the follower receiving the OffsetOutOfRange exception and starting to handle it. When the follower first got the exception, OST might have been greater than the leader's LEO. But by the time the follower handles the exception and queries the LEO again, OST may already be smaller than the leader's LEO, because the leader may have appended new messages in the meantime, so the offset is no longer too big. In this case, the current code concludes that OST was too small and truncates the entire log, which is a bug.
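The race can be sketched with a minimal, hypothetical model (the names `OffsetRaceSketch`, `LeaderState`, and the reset functions are illustrative, not Kafka's actual classes): the follower's fetch offset was beyond the leader's LEO when the exception was raised, but the leader appends messages before the follower handles it.

```scala
object OffsetRaceSketch {
  final case class LeaderState(logStartOffset: Long, logEndOffset: Long)

  // Naive handling: if OST is not beyond the leader's *current* LEO,
  // conclude it was too small and reset to the leader's start offset.
  def naiveResetOffset(fetchOffset: Long, leader: LeaderState): Long =
    if (fetchOffset > leader.logEndOffset) leader.logEndOffset
    else leader.logStartOffset // truncates the whole follower log

  // Fixed handling: never reset below the follower's own log end offset
  // unless the leader's start offset is genuinely ahead of it.
  def fixedResetOffset(fetchOffset: Long, followerLeo: Long, leader: LeaderState): Long =
    if (fetchOffset > leader.logEndOffset) leader.logEndOffset
    else math.max(leader.logStartOffset, followerLeo)

  def main(args: Array[String]): Unit = {
    val followerLeo = 100L // exception was raised when the leader's LEO was, say, 95
    // By the time the follower handles it, the leader has caught up past 100.
    val leaderNow = LeaderState(logStartOffset = 0L, logEndOffset = 105L)
    println(naiveResetOffset(followerLeo, leaderNow))              // 0: full truncation (the bug)
    println(fixedResetOffset(followerLeo, followerLeo, leaderNow)) // 100: follower log preserved
  }
}
```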
Can you reword the second scenario and clarify? Especially the part where "However, there is some time between the follower receives the OffsetOutOfRange exception and starts to handle it. That means when the follower got the exception first time, OST might be greater than leader's LEO. But when the follower starts to handle the exception and query the LEO again, OST is already smaller than leader's LEO - because the leader might already got some new messages appended, the offset perhaps is no longer too big."
It is non-intuitive, and understanding it is important for understanding the code and our logic.
@gwenshap Sorry for the late reply... I somehow missed the email from your follow-up comments...
I will put a more detailed sequence there, but it could be a bit verbose, and people will probably still need to read more code to understand what happened.
71f8a47 to baeb9c6 (force-push)
kafka-trunk-git-pr #446 FAILURE
LGTM.
@becketqin Before committing this fix we need to figure out one thing: whether this issue is ONLY due to an unclean election. In that case inconsistency has probably happened already, so we may allow the follower to differ from the leader. But as @junrao mentioned, this can also be triggered by other scenarios, and then we cannot allow replicas to be inconsistent.
@guozhangwang The other issue is tracked by KAFKA-2477. After that is fixed, we should no longer see the OffsetOutOfRangeException triggered by it.
…gration.
Author: Jiangjie Qin &lt;[email protected]&gt;
Reviewers: Gwen Shapira, Guozhang Wang
Closes #129 from becketqin/KAFKA-2143
(cherry picked from commit 87eccb9)
Signed-off-by: Guozhang Wang &lt;[email protected]&gt;
Merged to trunk and 0.9.0. |
@becketqin I think there is a problem with the comment. According to the logic of your code, fetchOffset should be the max of leaderStartOffset and replica.logEndOffset, but the comment says that fetchOffset is leaderStartOffset. Here is a rough suggestion; please consider adopting it.
Current:

```scala
val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
  brokerConfig.brokerId)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
  .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
// Only truncate log when current leader's log start offset is greater than follower's log end offset.
if (leaderStartOffset > replica.logEndOffset.messageOffset)
  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
offsetToFetch
```

Suggested:

```scala
val leaderStartOffset: Long = earliestOrLatestOffset(topicAndPartition, ListOffsetRequest.EARLIEST_TIMESTAMP,
  brokerConfig.brokerId)
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
warn("Replica %d for partition %s reset its fetch offset from %d to %d, which is the max of current leader %d's start offset %d and replica logEndOffset %d"
  .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, offsetToFetch, sourceBroker.id, leaderStartOffset, replica.logEndOffset.messageOffset))
// Only truncate the log when the current leader's log start offset is greater than the follower's log end offset.
if (leaderStartOffset > replica.logEndOffset.messageOffset)
  replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
offsetToFetch
```
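Taken out of the fetcher-thread context, the suggested handling boils down to a small pure function. This is a sketch only; `OffsetResetSketch` and `resetFetchOffset` are illustrative names, not the actual ReplicaFetcherThread API.

```scala
object OffsetResetSketch {
  // Returns (offset to fetch next, whether the local log must be fully truncated).
  // The log is truncated only when the leader's start offset is ahead of the
  // follower's log end offset; otherwise the follower keeps its log and resumes
  // fetching from its own LEO.
  def resetFetchOffset(leaderStartOffset: Long, followerLogEndOffset: Long): (Long, Boolean) = {
    val offsetToFetch = math.max(leaderStartOffset, followerLogEndOffset)
    val truncate = leaderStartOffset > followerLogEndOffset
    (offsetToFetch, truncate)
  }

  def main(args: Array[String]): Unit = {
    println(resetFetchOffset(50L, 100L))  // (100,false): follower log preserved
    println(resetFetchOffset(120L, 100L)) // (120,true): follower truncates and restarts at 120
  }
}
```

The pair makes the invariant explicit: truncation and the choice of fetch offset are driven by the same comparison, so the log can never be truncated while the fetch offset stays at the follower's own LEO.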
No description provided.