Skip to content

KAFKA-2718: Prevent temp directory being reused in parallel test runs#583

Closed
rajinisivaram wants to merge 3 commits intoapache:trunkfrom
rajinisivaram:KAFKA-2718-v2
Closed

KAFKA-2718: Prevent temp directory being reused in parallel test runs#583
rajinisivaram wants to merge 3 commits intoapache:trunkfrom
rajinisivaram:KAFKA-2718-v2

Conversation

@rajinisivaram
Copy link
Contributor

Use Files.createTempDirectory to avoid reuse, for log directories create a new temp directory as parent

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keep it 1000 to be on the safer side?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@guozhangwang
Copy link
Contributor

@rajinisivaram Does Files.createTempDirectory guarantee atomic uniqueness? I cannot find it in the Java docs though it claims createTempFile does.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is relying on a convention, maybe worth keeping the shutdown hook.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@rajinisivaram
Copy link
Contributor Author

@guozhangwang @ijuma Thank you for reviewing the changes. Files.createTempDirectory uses Files.createDirectory in a loop under the covers. And the docs for Files.createDirectory says:

Creates a new and empty file, failing if the file already exists. The check for the existence of the file and the creation of the new file if it does not exist are a single operation that is atomic with respect to all other filesystem activities that might affect the directory.

@guozhangwang
Copy link
Contributor

@rajinisivaram @ijuma I am a bit concerned about having more than one concurrent shutdown hook thread trying to delete the file since File.delete() may not be atomic.

@ijuma
Copy link
Member

ijuma commented Nov 25, 2015

@guozhangwang @rajinisivaram Maybe we could rename tempPartitionLogDir to randomPartitionLogDir, remove the shutdown hook and document that deletion must be done by the caller.

@rajinisivaram
Copy link
Contributor Author

@guozhangwang @ijuma Have made the change suggested by @ijuma above.

@guozhangwang
Copy link
Contributor

LGTM.

@asfgit asfgit closed this in 8ed271b Nov 25, 2015
asfgit pushed a commit that referenced this pull request Nov 25, 2015
Use Files.createTempDirectory to avoid reuse, for log directories create a new temp directory as parent

Author: Rajini Sivaram <[email protected]>

Reviewers: Ismael Juma, Guozhang Wang

Closes #583 from rajinisivaram/KAFKA-2718-v2

(cherry picked from commit 8ed271b)
Signed-off-by: Guozhang Wang <[email protected]>
asfgit pushed a commit that referenced this pull request Apr 3, 2017
This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.

With current trunk there is a possibility to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

```

In a nutshell: `KafkaStreams` and `StreamThread` are both
waiting for each other since another intermittent `close`
(eg. from a test) comes along also trying to lock on
`KafkaStreams` :

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.

Fixed this by softer locking on the state change, that guarantees
atomic changes to the state but does not lock on the whole object
(I at least could not find another method that would require more
than atomicly-locked access except for `setState`).

Also qualified the state listeners with their outer-class to make
the whole code-flow around this more readable (having two
interfaces with the same naming for interface and method and then
using them between their two outer classes is crazy hard to read
imo :)).

Easy to reproduced yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will lock on one of the tests (for me this takes less than 1 min
with 4 parallel runs).

Author: Armin Braun <[email protected]>
Author: Armin <[email protected]>

Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>, Ismael Juma <[email protected]>

Closes #2791 from original-brownbear/fix-streams-deadlock
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants