From 369a54b2359f0d3ad5bf6a7a694da15ce0689577 Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Fri, 8 Jul 2016 18:25:24 +0300 Subject: [PATCH 1/7] close() of HttpResponseInputStream is not an implementation of InputStream.close() but rather a separate method --- .../netty/handler/HttpResponseStreamHandler.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java index 706228d04..9dba7e98c 100644 --- a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java +++ b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java @@ -32,13 +32,13 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { - stream.close(); + stream.writeComplete(); super.channelReadComplete(ctx); } public static class HttpResponseInputStream extends InputStream { - private AtomicBoolean closed = new AtomicBoolean(false); + private AtomicBoolean writeCompleted = new AtomicBoolean(false); private LinkedTransferQueue queue = new LinkedTransferQueue(); @@ -48,10 +48,8 @@ public void write(ByteBuf byteBuf) { queue.put(byteBuf); } - @Override - public void close() throws IOException { - closed.set(true); - super.close(); + public void writeComplete() { + writeCompleted.set(true); } @Override @@ -75,7 +73,7 @@ public int read() throws IOException { poll(); if (readableBytes() == 0) { - if (closed.get()) { + if (writeCompleted.get()) { return -1; } } From 71692bb03e952ef44d3430ec16bd9f52a72a416b Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Fri, 8 Jul 2016 18:27:34 +0300 Subject: [PATCH 2/7] Fix for #624 --- .../handler/HttpResponseStreamHandler.java | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java index 9dba7e98c..f3d7d382d 100644 --- a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java +++ b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java @@ -52,6 +52,21 @@ public void writeComplete() { writeCompleted.set(true); } + @Override + public void close() throws IOException { + releaseCurrent(); + releaseQueued(); + super.close(); + } + + private void releaseQueued() { + ByteBuf byteBuf = queue.poll(); + while (byteBuf != null) { + byteBuf.release(); + byteBuf = queue.poll(); + } + } + @Override public int available() throws IOException { poll(); @@ -88,11 +103,19 @@ public int read() throws IOException { private void poll() { if (readableBytes() == 0) { try { + releaseCurrent(); current = queue.poll(50, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { throw new RuntimeException(e); } } } + + private void releaseCurrent() { + if (current != null) { + current.release(); + current = null; + } + } } } From 9969922845d12f344af4c8a8158b3c0a8a5d1d83 Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Mon, 29 Aug 2016 15:14:31 +0300 Subject: [PATCH 3/7] Fix for #679 HttpResponseInputStream reworked not to queue incoming byte buffers. This queuing leads to the OutOfDirectMemoryError for sure when buffer elements are not processed quite fast. --- .../core/async/AsyncResultCallback.java | 104 +++++++++++++ .../dockerjava/netty/InvocationBuilder.java | 5 +- .../handler/HttpResponseStreamHandler.java | 144 ++++++++++++------ .../netty/exec/SaveImageCmdExecTest.java | 5 +- .../HttpResponseStreamHandlerTest.java | 2 +- 5 files changed, 211 insertions(+), 49 deletions(-) create mode 100644 src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java diff --git a/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java b/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java new file mode 100644 index 000000000..329982cee --- /dev/null +++ b/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java @@ -0,0 +1,104 @@ +package com.github.dockerjava.core.async; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +import com.github.dockerjava.api.async.ResultCallback; +import com.google.common.base.Throwables; + +/** + * Implementation of {@link ResultCallback} with the single result event expected. + */ +public class AsyncResultCallback implements ResultCallback { + + private A_RES_T result = null; + + private final CountDownLatch resultReady = new CountDownLatch(1); + + private Closeable stream; + + private boolean closed = false; + + private Throwable firstError = null; + + @Override + public void onStart(Closeable stream) { + this.stream = stream; + closed = false; + } + + @Override + public void onNext(A_RES_T object) { + onResult(object); + } + + private void onResult(A_RES_T object) { + if (resultReady.getCount() == 0) { + throw new IllegalStateException("Result has already been set"); + } + + try { + result = object; + } finally { + resultReady.countDown(); + } + } + + @Override + public void onError(Throwable throwable) { + if (closed) return; + + if (firstError == null) { + firstError = throwable; + } + + try { + close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onComplete() { + try { + close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws IOException { + try { + closed = true; + if (stream != null) { + stream.close(); + } + } finally { + resultReady.countDown(); + } + } + + /** + * Blocks until {@link ResultCallback#onNext(Object)} was called for the first time + */ + @SuppressWarnings("unchecked") + public A_RES_T awaitResult() { + try { + resultReady.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + rethrowFirstError(); + return result; + } + + private void rethrowFirstError() { + if (firstError != null) { + // this call throws a RuntimeException + throw Throwables.propagate(firstError); + } + } +} diff --git a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java index d76918bcb..36e041d86 100644 --- a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java +++ b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java @@ -34,6 +34,7 @@ import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.exception.DockerClientException; import com.github.dockerjava.api.model.Frame; +import com.github.dockerjava.core.async.AsyncResultCallback; import com.github.dockerjava.core.async.ResultCallbackTemplate; import com.github.dockerjava.netty.handler.FramedResponseStreamHandler; import com.github.dockerjava.netty.handler.HttpConnectionHijackHandler; @@ -203,7 +204,7 @@ public InputStream post(final Object entity) { Channel channel = getChannel(); - ResponseCallback callback = new ResponseCallback(); + AsyncResultCallback callback = new AsyncResultCallback<>(); HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, callback); HttpResponseStreamHandler streamHandler = new HttpResponseStreamHandler(callback); @@ -454,7 +455,7 @@ public InputStream get() { Channel channel = getChannel(); - ResponseCallback resultCallback = new ResponseCallback(); + AsyncResultCallback resultCallback = new AsyncResultCallback<>(); HttpResponseHandler responseHandler = new HttpResponseHandler(requestProvider, resultCallback); diff --git a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java index f3d7d382d..22035faa9 100644 --- a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java +++ b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java @@ -6,9 +6,6 @@ import java.io.IOException; import java.io.InputStream; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import com.github.dockerjava.api.async.ResultCallback; @@ -19,58 +16,87 @@ */ public class HttpResponseStreamHandler extends SimpleChannelInboundHandler { - private HttpResponseInputStream stream = new HttpResponseInputStream(); + private ResultCallback resultCallback; + + private final HttpResponseInputStream stream = new HttpResponseInputStream(); public HttpResponseStreamHandler(ResultCallback resultCallback) { - resultCallback.onNext(stream); + this.resultCallback = resultCallback; } @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { + invokeCallbackOnFirstRead(); + stream.write(msg.copy()); } + private void invokeCallbackOnFirstRead() { + if (resultCallback != null) { + resultCallback.onNext(stream); + resultCallback = null; + } + } + @Override - public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { stream.writeComplete(); - super.channelReadComplete(ctx); + + super.channelInactive(ctx); } public static class HttpResponseInputStream extends InputStream { - private AtomicBoolean writeCompleted = new AtomicBoolean(false); + private boolean writeCompleted = false; - private LinkedTransferQueue queue = new LinkedTransferQueue(); + private boolean closed = false; private ByteBuf current = null; - public void write(ByteBuf byteBuf) { - queue.put(byteBuf); + private final Object lock = new Object(); + + public void write(ByteBuf byteBuf) throws InterruptedException { + synchronized (lock) { + if (closed) { + return; + } + while (current != null) { + lock.wait(); + + if (closed) { + return; + } + } + current = byteBuf; + + lock.notifyAll(); + } } public void writeComplete() { - writeCompleted.set(true); + synchronized (lock) { + writeCompleted = true; + + lock.notifyAll(); + } } @Override public void close() throws IOException { - releaseCurrent(); - releaseQueued(); - super.close(); - } + synchronized (lock) { + closed = true; + releaseCurrent(); - private void releaseQueued() { - ByteBuf byteBuf = queue.poll(); - while (byteBuf != null) { - byteBuf.release(); - byteBuf = queue.poll(); + lock.notifyAll(); } } @Override public int available() throws IOException { - poll(); - return readableBytes(); + synchronized (lock) { + poll(0); + return readableBytes(); + } } private int readableBytes() { @@ -79,42 +105,72 @@ private int readableBytes() { } else { return 0; } - } @Override public int read() throws IOException { + byte[] b = new byte[1]; + int n = read(b, 0, 1); + return n != -1 ? b[0] : -1; + } - poll(); + @Override + public int read(byte[] b, int off, int len) throws IOException { + synchronized (lock) { + off = poll(off); - if (readableBytes() == 0) { - if (writeCompleted.get()) { - return -1; + if (closed) { + throw new IOException("Stream closed"); } - } - if (current != null && current.readableBytes() > 0) { - return current.readByte() & 0xff; - } else { - return read(); + if (current == null) { + return -1; + } else { + int availableBytes = Math.min(len, current.readableBytes() - off); + current.readBytes(b, off, availableBytes); + return availableBytes; + } } } - private void poll() { - if (readableBytes() == 0) { - try { - releaseCurrent(); - current = queue.poll(50, TimeUnit.MILLISECONDS); - } catch (InterruptedException e) { - throw new RuntimeException(e); + private int poll(int off) { + synchronized (lock) { + while (readableBytes() <= off) { + try { + off -= releaseCurrent(); + if (writeCompleted) { + return off; + } + while (current == null) { + lock.wait(); + + if (closed) { + return off; + } + if (writeCompleted && current == null) { + return off; + } + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } + return off; } } - private void releaseCurrent() { - if (current != null) { - current.release(); - current = null; + private int releaseCurrent() { + synchronized (lock) { + if (current != null) { + int n = current.readableBytes(); + current.release(); + current = null; + + lock.notifyAll(); + + return n; + } + return 0; } } } diff --git a/src/test/java/com/github/dockerjava/netty/exec/SaveImageCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/SaveImageCmdExecTest.java index 0527b793f..a2fab38a7 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/SaveImageCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/SaveImageCmdExecTest.java @@ -47,8 +47,9 @@ public void afterMethod(ITestResult result) { @Test public void saveImage() throws Exception { - InputStream image = IOUtils.toBufferedInputStream(dockerClient.saveImageCmd("busybox").exec()); - assertThat(image.available(), greaterThan(0)); + try (InputStream image = dockerClient.saveImageCmd("busybox").exec()) { + assertThat(image.available(), greaterThan(0)); + } } diff --git a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java index 6652f3eba..c32d2157a 100644 --- a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java +++ b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java @@ -25,7 +25,7 @@ public void testNoBytesSkipped() throws Exception { ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); ByteBuf buffer = generateByteBuf(); streamHandler.channelRead0(ctx, buffer); - streamHandler.channelReadComplete(ctx); + streamHandler.channelInactive(ctx); assertTrue(IOUtils.contentEquals(callback.getInputStream(), new ByteBufInputStream(buffer))); } From 7a672dddd6b04ff9d4bc5e13920dc286d8ba6f62 Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Tue, 30 Aug 2016 11:41:45 +0300 Subject: [PATCH 4/7] HttpResponseInputStream.read() method covered --- .../HttpResponseStreamHandlerTest.java | 30 ++++++++++++++++--- 1 file changed, 26 insertions(+), 4 deletions(-) diff --git a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java index c32d2157a..f55d6df54 100644 --- a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java +++ b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java @@ -1,13 +1,15 @@ package com.github.dockerjava.netty.handler; +import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; -import java.io.InputStream; - import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; + +import java.io.InputStream; + import org.apache.commons.io.IOUtils; import org.mockito.Mockito; import org.testng.annotations.Test; @@ -27,7 +29,27 @@ public void testNoBytesSkipped() throws Exception { streamHandler.channelRead0(ctx, buffer); streamHandler.channelInactive(ctx); - assertTrue(IOUtils.contentEquals(callback.getInputStream(), new ByteBufInputStream(buffer))); + try (InputStream inputStream = callback.getInputStream()) { + assertTrue(IOUtils.contentEquals(inputStream, new ByteBufInputStream(buffer))); + } + } + + @Test + public void testReadByteByByte() throws Exception { + ResultCallbackTest callback = new ResultCallbackTest(); + HttpResponseStreamHandler streamHandler = new HttpResponseStreamHandler(callback); + ChannelHandlerContext ctx = Mockito.mock(ChannelHandlerContext.class); + ByteBuf buffer = generateByteBuf(); + streamHandler.channelRead0(ctx, buffer); + streamHandler.channelInactive(ctx); + + try (InputStream inputStream = callback.getInputStream()) { + for (int i = 0; i < buffer.readableBytes(); i++) { + int b = inputStream.read(); + assertEquals(b, buffer.getByte(i)); + } + assertTrue(inputStream.read() == -1); + } } private ByteBuf generateByteBuf() { @@ -46,7 +68,7 @@ public void onNext(InputStream stream) { this.stream = stream; } - public InputStream getInputStream() { + private InputStream getInputStream() { return stream; } } From 321cff74af6631e4404db37defc325d4ba7d040e Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Wed, 31 Aug 2016 11:45:00 +0300 Subject: [PATCH 5/7] Stuck on HttpResponseInputStream.read() after stream is closed fixed --- .../netty/handler/HttpResponseStreamHandler.java | 12 ++++++------ .../netty/handler/HttpResponseStreamHandlerTest.java | 12 ++++++++++++ 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java index 22035faa9..596334640 100644 --- a/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java +++ b/src/main/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandler.java @@ -119,10 +119,6 @@ public int read(byte[] b, int off, int len) throws IOException { synchronized (lock) { off = poll(off); - if (closed) { - throw new IOException("Stream closed"); - } - if (current == null) { return -1; } else { @@ -133,10 +129,14 @@ public int read(byte[] b, int off, int len) throws IOException { } } - private int poll(int off) { + private int poll(int off) throws IOException { synchronized (lock) { while (readableBytes() <= off) { try { + if (closed) { + throw new IOException("Stream closed"); + } + off -= releaseCurrent(); if (writeCompleted) { return off; @@ -145,7 +145,7 @@ private int poll(int off) { lock.wait(); if (closed) { - return off; + throw new IOException("Stream closed"); } if (writeCompleted && current == null) { return off; diff --git a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java index f55d6df54..7b41e0237 100644 --- a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java +++ b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java @@ -1,5 +1,6 @@ package com.github.dockerjava.netty.handler; +import static com.github.dockerjava.netty.handler.HttpResponseStreamHandler.HttpResponseInputStream; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; @@ -8,6 +9,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; +import java.io.IOException; import java.io.InputStream; import org.apache.commons.io.IOUtils; @@ -52,6 +54,16 @@ public void testReadByteByByte() throws Exception { } } + @Test(expectedExceptions = IOException.class) + public void testReadClosedResponseStream() throws Exception { + HttpResponseInputStream inputStream = new HttpResponseInputStream(); + ByteBuf buffer = generateByteBuf(); + + inputStream.write(buffer); + inputStream.close(); + inputStream.read(); + } + private ByteBuf generateByteBuf() { byte[] array = new byte[256]; for (int i = 0; i < array.length; i++) { From ab230cbed2136957e16942a640a2fe5124855d35 Mon Sep 17 00:00:00 2001 From: Alexander Koshevoy Date: Wed, 31 Aug 2016 11:46:08 +0300 Subject: [PATCH 6/7] Logic on the closed state of HttpResponseInputStream covered --- .../HttpResponseStreamHandlerTest.java | 46 +++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java index 7b41e0237..eea9ddc0f 100644 --- a/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java +++ b/src/test/java/com/github/dockerjava/netty/handler/HttpResponseStreamHandlerTest.java @@ -11,6 +11,10 @@ import java.io.IOException; import java.io.InputStream; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import org.apache.commons.io.IOUtils; import org.mockito.Mockito; @@ -54,6 +58,48 @@ public void testReadByteByByte() throws Exception { } } + @Test + public void testCloseResponseStreamBeforeWrite() throws Exception { + HttpResponseInputStream inputStream = new HttpResponseInputStream(); + ByteBuf buffer = generateByteBuf(); + + inputStream.write(buffer); + inputStream.close(); + inputStream.write(buffer); + } + + @Test + public void testCloseResponseStreamOnWrite() throws Exception { + final HttpResponseInputStream inputStream = new HttpResponseInputStream(); + + final ByteBuf buffer = generateByteBuf(); + + final CountDownLatch firstWrite = new CountDownLatch(1); + + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future submit = executor.submit(new Runnable() { + @Override + public void run() { + try { + inputStream.write(buffer); + firstWrite.countDown(); + inputStream.write(buffer); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + }); + + firstWrite.await(); + assertTrue(inputStream.available() > 0); + + // second write should have started + Thread.sleep(500L); + inputStream.close(); + + submit.get(); + } + @Test(expectedExceptions = IOException.class) public void testReadClosedResponseStream() throws Exception { HttpResponseInputStream inputStream = new HttpResponseInputStream(); From aac58a74dfe4bfa96401ba6c87879a642d578b61 Mon Sep 17 00:00:00 2001 From: tejksat Date: Sat, 3 Sep 2016 14:49:50 +0300 Subject: [PATCH 7/7] AsyncResultCallback class moved to InvocationBuilder --- .../core/async/AsyncResultCallback.java | 104 ------------------ .../dockerjava/netty/InvocationBuilder.java | 53 ++++++++- 2 files changed, 52 insertions(+), 105 deletions(-) delete mode 100644 src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java diff --git a/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java b/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java deleted file mode 100644 index 329982cee..000000000 --- a/src/main/java/com/github/dockerjava/core/async/AsyncResultCallback.java +++ /dev/null @@ -1,104 +0,0 @@ -package com.github.dockerjava.core.async; - -import java.io.Closeable; -import java.io.IOException; -import java.util.concurrent.CountDownLatch; - -import com.github.dockerjava.api.async.ResultCallback; -import com.google.common.base.Throwables; - -/** - * Implementation of {@link ResultCallback} with the single result event expected. - */ -public class AsyncResultCallback implements ResultCallback { - - private A_RES_T result = null; - - private final CountDownLatch resultReady = new CountDownLatch(1); - - private Closeable stream; - - private boolean closed = false; - - private Throwable firstError = null; - - @Override - public void onStart(Closeable stream) { - this.stream = stream; - closed = false; - } - - @Override - public void onNext(A_RES_T object) { - onResult(object); - } - - private void onResult(A_RES_T object) { - if (resultReady.getCount() == 0) { - throw new IllegalStateException("Result has already been set"); - } - - try { - result = object; - } finally { - resultReady.countDown(); - } - } - - @Override - public void onError(Throwable throwable) { - if (closed) return; - - if (firstError == null) { - firstError = throwable; - } - - try { - close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void onComplete() { - try { - close(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public void close() throws IOException { - try { - closed = true; - if (stream != null) { - stream.close(); - } - } finally { - resultReady.countDown(); - } - } - - /** - * Blocks until {@link ResultCallback#onNext(Object)} was called for the first time - */ - @SuppressWarnings("unchecked") - public A_RES_T awaitResult() { - try { - resultReady.await(); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - rethrowFirstError(); - return result; - } - - private void rethrowFirstError() { - if (firstError != null) { - // this call throws a RuntimeException - throw Throwables.propagate(firstError); - } - } -} diff --git a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java index 36e041d86..26b950e4e 100644 --- a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java +++ b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CountDownLatch; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -34,7 +35,6 @@ import com.github.dockerjava.api.async.ResultCallback; import com.github.dockerjava.api.exception.DockerClientException; import com.github.dockerjava.api.model.Frame; -import com.github.dockerjava.core.async.AsyncResultCallback; import com.github.dockerjava.core.async.ResultCallbackTemplate; import com.github.dockerjava.netty.handler.FramedResponseStreamHandler; import com.github.dockerjava.netty.handler.HttpConnectionHijackHandler; @@ -76,6 +76,57 @@ public void onNext(Void object) { } } + /** + * Implementation of {@link ResultCallback} with the single result event expected. + */ + public static class AsyncResultCallback + extends ResultCallbackTemplate, A_RES_T> { + + private A_RES_T result = null; + + private final CountDownLatch resultReady = new CountDownLatch(1); + + @Override + public void onNext(A_RES_T object) { + onResult(object); + } + + private void onResult(A_RES_T object) { + if (resultReady.getCount() == 0) { + throw new IllegalStateException("Result has already been set"); + } + + try { + result = object; + } finally { + resultReady.countDown(); + } + } + + @Override + public void close() throws IOException { + try { + super.close(); + } finally { + resultReady.countDown(); + } + } + + /** + * Blocks until {@link ResultCallback#onNext(Object)} was called for the first time + */ + @SuppressWarnings("unchecked") + public A_RES_T awaitResult() { + try { + resultReady.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + getFirstError(); + return result; + } + } + private ChannelProvider channelProvider; private String resource;