diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java index 7381bc1768..1830bdfbea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/client/OzoneClientUtils.java @@ -861,4 +861,14 @@ public static long formatDateTime(String date) throws ParseException { return ZonedDateTime.parse(date, DATE_FORMAT.get()) .toInstant().getEpochSecond(); } + + /** + * Returns the maximum no of outstanding async requests to be handled by + * Standalone and Ratis client. + */ + public static int getMaxOutstandingRequests(Configuration config) { + return config + .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, + ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java index 410f39835f..b79f72b6e8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/ScmConfigKeys.java @@ -37,6 +37,11 @@ public final class ScmConfigKeys { public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT = 256; + public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS = + "scm.container.client.max.outstanding.requests"; + public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT + = 100; + public static final String DFS_CONTAINER_RATIS_ENABLED_KEY = "dfs.container.ratis.enabled"; public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java index 60f0998d7b..bde9064213 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClient.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.List; +import java.util.concurrent.Semaphore; /** * A Client for the storageContainer protocol. @@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi { private Channel channel; private Bootstrap b; private EventLoopGroup group; + private final Semaphore semaphore; /** * Constructs a client that can communicate with the Container framework on @@ -65,6 +68,8 @@ public XceiverClient(Pipeline pipeline, Configuration config) { Preconditions.checkNotNull(config); this.pipeline = pipeline; this.config = config; + this.semaphore = + new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config)); } @Override @@ -78,7 +83,7 @@ public void connect() throws Exception { b.group(group) .channel(NioSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) - .handler(new XceiverClientInitializer(this.pipeline)); + .handler(new XceiverClientInitializer(this.pipeline, semaphore)); DatanodeID leader = this.pipeline.getLeader(); // read port from the data node, on failure use default configured @@ -116,8 +121,7 @@ public Pipeline getPipeline() { @Override public ContainerProtos.ContainerCommandResponseProto sendCommand( - ContainerProtos.ContainerCommandRequestProto request) - throws IOException { + ContainerProtos.ContainerCommandRequestProto request) throws IOException { try { if ((channel == null) || (!channel.isActive())) { throw new IOException("This channel is not connected."); @@ -127,7 +131,20 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand( return handler.sendCommand(request); } catch (ExecutionException | InterruptedException e) { - throw new IOException("Unexpected exception during execution", e); + /** + * In case the netty channel handler throws an exception, + * the exception thrown will be wrapped within {@link ExecutionException}. + * Unwarpping here so that original exception gets passed + * to to the client. + */ + if (e instanceof ExecutionException) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) cause; + } + } + throw new IOException( + "Unexpected exception during execution:" + e.getMessage()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java index 1d91b1463b..ac2cf1a3b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java @@ -32,11 +32,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Iterator; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import java.util.concurrent.Semaphore; /** * Netty client handler. @@ -51,15 +53,17 @@ public class XceiverClientHandler extends private final Pipeline pipeline; private volatile Channel channel; private XceiverClientMetrics metrics; + private final Semaphore semaphore; /** * Constructs a client that can communicate to a container server. */ - public XceiverClientHandler(Pipeline pipeline) { + public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) { super(false); Preconditions.checkNotNull(pipeline); this.pipeline = pipeline; this.metrics = XceiverClientManager.getXceiverClientMetrics(); + this.semaphore = semaphore; } /** @@ -83,6 +87,7 @@ public void channelRead0(ChannelHandlerContext ctx, String key = msg.getTraceID(); ResponseFuture response = responses.remove(key); + semaphore.release(); if (response != null) { response.getFuture().complete(msg); @@ -105,6 +110,12 @@ public void channelRegistered(ChannelHandlerContext ctx) { @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { LOG.info("Exception in client " + cause.toString()); + Iterator keyIterator = responses.keySet().iterator(); + while (keyIterator.hasNext()) { + ResponseFuture response = responses.remove(keyIterator.next()); + response.getFuture().completeExceptionally(cause); + semaphore.release(); + } ctx.close(); } @@ -133,7 +144,8 @@ public ContainerCommandResponseProto sendCommand( * @return CompletableFuture */ public CompletableFuture sendCommandAsync( - ContainerProtos.ContainerCommandRequestProto request) { + ContainerProtos.ContainerCommandRequestProto request) + throws InterruptedException { // Throw an exception of request doesn't have traceId if (StringUtils.isEmpty(request.getTraceID())) { @@ -152,6 +164,7 @@ public CompletableFuture sendCommandAsync( = new CompletableFuture<>(); ResponseFuture response = new ResponseFuture(future, Time.monotonicNowNanos()); + semaphore.acquire(); ResponseFuture previous = responses.putIfAbsent( request.getTraceID(), response); if (previous != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java index fbfb7cab94..6aac9606dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientInitializer.java @@ -27,19 +27,23 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos; import org.apache.hadoop.scm.container.common.helpers.Pipeline; +import java.util.concurrent.Semaphore; + /** * Setup the netty pipeline. */ public class XceiverClientInitializer extends ChannelInitializer { private final Pipeline pipeline; + private final Semaphore semaphore; /** * Constructs an Initializer for the client pipeline. * @param pipeline - Pipeline. */ - public XceiverClientInitializer(Pipeline pipeline) { + public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) { this.pipeline = pipeline; + this.semaphore = semaphore; } /** @@ -62,7 +66,7 @@ protected void initChannel(SocketChannel ch) throws Exception { p.addLast(new ProtobufVarint32LengthFieldPrepender()); p.addLast(new ProtobufEncoder()); - p.addLast(new XceiverClientHandler(this.pipeline)); + p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java index 5e171b1a2e..12ee328f1f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientRatis.java @@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto; import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto; import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.ozone.client.OzoneClientUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneProtos; import org.apache.hadoop.scm.container.common.helpers.Pipeline; import org.apache.ratis.RatisHelper; @@ -56,19 +57,24 @@ public static XceiverClientRatis newXceiverClientRatis( final String rpcType = ozoneConf.get( ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY, ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT); + final int maxOutstandingRequests = + OzoneClientUtils.getMaxOutstandingRequests(ozoneConf); return new XceiverClientRatis(pipeline, - SupportedRpcType.valueOfIgnoreCase(rpcType)); + SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests); } private final Pipeline pipeline; private final RpcType rpcType; private final AtomicReference client = new AtomicReference<>(); + private final int maxOutstandingRequests; /** Constructs a client. */ - private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) { + private XceiverClientRatis(Pipeline pipeline, RpcType rpcType, + int maxOutStandingChunks) { super(); this.pipeline = pipeline; this.rpcType = rpcType; + this.maxOutstandingRequests = maxOutStandingChunks; } /** @@ -147,6 +153,9 @@ public void connect() throws Exception { LOG.debug("Connecting to pipeline:{} leader:{}", getPipeline().getPipelineName(), RatisHelper.toRaftPeerId(pipeline.getLeader())); + // TODO : XceiverClient ratis should pass the config value of + // maxOutstandingRequests so as to set the upper bound on max no of async + // requests to be handled by raft client if (!client.compareAndSet(null, RatisHelper.newRaftClient(rpcType, getPipeline()))) { throw new IllegalStateException("Client is already connected."); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml index 8c248ea4a5..1c8d086218 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml @@ -1096,6 +1096,16 @@ + + scm.container.client.max.outstanding.requests + 100 + OZONE, PERFORMANCE + + Controls the maximum number of outstanding async requests that can be + handled by the Standalone as well as Ratis client. + + + ozone.scm.container.creation.lease.timeout 60s