diff --git a/pom.xml b/pom.xml index d4a5b6947..c33a1de3a 100644 --- a/pom.xml +++ b/pom.xml @@ -63,7 +63,7 @@ 1.1.0 6.1.1 - 4.1.0.CR2 + 4.1.0.CR3 1.3 1.6 2.3.3 diff --git a/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java b/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java index edbf49827..f120012af 100644 --- a/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java +++ b/src/main/java/com/github/dockerjava/core/async/ResultCallbackTemplate.java @@ -95,11 +95,11 @@ public RC_T awaitCompletion() throws InterruptedException { /** * Blocks until {@link ResultCallback#onComplete()} was called or the given timeout occurs + * @return {@code true} if completed and {@code false} if the waiting time elapsed + * before {@link ResultCallback#onComplete()} was called. */ - @SuppressWarnings("unchecked") - public RC_T awaitCompletion(long timeout, TimeUnit timeUnit) throws InterruptedException { - completed.await(timeout, timeUnit); - return (RC_T) this; + public boolean awaitCompletion(long timeout, TimeUnit timeUnit) throws InterruptedException { + return completed.await(timeout, timeUnit); } /** @@ -115,11 +115,11 @@ public RC_T awaitStarted() throws InterruptedException { /** * Blocks until {@link ResultCallback#onStart()} was called or the given timeout occurs. {@link ResultCallback#onStart()} is called when * the request was processed on the server side and the response is incoming. + * @return {@code true} if started and {@code false} if the waiting time elapsed + * before {@link ResultCallback#onStart()} was called. */ - @SuppressWarnings("unchecked") - public RC_T awaitStarted(long timeout, TimeUnit timeUnit) throws InterruptedException { - started.await(timeout, timeUnit); - return (RC_T) this; + public boolean awaitStarted(long timeout, TimeUnit timeUnit) throws InterruptedException { + return started.await(timeout, timeUnit); } @CheckForNull diff --git a/src/main/java/com/github/dockerjava/netty/ChannelProvider.java b/src/main/java/com/github/dockerjava/netty/ChannelProvider.java index c742015fc..26ceea4b4 100644 --- a/src/main/java/com/github/dockerjava/netty/ChannelProvider.java +++ b/src/main/java/com/github/dockerjava/netty/ChannelProvider.java @@ -1,7 +1,7 @@ package com.github.dockerjava.netty; -import io.netty.channel.Channel; +import io.netty.channel.socket.DuplexChannel; public interface ChannelProvider { - Channel getChannel(); + DuplexChannel getChannel(); } diff --git a/src/main/java/com/github/dockerjava/netty/DockerCmdExecFactoryImpl.java b/src/main/java/com/github/dockerjava/netty/DockerCmdExecFactoryImpl.java index c714f5cc6..1eca3a9df 100644 --- a/src/main/java/com/github/dockerjava/netty/DockerCmdExecFactoryImpl.java +++ b/src/main/java/com/github/dockerjava/netty/DockerCmdExecFactoryImpl.java @@ -102,12 +102,12 @@ import com.github.dockerjava.netty.exec.WaitContainerCmdExec; import io.netty.bootstrap.Bootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.epoll.EpollDomainSocketChannel; import io.netty.channel.epoll.EpollEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.DuplexChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.unix.DomainSocketAddress; @@ -165,8 +165,8 @@ public class DockerCmdExecFactoryImpl implements DockerCmdExecFactory { private ChannelProvider channelProvider = new ChannelProvider() { @Override - public Channel getChannel() { - Channel channel = connect(); + public DuplexChannel getChannel() { + DuplexChannel channel = connect(); channel.pipeline().addLast(new LoggingHandler(getClass())); return channel; } @@ -190,7 +190,7 @@ public void init(DockerClientConfig dockerClientConfig) { eventLoopGroup = nettyInitializer.init(bootstrap, dockerClientConfig); } - private Channel connect() { + private DuplexChannel connect() { try { return connect(bootstrap); } catch (InterruptedException e) { @@ -198,14 +198,14 @@ private Channel connect() { } } - private Channel connect(final Bootstrap bootstrap) throws InterruptedException { + private DuplexChannel connect(final Bootstrap bootstrap) throws InterruptedException { return nettyInitializer.connect(bootstrap); } private interface NettyInitializer { EventLoopGroup init(final Bootstrap bootstrap, DockerClientConfig dockerClientConfig); - Channel connect(final Bootstrap bootstrap) throws InterruptedException; + DuplexChannel connect(final Bootstrap bootstrap) throws InterruptedException; } private class UnixDomainSocketInitializer implements NettyInitializer { @@ -223,8 +223,8 @@ protected void initChannel(final UnixChannel channel) throws Exception { } @Override - public Channel connect(Bootstrap bootstrap) throws InterruptedException { - return bootstrap.connect(new DomainSocketAddress("/var/run/docker.sock")).sync().channel(); + public DuplexChannel connect(Bootstrap bootstrap) throws InterruptedException { + return (DuplexChannel) bootstrap.connect(new DomainSocketAddress("/var/run/docker.sock")).sync().channel(); } } @@ -253,7 +253,7 @@ protected void initChannel(final SocketChannel channel) throws Exception { } @Override - public Channel connect(Bootstrap bootstrap) throws InterruptedException { + public DuplexChannel connect(Bootstrap bootstrap) throws InterruptedException { String host = dockerClientConfig.getDockerHost().getHost(); int port = dockerClientConfig.getDockerHost().getPort(); @@ -261,7 +261,7 @@ public Channel connect(Bootstrap bootstrap) throws InterruptedException { throw new RuntimeException("no port configured for " + host); } - Channel channel = bootstrap.connect(host, port).sync().channel(); + DuplexChannel channel = (DuplexChannel) bootstrap.connect(host, port).sync().channel(); if (dockerClientConfig.getDockerTlsVerify()) { final SslHandler ssl = initSsl(dockerClientConfig); diff --git a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java index 70737eeac..9669d6818 100644 --- a/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java +++ b/src/main/java/com/github/dockerjava/netty/InvocationBuilder.java @@ -4,6 +4,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; +import io.netty.channel.socket.DuplexChannel; import io.netty.handler.codec.http.DefaultFullHttpRequest; import io.netty.handler.codec.http.DefaultHttpRequest; import io.netty.handler.codec.http.FullHttpRequest; @@ -150,7 +151,7 @@ public void get(TypeReference typeReference, ResultCallback resultCall return; } - private Channel getChannel() { + private DuplexChannel getChannel() { return channelProvider.getChannel(); } @@ -215,7 +216,7 @@ public void post(final Object entity, final InputStream stdin, final ResultCallb FramedResponseStreamHandler streamHandler = new FramedResponseStreamHandler(resultCallback); - final Channel channel = getChannel(); + final DuplexChannel channel = getChannel(); // result callback's close() method must be called when the servers closes the connection channel.closeFuture().addListener(new GenericFutureListener>() { @@ -262,6 +263,9 @@ public void run() { channel.writeAndFlush(Unpooled.copiedBuffer(buffer, 0, read)); } + // we close the writing side of the socket, but keep the read side open to transfer stdout/stderr + channel.shutdownOutput(); + } }).start(); } diff --git a/src/test/java/com/github/dockerjava/core/command/AttachContainerCmdImplTest.java b/src/test/java/com/github/dockerjava/core/command/AttachContainerCmdImplTest.java index be2d5fdd0..21507e44e 100644 --- a/src/test/java/com/github/dockerjava/core/command/AttachContainerCmdImplTest.java +++ b/src/test/java/com/github/dockerjava/core/command/AttachContainerCmdImplTest.java @@ -68,7 +68,8 @@ public void onNext(Frame frame) { }; dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .withLogs(true).exec(callback).awaitCompletion(30, TimeUnit.SECONDS).close(); + .withLogs(true).exec(callback).awaitCompletion(30, TimeUnit.SECONDS); + callback.close(); assertThat(callback.toString(), containsString(snippet)); } @@ -97,7 +98,8 @@ public void onNext(Frame frame) { }; dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .exec(callback).awaitCompletion(15, TimeUnit.SECONDS).close(); + .exec(callback).awaitCompletion(15, TimeUnit.SECONDS); + callback.close(); System.out.println("log: " + callback.toString()); @@ -130,7 +132,8 @@ public void onNext(Frame frame) { InputStream stdin = new ByteArrayInputStream("".getBytes()); dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .withLogs(true).withStdIn(stdin).exec(callback).awaitCompletion(30, TimeUnit.SECONDS).close(); + .withLogs(true).withStdIn(stdin).exec(callback).awaitCompletion(30, TimeUnit.SECONDS); + callback.close(); } public static class AttachContainerTestCallback extends AttachContainerResultCallback { diff --git a/src/test/java/com/github/dockerjava/netty/exec/AttachContainerCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/AttachContainerCmdExecTest.java index fe34aa537..7f8e66609 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/AttachContainerCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/AttachContainerCmdExecTest.java @@ -70,7 +70,8 @@ public void onNext(Frame frame) { }; dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .withLogs(true).exec(callback).awaitCompletion(10, TimeUnit.SECONDS).close(); + .withLogs(true).exec(callback).awaitCompletion(10, TimeUnit.SECONDS); + callback.close(); assertThat(callback.toString(), containsString(snippet)); } @@ -104,7 +105,8 @@ public void onNext(Frame frame) { InputStream stdin = new ByteArrayInputStream((snippet + "\n").getBytes()); dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .withStdIn(stdin).exec(callback).awaitCompletion(2, TimeUnit.SECONDS).close(); + .withStdIn(stdin).exec(callback).awaitCompletion(2, TimeUnit.SECONDS); + callback.close(); assertThat(callback.toString(), containsString(snippet)); } @@ -133,7 +135,8 @@ public void onNext(Frame frame) { }; dockerClient.attachContainerCmd(container.getId()).withStdErr(true).withStdOut(true).withFollowStream(true) - .exec(callback).awaitCompletion(10, TimeUnit.SECONDS).close(); + .exec(callback).awaitCompletion(10, TimeUnit.SECONDS); + callback.close(); // HexDump.dump(collectFramesCallback.toString().getBytes(), 0, System.out, 0); diff --git a/src/test/java/com/github/dockerjava/netty/exec/ExecStartCmdExecTest.java b/src/test/java/com/github/dockerjava/netty/exec/ExecStartCmdExecTest.java index 6436cd5da..137c7c579 100644 --- a/src/test/java/com/github/dockerjava/netty/exec/ExecStartCmdExecTest.java +++ b/src/test/java/com/github/dockerjava/netty/exec/ExecStartCmdExecTest.java @@ -115,9 +115,10 @@ public void execStartAttachStdin() throws Exception { ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(container.getId()) .withAttachStdout(true).withAttachStdin(true).withCmd("cat").exec(); - dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withTty(true).withStdIn(stdin) + boolean completed = dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withTty(true).withStdIn(stdin) .exec(new ExecStartResultCallback(stdout, System.err)).awaitCompletion(5, TimeUnit.SECONDS); + assertTrue(completed, "The process was not finished."); assertEquals(stdout.toString("UTF-8"), "STDIN\n"); } @@ -138,9 +139,10 @@ public void execStartNotAttachedStdin() throws Exception { ExecCreateCmdResponse execCreateCmdResponse = dockerClient.execCreateCmd(container.getId()) .withAttachStdout(true).withAttachStdin(false).withCmd("/bin/sh").exec(); - dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withStdIn(stdin) + boolean completed = dockerClient.execStartCmd(execCreateCmdResponse.getId()).withDetach(false).withStdIn(stdin) .exec(new ExecStartResultCallback(stdout, System.err)).awaitCompletion(5, TimeUnit.SECONDS); + assertTrue(completed, "The process was not finished."); assertEquals(stdout.toString(), ""); } }