MINOR: Fix Deadlock in StreamThread #2791
original-brownbear wants to merge 2 commits into apache:trunk from
|
Refer to this link for build results (access rights to CI server needed): |
|
@hachikuji @ijuma fyi :) We def. have an issue here in my opinion (bottom of description has a straightforward reproducer), let me know what you think about the solution :) |
|
Thanks for the PR, cc @dguy @guozhangwang |
mjsax left a comment:
Very nice finding @original-brownbear! Just some code style nitpicks. Overall LGTM.
    if (stateListener != null) {
        stateListener.onChange(state, oldState);
    private void setState(final State newState) {
        synchronized (this.stateLock) {

    synchronized (this.stateLock) {
        final State oldState = state;
        if (!state.isValidTransition(newState)) {
            log.warn(

    } catch (RuntimeException t) {
        log.error("{} Failed while executing {} {} due to {}: ",
            StreamThread.this.logPrefix,
            this.logPrefix,
|
@mjsax thanks for taking a look! Fixed the code style issues :) |
|
Since @guozhangwang is away on holiday and this is an important fix (since it fixes one cause of hung Jenkins builds), I'll review and merge it after we get a second opinion from someone familiar with the Streams code, cc @enothereska @dguy. |
|
LGTM thanks. |
    private class StreamStateListener implements StreamThread.StateListener {
    private final class StreamStateListener implements StreamThread.StateListener {
        @Override
        public synchronized void onChange(final StreamThread thread,
@dguy @enothereska This synchronized here seems suspicious. Is it really the aim to synchronize on the listener instance when updating variables like threadState? Seems like a bug.
It looks like there should only be a single StreamStateListener and threadState should be a member.
Thanks @dguy. That makes sense. @original-brownbear, maybe you can do a follow-up that does that.
@ijuma put it on my todo list, will get to this in about 12h :)
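A minimal sketch of the follow-up suggested in this thread, under stated assumptions: `synchronized` on the listener only protects shared state if every thread reports to the same listener instance and the tracked state is a member of that instance. The class `ThreadStateTracker`, its enum, and its methods are hypothetical names for illustration, not the actual Kafka Streams code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one shared listener instance that owns the per-thread state.
final class ThreadStateTracker {

    enum ThreadState { RUNNING, NOT_RUNNING }

    // Member of the single shared instance, so the instance monitor guards it.
    private final Map<Long, ThreadState> threadStates = new HashMap<>();

    // All worker threads call this on the same instance, so the monitor taken
    // by `synchronized` is the same one for every update.
    synchronized void onChange(final long threadId, final ThreadState newState) {
        threadStates.put(threadId, newState);
    }

    // Reads the map under the same monitor as the writes above.
    synchronized boolean allRunning() {
        if (threadStates.isEmpty()) {
            return false;
        }
        for (final ThreadState s : threadStates.values()) {
            if (s != ThreadState.RUNNING) {
                return false;
            }
        }
        return true;
    }
}
```

If each thread instead got its own listener instance, each `synchronized` call would lock a different monitor and would not protect any state shared between them.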
|
One more thing: please keep in mind that the PR description becomes the commit message. It's good to aim for a clear and concise description of the issue. |
|
@ijuma got it, will be kept in mind for the next PR :) |
I think this may be the reason (or one of the reasons) we see Jenkins jobs time out at times. At least, I can reproduce it causing tests to time out at a certain rate.
With current trunk there is a possibility to run into this:
In a nutshell:
KafkaStreams and StreamThread are both waiting for each other, since another intermittent close (e.g. from a test) comes along that is also trying to lock on KafkaStreams, causing a deadlock.
Fixed this with softer locking on the state change, which guarantees atomic changes to the state but does not lock on the whole object (at least, I could not find another method that would require more than atomically locked access, except for setState).
Also qualified the state listeners with their outer class to make the whole code flow around this more readable (having two interfaces with the same name for both interface and method, and then using them between their two outer classes, is crazy hard to follow imo :)).
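The "softer locking" idea can be illustrated with a small sketch, assuming a dedicated lock object that guards only the state field. The class `StateHolder`, its states, and its listener interface are hypothetical and simplified; this is not the code from the PR.

```java
// Hypothetical sketch: state changes are guarded by a private lock object
// instead of the monitor of the whole instance.
final class StateHolder {

    enum State { CREATED, RUNNING, NOT_RUNNING }

    interface StateListener {
        void onChange(State newState, State oldState);
    }

    // Dedicated monitor: a thread holding the monitor of the StateHolder
    // instance itself does not block state changes.
    private final Object stateLock = new Object();
    private State state = State.CREATED;
    private final StateListener listener;

    StateHolder(final StateListener listener) {
        this.listener = listener;
    }

    // Returns false if the transition is rejected (NOT_RUNNING is terminal
    // in this sketch).
    boolean setState(final State newState) {
        synchronized (stateLock) {
            final State oldState = state;
            if (oldState == State.NOT_RUNNING) {
                return false;
            }
            state = newState;
            if (listener != null) {
                // Sketch choice: notify under stateLock so notifications
                // arrive in transition order.
                listener.onChange(newState, oldState);
            }
            return true;
        }
    }

    State state() {
        synchronized (stateLock) {
            return state;
        }
    }
}
```

With this shape, a caller that holds the instance monitor (for example inside a `synchronized` close-style method) while waiting for another thread does not prevent that thread from calling `setState`.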
Easy to reproduce yourself by running org.apache.kafka.streams.KafkaStreamsTest in a loop for a bit (save yourself some time by running 2-4 in parallel :)). Eventually it will lock up on one of the tests (for me this takes less than 1 min with 4 parallel runs).