Skip to content

Commit c83c7b4

Browse files
committed
FIX: pending reads no longer block socket writes.
1 parent 4f98bc5 commit c83c7b4

6 files changed

Lines changed: 51 additions & 155 deletions

File tree

build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
group 'com.softwareverde'
2-
version '2.2.0'
2+
version '2.2.1'
33

44
apply plugin: 'java'
55
apply plugin: 'java-library'

src/main/java/com/softwareverde/http/HttpRequestExecutionThread.java

Lines changed: 31 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,37 @@ class HttpRequestExecutionThread extends Thread {
2727
protected final Integer _redirectCount;
2828
protected HttpURLConnection _connection;
2929

30+
protected Socket _extractConnectionSocket() {
31+
Object httpConnectionHolder = null;
32+
try {
33+
httpConnectionHolder = ((_connection instanceof HttpsURLConnection) ? ReflectionUtil.getValue(_connection, "delegate") : _connection);
34+
final Object httpClient = ReflectionUtil.getValue(httpConnectionHolder, "http");
35+
return ReflectionUtil.getValue(httpClient, "serverSocket");
36+
}
37+
catch (final Exception exception1) {
38+
if (httpConnectionHolder == null) {
39+
throw new RuntimeException("Unable to obtain connection socket via reflection", exception1);
40+
}
41+
try {
42+
// unable to get standard http server socket, check for OkHttp implementation
43+
final Object httpEngine = ReflectionUtil.getValue(httpConnectionHolder, "httpEngine");
44+
final Object streamAllocation = ReflectionUtil.getValue(httpEngine, "streamAllocation");
45+
final Object realConnection = ReflectionUtil.getValue(streamAllocation, "connection");
46+
return (Socket) ReflectionUtil.getValue(realConnection, "socket");
47+
}
48+
catch (final Exception exception2) {
49+
Logger.debug("Unable to get connection socket (1/2)", exception1);
50+
Logger.debug("Unable to get connection socket (2/2)", exception2);
51+
throw new RuntimeException("Unable to obtain connection socket via reflection");
52+
}
53+
}
54+
}
55+
56+
protected void _configureRequestForWebSocketUpgrade() {
57+
_httpRequest.setAllowWebSocketUpgrade(true);
58+
_httpRequest.setHeader("Upgrade", "websocket");
59+
}
60+
3061
public HttpRequestExecutionThread(final String httpRequestUrl, final HttpRequest httpRequest, final HttpRequest.Callback callback, final Integer redirectCount) {
3162
_httpRequestUrl = httpRequestUrl;
3263
_httpRequest = httpRequest;
@@ -194,37 +225,6 @@ public void run() {
194225
}
195226
}
196227

197-
private Socket _extractConnectionSocket() {
198-
Object httpConnectionHolder = null;
199-
try {
200-
httpConnectionHolder = ((_connection instanceof HttpsURLConnection) ? ReflectionUtil.getValue(_connection, "delegate") : _connection);
201-
final Object httpClient = ReflectionUtil.getValue(httpConnectionHolder, "http");
202-
return ReflectionUtil.getValue(httpClient, "serverSocket");
203-
}
204-
catch (final Exception exception1) {
205-
if (httpConnectionHolder == null) {
206-
throw new RuntimeException("Unable to obtain connection socket via reflection", exception1);
207-
}
208-
try {
209-
// unable to get standard http server socket, check for OkHttp implementation
210-
final Object httpEngine = ReflectionUtil.getValue(httpConnectionHolder, "httpEngine");
211-
final Object streamAllocation = ReflectionUtil.getValue(httpEngine, "streamAllocation");
212-
final Object realConnection = ReflectionUtil.getValue(streamAllocation, "connection");
213-
return (Socket) ReflectionUtil.getValue(realConnection, "socket");
214-
}
215-
catch (final Exception exception2) {
216-
Logger.debug("Unable to get connection socket (1/2)", exception1);
217-
Logger.debug("Unable to get connection socket (2/2)", exception2);
218-
throw new RuntimeException("Unable to obtain connection socket via reflection");
219-
}
220-
}
221-
}
222-
223-
private void _configureRequestForWebSocketUpgrade() {
224-
_httpRequest.setAllowWebSocketUpgrade(true);
225-
_httpRequest.setHeader("Upgrade", "websocket");
226-
}
227-
228228
public void cancel() {
229229
final HttpURLConnection connection = _connection;
230230
if (connection == null) { return; }

src/main/java/com/softwareverde/http/websocket/SocketStreams.java

Lines changed: 0 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -64,26 +64,11 @@ public void shutdownOutput() {
6464
_shutdown();
6565
}
6666

67-
// @Override
68-
// public boolean isOutputShutdown() {
69-
// return _isShutdown.get();
70-
// }
71-
72-
// @Override
73-
// public void shutdownInput() throws IOException {
74-
// _shutdown();
75-
// }
76-
7767
@Override
7868
public boolean isInputShutdown() {
7969
return _isShutdown.get();
8070
}
8171

82-
// @Override
83-
// public void close() throws IOException {
84-
// _shutdown();
85-
// }
86-
8772
@Override
8873
public int fill(final Buffer buffer) {
8974

@@ -115,78 +100,8 @@ public int flush(final Buffer buffer) throws IOException {
115100
}
116101
}
117102

118-
// @Override
119-
// public int flush(final Buffer header, final Buffer buffer, final Buffer trailer) throws IOException {
120-
// return (this.flush(header) + this.flush(buffer) + this.flush(trailer));
121-
// }
122-
123-
// @Override
124-
// public String getLocalAddr() {
125-
// return _socket.getLocalAddress().getHostAddress();
126-
// }
127-
128-
// @Override
129-
// public String getLocalHost() {
130-
// return _socket.getLocalAddress().getHostName();
131-
// }
132-
133-
// @Override
134-
// public int getLocalPort() {
135-
// return _socket.getLocalPort();
136-
// }
137-
138-
// @Override
139-
// public String getRemoteAddr() {
140-
// return _socket.getInetAddress().getHostAddress();
141-
// }
142-
143-
// @Override
144-
// public String getRemoteHost() {
145-
// return _socket.getInetAddress().getHostName();
146-
// }
147-
148-
// @Override
149-
// public int getRemotePort() {
150-
// return _socket.getPort();
151-
// }
152-
153-
@Override
154-
public boolean isBlocking() {
155-
return true;
156-
}
157-
158-
// @Override
159-
// public boolean blockReadable(final long millisecs) {
160-
// return true;
161-
// }
162-
163-
@Override
164-
public boolean blockWritable(final long millisecs) {
165-
return true;
166-
}
167-
168103
@Override
169104
public boolean isOpen() {
170105
return (! _isShutdown.get());
171106
}
172-
173-
// @Override
174-
// public Object getTransport() {
175-
// return null;
176-
// }
177-
178-
// @Override
179-
// public void flush() throws IOException {
180-
// _outputStream.flush();
181-
// }
182-
183-
@Override
184-
public int getMaxIdleTime() {
185-
return Integer.MAX_VALUE;
186-
}
187-
188-
@Override
189-
public void setMaxIdleTime(final int timeMs) throws IOException {
190-
_socket.setSoTimeout((timeMs < 0 ? 0 : timeMs));
191-
}
192107
}

src/main/java/com/softwareverde/http/websocket/WebSocket.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,16 @@ public Long getId() {
155155
return _webSocketId;
156156
}
157157

158+
public void setSocketTimeout(final Integer socketTimeoutMs) {
159+
final Socket socket = _connectionLayer.getSocket();
160+
try {
161+
socket.setSoTimeout(socketTimeoutMs);
162+
}
163+
catch (final Exception exception) {
164+
// Nothing.
165+
}
166+
}
167+
158168
public void startListening() {
159169
try { // Disable SO_TIMEOUT to remove read-lag since reads are not being ignored.
160170
final Socket socket = _connectionLayer.getSocket();
@@ -165,7 +175,9 @@ public void startListening() {
165175
socket.setSoTimeout(0);
166176
}
167177
}
168-
catch (final Exception exception) { }
178+
catch (final Exception exception) {
179+
// Nothing.
180+
}
169181

170182
_webSocketReader.start();
171183
}

src/main/java/org/eclipse/jetty/io/EndPoint.java

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
//
1818
// Modifications:
1919
// 2019 - Software Verde, LLC
20-
// Removed all unused methods.
20+
// Removed unused methods.
21+
// 2020 - Software Verde, LLC
22+
// Removed unused methods.
2123
//
2224

2325
package org.eclipse.jetty.io;
@@ -61,31 +63,7 @@ public interface EndPoint
6163
*/
6264
int flush(Buffer buffer) throws IOException;
6365

64-
/* ------------------------------------------------------------ */
65-
public boolean isBlocking();
66-
67-
/* ------------------------------------------------------------ */
68-
public boolean blockWritable(long millisecs) throws IOException;
69-
7066
/* ------------------------------------------------------------ */
7167
public boolean isOpen();
7268

73-
/* ------------------------------------------------------------ */
74-
/** Get the max idle time in ms.
75-
* <p>The max idle time is the time the endpoint can be idle before
76-
* extraordinary handling takes place. This loosely corresponds to
77-
* the {@link java.net.Socket#getSoTimeout()} for blocking connections,
78-
* but {@link AsyncEndPoint} implementations must use other mechanisms
79-
* to implement the max idle time.
80-
* @return the max idle time in ms or if ms <= 0 implies an infinite timeout
81-
*/
82-
public int getMaxIdleTime();
83-
84-
/* ------------------------------------------------------------ */
85-
/** Set the max idle time.
86-
* @param timeMs the max idle time in MS. Timeout <= 0 implies an infinite timeout
87-
* @throws IOException if the timeout cannot be set.
88-
*/
89-
public void setMaxIdleTime(int timeMs) throws IOException;
90-
9169
}

src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorRFC6455.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@
1515
// You may elect to redistribute this code under either of these licenses.
1616
// ========================================================================
1717
//
18+
// 2020 - Software Verde, LLC
19+
// FIX: pending reads no longer block socket writes.
20+
//
1821

1922
package org.eclipse.jetty.websocket;
2023

@@ -239,21 +242,9 @@ public int flush() throws IOException
239242
return 0;
240243

241244
int result = flushBuffer();
242-
if (!_endp.isBlocking())
243245
{
244-
long now = System.currentTimeMillis();
245-
long end = now + _endp.getMaxIdleTime();
246246
while (_buffer.length() > 0)
247247
{
248-
boolean ready = _endp.blockWritable(end - now);
249-
if (!ready)
250-
{
251-
now = System.currentTimeMillis();
252-
if (now < end)
253-
continue;
254-
throw new IOException("Write timeout");
255-
}
256-
257248
result += flushBuffer();
258249
}
259250
}

0 commit comments

Comments
 (0)