Skip to content

MINOR: Improve broker id documentation#585

Closed
granthenke wants to merge 1 commit intoapache:trunkfrom
granthenke:brokerid
Closed

MINOR: Improve broker id documentation#585
granthenke wants to merge 1 commit intoapache:trunkfrom
granthenke:brokerid

Conversation

@granthenke
Copy link
Member

No description provided.

@ijuma
Copy link
Member

ijuma commented Nov 25, 2015

LGTM

1 similar comment
@gwenshap
Copy link
Contributor

LGTM

@asfgit asfgit closed this in abf6b2e Nov 26, 2015
@granthenke granthenke deleted the brokerid branch January 19, 2016 04:36
asfgit pushed a commit that referenced this pull request Apr 3, 2017
This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.

With current trunk there is a possibility to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

```

In a nutshell: `KafkaStreams` and `StreamThread` are both
waiting for each other since another intermittent `close`
(eg. from a test) comes along also trying to lock on
`KafkaStreams` :

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.

Fixed this by softer locking on the state change, that guarantees
atomic changes to the state but does not lock on the whole object
(I at least could not find another method that would require more
than atomicly-locked access except for `setState`).

Also qualified the state listeners with their outer-class to make
the whole code-flow around this more readable (having two
interfaces with the same naming for interface and method and then
using them between their two outer classes is crazy hard to read
imo :)).

Easy to reproduced yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will lock on one of the tests (for me this takes less than 1 min
with 4 parallel runs).

Author: Armin Braun <[email protected]>
Author: Armin <[email protected]>

Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]>

Closes #2791 from original-brownbear/fix-streams-deadlock
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants