Skip to content

Commit c1478fa

Browse files
committed
Handle canceled keys in UDP client
#129
1 parent 75fcff1 commit c1478fa

7 files changed

Lines changed: 197 additions & 35 deletions

File tree

pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -299,6 +299,10 @@
299299
<artifactId>maven-surefire-plugin</artifactId>
300300
<version>3.5.4</version>
301301
<configuration>
302+
<argLine>
303+
@{argLine}
304+
-Djava.util.logging.config.file=${project.build.testOutputDirectory}/logging.properties
305+
</argLine>
302306
<rerunFailingTestsCount>3</rerunFailingTestsCount>
303307
<redirectTestOutputToFile>true</redirectTestOutputToFile>
304308
<includes>
@@ -655,6 +659,12 @@
655659
<version>${slf4j.version}</version>
656660
<scope>test</scope>
657661
</dependency>
662+
<dependency>
663+
<groupId>org.slf4j</groupId>
664+
<artifactId>jul-to-slf4j</artifactId>
665+
<version>${slf4j.version}</version>
666+
<scope>test</scope>
667+
</dependency>
658668
<dependency>
659669
<groupId>io.vertx</groupId>
660670
<artifactId>vertx-core</artifactId>
@@ -852,6 +862,7 @@
852862
<configuration>
853863
<argLine>
854864
@{argLine}
865+
-Djava.util.logging.config.file=${project.build.testOutputDirectory}/logging.properties
855866
--add-opens java.base/sun.net.dns=ALL-UNNAMED
856867
</argLine>
857868

@@ -981,6 +992,7 @@
981992
<configuration>
982993
<argLine>
983994
@{argLine}
995+
-Djava.util.logging.config.file=${project.build.testOutputDirectory}/logging.properties
984996
--enable-native-access=ALL-UNNAMED
985997
--add-opens java.base/sun.net.dns=ALL-UNNAMED
986998
-javaagent:${net.bytebuddy:byte-buddy-agent:jar}

src/main/java/org/xbill/DNS/NioClient.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,11 @@ public abstract class NioClient {
4444
private static PacketLogger packetLogger = null;
4545

4646
private static final Runnable[] TIMEOUT_TASKS = new Runnable[2];
47+
private static final Runnable[] CLOSE_TASKS = new Runnable[2];
4748

48-
private static Consumer<Selector> TCP_REGISTRATIONS_TASK;
49-
private static Consumer<Selector> UDP_REGISTRATIONS_TASK;
49+
private static Consumer<Selector> tcpRegistrationsTask;
50+
private static Consumer<Selector> udpRegistrationsTask;
5051

51-
private static final Runnable[] CLOSE_TASKS = new Runnable[2];
5252
private static Thread selectorThread;
5353
private static Thread closeThread;
5454
private static volatile Selector selector;
@@ -206,9 +206,9 @@ static void setTimeoutTask(Runnable r, boolean isTcpClient) {
206206

207207
static void setRegistrationsTask(Consumer<Selector> r, boolean isTcpClient) {
208208
if (isTcpClient) {
209-
TCP_REGISTRATIONS_TASK = r;
209+
tcpRegistrationsTask = r;
210210
} else {
211-
UDP_REGISTRATIONS_TASK = r;
211+
udpRegistrationsTask = r;
212212
}
213213
}
214214

@@ -236,11 +236,11 @@ private static void runTasks(Runnable[] runnables) {
236236
}
237237

238238
private static void runRegistrationTasks() {
239-
Consumer<Selector> tcpTask = TCP_REGISTRATIONS_TASK;
239+
Consumer<Selector> tcpTask = tcpRegistrationsTask;
240240
if (tcpTask != null) {
241241
tcpTask.accept(selector);
242242
}
243-
Consumer<Selector> udpTask = UDP_REGISTRATIONS_TASK;
243+
Consumer<Selector> udpTask = udpRegistrationsTask;
244244
if (udpTask != null) {
245245
udpTask.accept(selector);
246246
}

src/main/java/org/xbill/DNS/NioTcpClient.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ private void closeTcp() {
7474
}
7575

7676
@RequiredArgsConstructor
77-
private static class Transaction {
77+
private static final class Transaction {
7878
private final Message query;
7979
private final byte[] queryData;
8080
private final long endTime;
@@ -148,9 +148,11 @@ public void processReadyKey(SelectionKey key) {
148148
processWrite(key);
149149
}
150150
if (key.isReadable()) {
151-
processRead();
151+
processRead(key);
152152
}
153153
}
154+
} else {
155+
handleTransactionException(new EOFException("Invalid key"));
154156
}
155157
}
156158

@@ -187,15 +189,17 @@ private void processConnect(SelectionKey key) {
187189
key.interestOps(SelectionKey.OP_WRITE);
188190
} catch (IOException e) {
189191
handleChannelException(e);
192+
key.cancel();
190193
}
191194
}
192195

193-
private void processRead() {
196+
private void processRead(SelectionKey key) {
194197
try {
195198
if (readState == 0) {
196199
int read = channel.read(responseLengthData);
197200
if (read < 0) {
198201
handleChannelException(new EOFException());
202+
key.cancel();
199203
return;
200204
}
201205

@@ -211,12 +215,14 @@ private void processRead() {
211215
int read = channel.read(responseData);
212216
if (read < 0) {
213217
handleChannelException(new EOFException());
218+
key.cancel();
214219
return;
215220
} else if (responseData.hasRemaining()) {
216221
return;
217222
}
218223
} catch (IOException e) {
219224
handleChannelException(e);
225+
key.cancel();
220226
return;
221227
}
222228

@@ -269,6 +275,7 @@ private void processWrite(SelectionKey key) {
269275
} catch (IOException e) {
270276
t.f.completeExceptionally(e);
271277
it.remove();
278+
key.cancel();
272279
}
273280
}
274281

src/main/java/org/xbill/DNS/NioUdpClient.java

Lines changed: 41 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import java.net.SocketTimeoutException;
99
import java.nio.ByteBuffer;
1010
import java.nio.channels.DatagramChannel;
11+
import java.nio.channels.NotYetConnectedException;
1112
import java.nio.channels.SelectionKey;
1213
import java.nio.channels.Selector;
1314
import java.security.SecureRandom;
@@ -82,7 +83,7 @@ private void checkTransactionTimeouts() {
8283
}
8384

8485
@RequiredArgsConstructor
85-
private class Transaction implements KeyProcessor {
86+
private final class Transaction implements KeyProcessor {
8687
private final int id;
8788
private final byte[] data;
8889
private final int max;
@@ -109,9 +110,16 @@ void send() throws IOException {
109110

110111
@Override
111112
public void processReadyKey(SelectionKey key) {
113+
if (!key.isValid()) {
114+
completeExceptionally(new EOFException("Key for transaction " + id + " is invalid"));
115+
pendingTransactions.remove(this);
116+
return;
117+
}
118+
112119
if (!key.isReadable()) {
113120
completeExceptionally(new EOFException("Key for transaction " + id + " is not readable"));
114121
pendingTransactions.remove(this);
122+
key.cancel();
115123
return;
116124
}
117125

@@ -121,11 +129,12 @@ public void processReadyKey(SelectionKey key) {
121129
try {
122130
read = keyChannel.read(buffer);
123131
if (read <= 0) {
124-
throw new EOFException();
132+
throw new EOFException("Could not read expected data for transaction " + id);
125133
}
126-
} catch (IOException e) {
134+
} catch (IOException | NotYetConnectedException e) {
127135
completeExceptionally(e);
128136
pendingTransactions.remove(this);
137+
key.cancel();
129138
return;
130139
}
131140

@@ -137,6 +146,7 @@ public void processReadyKey(SelectionKey key) {
137146
keyChannel.socket().getLocalSocketAddress(),
138147
keyChannel.socket().getRemoteSocketAddress(),
139148
resultingData);
149+
key.cancel();
140150
silentDisconnectAndCloseChannel();
141151
f.complete(resultingData);
142152
pendingTransactions.remove(this);
@@ -177,28 +187,8 @@ public CompletableFuture<byte[]> sendAndReceiveUdp(
177187
Transaction t = new Transaction(query.getHeader().getID(), data, max, endTime, channel, f);
178188
if (local == null || local.getPort() == 0) {
179189
boolean bound = false;
180-
for (int i = 0; i < 1024; i++) {
181-
try {
182-
InetSocketAddress addr = null;
183-
if (local == null) {
184-
if (prng != null) {
185-
addr = new InetSocketAddress(prng.nextInt(ephemeralRange) + ephemeralStart);
186-
}
187-
} else {
188-
int port = local.getPort();
189-
if (port == 0 && prng != null) {
190-
port = prng.nextInt(ephemeralRange) + ephemeralStart;
191-
}
192-
193-
addr = new InetSocketAddress(local.getAddress(), port);
194-
}
195-
196-
channel.bind(addr);
197-
bound = true;
198-
break;
199-
} catch (SocketException e) {
200-
// ignore, we'll try another random port
201-
}
190+
for (int i = 0; i < 1024 && !bound; i++) {
191+
bound = tryBindToSocket(local, channel);
202192
}
203193

204194
if (!bound) {
@@ -223,6 +213,32 @@ public CompletableFuture<byte[]> sendAndReceiveUdp(
223213
return f;
224214
}
225215

216+
private boolean tryBindToSocket(InetSocketAddress local, DatagramChannel channel)
217+
throws IOException {
218+
try {
219+
InetSocketAddress address = null;
220+
if (local == null) {
221+
if (prng != null) {
222+
address = new InetSocketAddress(prng.nextInt(ephemeralRange) + ephemeralStart);
223+
}
224+
} else {
225+
int port = local.getPort();
226+
if (port == 0 && prng != null) {
227+
port = prng.nextInt(ephemeralRange) + ephemeralStart;
228+
}
229+
230+
address = new InetSocketAddress(local.getAddress(), port);
231+
}
232+
233+
channel.bind(address);
234+
return true;
235+
} catch (SocketException e) {
236+
// ignore, we'll try another random port
237+
}
238+
239+
return false;
240+
}
241+
226242
private static void silentCloseChannel(DatagramChannel channel) {
227243
if (channel != null) {
228244
try {
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// SPDX-License-Identifier: BSD-3-Clause
2+
package org.xbill.DNS;
3+
4+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
5+
import static org.mockito.ArgumentMatchers.any;
6+
import static org.mockito.Mockito.doReturn;
7+
import static org.mockito.Mockito.spy;
8+
import static org.mockito.Mockito.when;
9+
10+
import io.vertx.core.Vertx;
11+
import io.vertx.core.datagram.DatagramSocket;
12+
import io.vertx.core.net.SocketAddress;
13+
import io.vertx.junit5.VertxExtension;
14+
import io.vertx.junit5.VertxTestContext;
15+
import java.io.EOFException;
16+
import java.io.IOException;
17+
import java.net.InetSocketAddress;
18+
import java.nio.ByteBuffer;
19+
import java.nio.channels.DatagramChannel;
20+
import java.nio.channels.SelectionKey;
21+
import java.nio.channels.Selector;
22+
import java.time.Duration;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
import java.util.concurrent.CompletableFuture;
26+
import org.junit.jupiter.api.AfterAll;
27+
import org.junit.jupiter.api.BeforeAll;
28+
import org.junit.jupiter.api.Test;
29+
import org.junit.jupiter.api.extension.ExtendWith;
30+
import org.mockito.MockedStatic;
31+
import org.mockito.Mockito;
32+
33+
@ExtendWith(VertxExtension.class)
34+
@SuppressWarnings("unchecked")
35+
class NioUdpClientTest {
36+
private static SocketAddress localAddress;
37+
38+
@BeforeAll
39+
static void beforeAll(Vertx vertx, VertxTestContext context) {
40+
DatagramSocket datagramSocket = vertx.createDatagramSocket();
41+
datagramSocket.handler(
42+
p -> datagramSocket.send(p.data(), p.sender().port(), p.sender().host()));
43+
datagramSocket
44+
.listen(0, "localhost")
45+
.map(
46+
s -> {
47+
localAddress = s.localAddress();
48+
return null;
49+
})
50+
.onComplete(context.succeedingThenComplete());
51+
}
52+
53+
@AfterAll
54+
static void afterAll() {
55+
NioClient.close();
56+
}
57+
58+
private CompletableFuture<byte[]> createAndSendQuery() {
59+
NioUdpClient udp = new NioUdpClient();
60+
Message query = Message.newQuery(Record.newRecord(Name.root, Type.A, DClass.IN));
61+
return udp.sendAndReceiveUdp(
62+
null,
63+
new InetSocketAddress(localAddress.hostAddress(), localAddress.port()),
64+
query,
65+
query.toWire(),
66+
65535,
67+
Duration.ofSeconds(10));
68+
}
69+
70+
@Test
71+
void selectorWithAllCanceledKey() throws IOException {
72+
Selector spiedSelector = spy(Selector.open());
73+
when(spiedSelector.selectedKeys())
74+
.thenAnswer(
75+
a -> {
76+
Set<SelectionKey> keys = (Set<SelectionKey>) a.callRealMethod();
77+
for (SelectionKey key : keys) {
78+
key.cancel();
79+
}
80+
return keys;
81+
});
82+
83+
try (MockedStatic<Selector> sel = Mockito.mockStatic(Selector.class)) {
84+
sel.when(Selector::open).thenReturn(spiedSelector);
85+
CompletableFuture<byte[]> result = createAndSendQuery();
86+
assertThatThrownBy(result::get).hasCauseInstanceOf(EOFException.class);
87+
}
88+
}
89+
90+
@Test
91+
void readFromKeyFailsFuture() throws IOException {
92+
Selector spiedSelector = spy(Selector.open());
93+
when(spiedSelector.selectedKeys())
94+
.thenAnswer(
95+
selectedKeysIntercept -> {
96+
Set<SelectionKey> keys = (Set<SelectionKey>) selectedKeysIntercept.callRealMethod();
97+
Set<SelectionKey> mockedKeys = new HashSet<>(keys.size());
98+
for (SelectionKey key : keys) {
99+
SelectionKey spy = spy(key);
100+
when(spy.channel())
101+
.thenAnswer(
102+
channelIntercept -> {
103+
DatagramChannel channel =
104+
(DatagramChannel) channelIntercept.callRealMethod();
105+
DatagramChannel spyChannel = spy(channel);
106+
doReturn(0).when(spyChannel).read(any(ByteBuffer.class));
107+
return spyChannel;
108+
});
109+
mockedKeys.add(spy);
110+
}
111+
112+
return mockedKeys;
113+
});
114+
115+
try (MockedStatic<Selector> sel = Mockito.mockStatic(Selector.class)) {
116+
sel.when(Selector::open).thenReturn(spiedSelector);
117+
CompletableFuture<byte[]> result = createAndSendQuery();
118+
assertThatThrownBy(result::get)
119+
.cause()
120+
.isInstanceOf(EOFException.class)
121+
.hasMessageStartingWith("Could not read expected data");
122+
}
123+
}
124+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
handlers=org.slf4j.bridge.SLF4JBridgeHandler

src/test/resources/simplelogger.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z
33
org.slf4j.simpleLogger.showDateTime=true
44
org.slf4j.simpleLogger.log.org.xbill.DNS.Name=info
55
org.slf4j.simpleLogger.log.org.xbill.DNS.Compression=info
6+
org.slf4j.simpleLogger.log.io.netty=info
7+
org.slf4j.simpleLogger.log.io.vertx=info

0 commit comments

Comments
 (0)