diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
index eb61709a20..4b98772755 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java
@@ -35,6 +35,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
   /**
    * Constructs a client that can communicate with the Container framework on
    * data nodes.
+   *
    * @param pipeline - Pipeline that defines the machines.
    * @param config -- Ozone Config
    */
@@ -91,6 +94,7 @@ public void connect() throws Exception {
 
   /**
    * Returns if the exceiver client connects to a server.
+   *
    * @return True if the connection is alive, false otherwise.
    */
   @VisibleForTesting
@@ -100,7 +104,7 @@ public boolean isConnected() {
 
   @Override
   public void close() {
-    if(group != null) {
+    if (group != null) {
       group.shutdownGracefully(0, 0, TimeUnit.SECONDS);
     }
 
@@ -118,12 +122,35 @@ public Pipeline getPipeline() {
   public ContainerProtos.ContainerCommandResponseProto sendCommand(
       ContainerProtos.ContainerCommandRequestProto request) throws IOException {
-    if((channelFuture == null) || (!channelFuture.channel().isActive())) {
+    try {
+      if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
         throw new IOException("This channel is not connected.");
       }
       XceiverClientHandler handler =
           channelFuture.channel().pipeline().get(XceiverClientHandler.class);
-    return handler.sendCommand(request);
+      return handler.sendCommand(request);
+    } catch (ExecutionException | InterruptedException e) {
+      throw new IOException("Unexpected exception during execution", e);
+    }
+  }
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    if ((channelFuture == null) || (!channelFuture.channel().isActive())) {
+      throw new IOException("This channel is not connected.");
+    }
+    XceiverClientHandler handler =
+        channelFuture.channel().pipeline().get(XceiverClientHandler.class);
+    return handler.sendCommandAsync(request);
+  }
 }
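A note on usage: the future returned by sendCommandAsync is completed from channelRead0 once the matching reply arrives, so a caller can either chain a callback or block only where the result is needed. A minimal caller-side sketch, illustrative only and not part of the patch; it assumes `client` is a connected XceiverClient, `request` carries a unique trace ID, and the response proto exposes the usual generated getter for its result field:

    // Issue the command without blocking the calling thread.
    CompletableFuture<ContainerProtos.ContainerCommandResponseProto> future =
        client.sendCommandAsync(request);
    // Either react when the datanode reply arrives (the callback runs after
    // channelRead0 completes the future) ...
    future.thenAccept(reply ->
        System.out.println("Result: " + reply.getResult()));
    // ... or block only at the point where the result is actually needed.
    ContainerProtos.ContainerCommandResponseProto blockingReply = future.get();
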
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 23e7443cb7..99fec1680a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -17,26 +17,36 @@
  */
 package org.apache.hadoop.scm;
 
+import com.google.common.base.Preconditions;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
 
 /**
  * Netty client handler.
  */
 public class XceiverClientHandler extends
-    SimpleChannelInboundHandler<ContainerProtos.ContainerCommandResponseProto> {
+    SimpleChannelInboundHandler<ContainerCommandResponseProto> {
   static final Logger LOG =
       LoggerFactory.getLogger(XceiverClientHandler.class);
-  private final BlockingQueue<ContainerProtos.ContainerCommandResponseProto>
-      responses = new LinkedBlockingQueue<>();
+  private final ConcurrentMap<String,
+      CompletableFuture<ContainerCommandResponseProto>> responses =
+      new ConcurrentHashMap<>();
+
   private final Pipeline pipeline;
   private volatile Channel channel;
@@ -56,15 +66,24 @@ public XceiverClientHandler(Pipeline pipeline) {
    * .ContainerCommandResponseProto}.
    *
    * @param ctx the {@link ChannelHandlerContext} which this {@link
-   * SimpleChannelInboundHandler} belongs to
+   *            SimpleChannelInboundHandler} belongs to
    * @param msg the message to handle
    * @throws Exception is thrown if an error occurred
    */
   @Override
   public void channelRead0(ChannelHandlerContext ctx,
-      ContainerProtos.ContainerCommandResponseProto msg)
+                           ContainerProtos.ContainerCommandResponseProto msg)
       throws Exception {
-    responses.add(msg);
+    Preconditions.checkNotNull(msg);
+    String key = msg.getTraceID();
+    CompletableFuture<ContainerCommandResponseProto> future =
+        responses.remove(key);
+    if (future != null) {
+      future.complete(msg);
+    } else {
+      LOG.error("A reply was received for a message that was not queued. " +
+          "Trace ID: {}", msg.getTraceID());
+    }
   }
 
   @Override
@@ -88,25 +107,39 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    * @param request - request.
    * @return -- response
    */
-  public ContainerProtos.ContainerCommandResponseProto
-      sendCommand(ContainerProtos.ContainerCommandRequestProto request) {
-    ContainerProtos.ContainerCommandResponseProto response;
-    channel.writeAndFlush(request);
-    boolean interrupted = false;
-    for (;;) {
-      try {
-        response = responses.take();
-        break;
-      } catch (InterruptedException ignore) {
-        interrupted = true;
-      }
-    }
-
-    if (interrupted) {
-      Thread.currentThread().interrupt();
-    }
-    return response;
+  public ContainerCommandResponseProto
+      sendCommand(ContainerProtos.ContainerCommandRequestProto request)
+      throws ExecutionException, InterruptedException {
+    Future<ContainerCommandResponseProto> future = sendCommandAsync(request);
+    return future.get();
   }
 
+  /**
+   * sendCommandAsync queues a command to the Netty subsystem and returns a
+   * CompletableFuture. The future is marked completed in channelRead0 when
+   * the reply for the call comes back.
+   *
+   * @param request - Request to execute
+   * @return CompletableFuture
+   */
+  public CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerProtos.ContainerCommandRequestProto request) {
+    CompletableFuture<ContainerCommandResponseProto> response =
+        new CompletableFuture<>();
+
+    CompletableFuture<ContainerCommandResponseProto> previous =
+        responses.putIfAbsent(request.getTraceID(), response);
+
+    if (previous != null) {
+      LOG.error("Command with trace ID {} already exists. Ignoring this " +
+          "command. Previous command: {}", request.getTraceID(),
+          previous.toString());
+      throw new IllegalStateException("Duplicate trace ID. A command with " +
+          "this trace ID is already executing. Please ensure that " +
+          "trace IDs are not reused. ID: " + request.getTraceID());
+    }
+
+    channel.writeAndFlush(request);
+    return response;
+  }
 }
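The heart of this change is the switch from a single BlockingQueue, which could only pair replies with callers when commands were issued one at a time, to a map from trace ID to CompletableFuture, so many requests can be in flight and replies can arrive in any order. A self-contained sketch of that correlation pattern; the class and method names here are hypothetical, with plain Strings standing in for the protobuf request/response types:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public final class TraceDemo {
      private static final ConcurrentMap<String, CompletableFuture<String>>
          pending = new ConcurrentHashMap<>();

      // Caller side: register a future under the trace ID, then "send".
      static CompletableFuture<String> send(String traceId) {
        CompletableFuture<String> f = new CompletableFuture<>();
        if (pending.putIfAbsent(traceId, f) != null) {
          throw new IllegalStateException("Duplicate trace ID: " + traceId);
        }
        return f;
      }

      // Reply side: what channelRead0 does - look up the future by trace ID,
      // remove it from the map, and complete it.
      static void onReply(String traceId, String reply) {
        CompletableFuture<String> f = pending.remove(traceId);
        if (f != null) {
          f.complete(reply);
        }
      }

      public static void main(String[] args) throws Exception {
        CompletableFuture<String> f = send("trace-1");
        onReply("trace-1", "ok");    // simulated network reply
        System.out.println(f.get()); // prints "ok"
      }
    }
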
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
index a1ef114b2f..8ecf6f59d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java
@@ -20,6 +20,7 @@
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.ratis.RatisHelper;
@@ -34,6 +35,8 @@
 
 import java.io.IOException;
 import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -103,4 +106,18 @@ public ContainerCommandResponseProto sendCommand(
     return ContainerCommandResponseProto.parseFrom(
         ShadedProtoUtil.asByteString(reply.getMessage().getContent()));
   }
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   *
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  @Override
+  public CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request)
+      throws IOException, ExecutionException, InterruptedException {
+    throw new IOException("Not implemented");
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
index b48fed04a5..aed29dcb26 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientSpi.java
@@ -27,6 +27,8 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -94,4 +96,15 @@ public int getRefcount() {
    */
   public abstract ContainerCommandResponseProto sendCommand(
       ContainerCommandRequestProto request) throws IOException;
+
+  /**
+   * Sends a given command to the server and gets a waitable future back.
+   * @param request Request
+   * @return Response to the command
+   * @throws IOException
+   */
+  public abstract CompletableFuture<ContainerCommandResponseProto>
+      sendCommandAsync(ContainerCommandRequestProto request) throws IOException,
+      ExecutionException, InterruptedException;
+
 }
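Making sendCommandAsync abstract means every XceiverClientSpi subclass must now provide it, which is why XceiverClientRatis above stubs it out with "Not implemented". For a transport that only has a blocking path, a fallback is possible but costs a pool thread for the whole round trip, which is likely why the stub was preferred. A hypothetical sketch of such a fallback inside a subclass, not part of the patch, using java.util.concurrent.CompletionException:

    // Hypothetical fallback for a blocking-only transport: wrap the
    // synchronous call in supplyAsync. Satisfies the contract, but each
    // call occupies a thread from the common pool until the reply returns.
    @Override
    public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
        ContainerCommandRequestProto request) {
      return CompletableFuture.supplyAsync(() -> {
        try {
          return sendCommand(request);      // delegate to the blocking path
        } catch (IOException e) {
          throw new CompletionException(e); // surface as an async failure
        }
      });
    }
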
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
index d237031a0d..3ec13b1c98 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/storage/ContainerProtocolCalls.java
@@ -59,7 +59,7 @@
  * Implementation of all container protocol calls performed by Container
  * clients.
  */
-public final class ContainerProtocolCalls  {
+public final class ContainerProtocolCalls {
 
   /**
    * There is no need to instantiate this class.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index 6db7621222..637755b4c3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -248,6 +248,7 @@ public static ContainerCommandRequestProto getWriteSmallFileRequest(
         ContainerCommandRequestProto.newBuilder();
     request.setCmdType(ContainerProtos.Type.PutSmallFile);
     request.setPutSmallFile(smallFileRequest);
+    request.setTraceID(UUID.randomUUID().toString());
     return request.build();
   }
 
@@ -517,7 +518,8 @@ public static ContainerCommandRequestProto getCloseContainer(
         pipeline.getProtobufMessage()).build();
     ContainerProtos.ContainerCommandRequestProto cmd =
         ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
-            .Type.CloseContainer).setCloseContainer(closeReqeuest).build();
+            .Type.CloseContainer).setCloseContainer(closeReqeuest)
+            .build();
     return cmd;
   }
 
@@ -533,7 +535,9 @@ public static ContainerCommandRequestProto getDeleteContainer(
         ContainerProtos.DeleteContainerRequestProto.newBuilder().setName(
             pipeline.getContainerName()).setPipeline(
             pipeline.getProtobufMessage()).setForceDelete(forceDelete).build();
-    return ContainerCommandRequestProto.newBuilder().setCmdType(ContainerProtos
-        .Type.DeleteContainer).setDeleteContainer(deleteRequest).build();
+    return ContainerCommandRequestProto.newBuilder()
+        .setCmdType(ContainerProtos.Type.DeleteContainer)
+        .setDeleteContainer(deleteRequest)
+        .build();
   }
 }
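The one-line change to getWriteSmallFileRequest above is load-bearing: the handler keys its pending-response map on the trace ID, so every request must carry a fresh one. A short sketch of both sides of that contract; `requestBuilder` and `client` are hypothetical stand-ins for a request builder and a connected client:

    // Fresh ID per request: random UUIDs make collisions negligible
    // across threads and clients.
    requestBuilder.setTraceID(UUID.randomUUID().toString());

    // Reuse, by contrast, is rejected while the first call is in flight:
    client.sendCommandAsync(requestBuilder.setTraceID("trace-42").build());
    client.sendCommandAsync(requestBuilder.setTraceID("trace-42").build());
    // The second call throws IllegalStateException("Duplicate trace ID. ...")
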
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 266aa7f892..754d2af5c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -28,14 +28,17 @@
 import org.apache.hadoop.scm.XceiverClient;
 import org.apache.hadoop.scm.XceiverClientSpi;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Tests ozone containers.
@@ -226,7 +229,6 @@ static void runTestBothGetandPutSmallFile(
     final ContainerProtos.ContainerCommandRequestProto smallFileRequest =
         ContainerTestHelper.getWriteSmallFileRequest(
             client.getPipeline(), containerName, keyName, 1024);
-
     ContainerProtos.ContainerCommandResponseProto response =
         client.sendCommand(smallFileRequest);
     Assert.assertNotNull(response);
@@ -247,6 +249,8 @@ static void runTestBothGetandPutSmallFile(
     }
   }
 
+
+
   @Test
   public void testCloseContainer() throws Exception {
     MiniOzoneCluster cluster = null;
@@ -415,6 +419,66 @@ public void testDeleteContainer() throws Exception {
     }
   }
 
+
+  // Runs a set of commands as async calls and verifies that the calls indeed
+  // worked as expected.
+  static void runAsyncTests(
+      String containerName, XceiverClientSpi client) throws Exception {
+    try {
+      client.connect();
+
+      createContainerForTesting(client, containerName);
+      final List<CompletableFuture> computeResults = new LinkedList<>();
+      int requestCount = 1000;
+      // Create a bunch of async calls from this test.
+      for (int x = 0; x < requestCount; x++) {
+        String keyName = OzoneUtils.getRequestID();
+        final ContainerProtos.ContainerCommandRequestProto smallFileRequest =
+            ContainerTestHelper.getWriteSmallFileRequest(
+                client.getPipeline(), containerName, keyName, 1024);
+        CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+            response = client.sendCommandAsync(smallFileRequest);
+        computeResults.add(response);
+      }
+
+      CompletableFuture<Void> combinedFuture =
+          CompletableFuture.allOf(computeResults.toArray(
+              new CompletableFuture[computeResults.size()]));
+      // Wait for all futures to complete.
+      combinedFuture.get();
+      // Assert that all futures are indeed done.
+      for (CompletableFuture future : computeResults) {
+        Assert.assertTrue(future.isDone());
+      }
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+    }
+  }
+
+  @Test
+  public void testXceiverClientAsync() throws Exception {
+    MiniOzoneCluster cluster = null;
+    XceiverClient client = null;
+    try {
+      OzoneConfiguration conf = newOzoneConfiguration();
+
+      client = createClientForTesting(conf);
+      cluster = new MiniOzoneCluster.Builder(conf)
+          .setRandomContainerPort(false)
+          .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+      String containerName = client.getPipeline().getContainerName();
+      runAsyncTests(containerName, client);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+
   private static XceiverClient createClientForTesting(OzoneConfiguration conf)
       throws Exception {
     String containerName = OzoneUtils.getRequestID();