HDFS-12890. Ozone: XceiverClient should have upper bound on async requests. Contributed by Shashikant Banerjee.

Anu Engineer 2017-12-18 14:27:45 -08:00 committed by Owen O'Malley
parent 2f3068bc72
commit fd09c2ce5b
7 changed files with 78 additions and 10 deletions
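In outline, the change bounds the number of in-flight async container commands with a java.util.concurrent.Semaphore that is sized from a new configuration key and shared between XceiverClient, its initializer, and the netty handler: a permit is acquired before each request is sent and released when the response arrives or the channel fails. The following is only a minimal sketch of that pattern, not the patch itself; BoundedAsyncSender, doSendAsync and maxOutstanding are hypothetical stand-ins for the real plumbing.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Minimal sketch: cap the number of in-flight async requests with a Semaphore. */
public class BoundedAsyncSender {
  private final Semaphore permits;

  public BoundedAsyncSender(int maxOutstanding) {
    // In the patch the bound comes from
    // scm.container.client.max.outstanding.requests (default 100).
    this.permits = new Semaphore(maxOutstanding);
  }

  public CompletableFuture<String> sendAsync(String request)
      throws InterruptedException {
    // Blocks the caller once maxOutstanding requests are in flight,
    // analogous to semaphore.acquire() in sendCommandAsync().
    permits.acquire();
    CompletableFuture<String> future = doSendAsync(request);
    // Release on completion, success or failure, analogous to the release()
    // calls in channelRead0() and exceptionCaught().
    future.whenComplete((response, error) -> permits.release());
    return future;
  }

  // Stand-in for the real netty write; completes asynchronously.
  private CompletableFuture<String> doSendAsync(String request) {
    return CompletableFuture.supplyAsync(() -> "ack:" + request);
  }
}

Blocking in acquire() gives natural back-pressure: a fast producer waits instead of queueing an unbounded number of requests inside the client.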


@@ -861,4 +861,14 @@ public static long formatDateTime(String date) throws ParseException {
     return ZonedDateTime.parse(date, DATE_FORMAT.get())
         .toInstant().getEpochSecond();
   }
+
+  /**
+   * Returns the maximum no of outstanding async requests to be handled by
+   * Standalone and Ratis client.
+   */
+  public static int getMaxOutstandingRequests(Configuration config) {
+    return config
+        .getInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS,
+            ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT);
+  }
 }


@@ -37,6 +37,11 @@ public final class ScmConfigKeys {
   public static final int SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT =
       256;

+  public static final String SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS =
+      "scm.container.client.max.outstanding.requests";
+  public static final int SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS_DEFAULT
+      = 100;
+
   public static final String DFS_CONTAINER_RATIS_ENABLED_KEY
       = "dfs.container.ratis.enabled";
   public static final boolean DFS_CONTAINER_RATIS_ENABLED_DEFAULT


@@ -31,6 +31,7 @@
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,7 @@
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.List;
+import java.util.concurrent.Semaphore;

 /**
  * A Client for the storageContainer protocol.
@@ -51,6 +53,7 @@ public class XceiverClient extends XceiverClientSpi {
   private Channel channel;
   private Bootstrap b;
   private EventLoopGroup group;
+  private final Semaphore semaphore;

   /**
    * Constructs a client that can communicate with the Container framework on
@@ -65,6 +68,8 @@ public XceiverClient(Pipeline pipeline, Configuration config) {
     Preconditions.checkNotNull(config);
     this.pipeline = pipeline;
     this.config = config;
+    this.semaphore =
+        new Semaphore(OzoneClientUtils.getMaxOutstandingRequests(config));
   }

   @Override
@@ -78,7 +83,7 @@ public void connect() throws Exception {
     b.group(group)
         .channel(NioSocketChannel.class)
         .handler(new LoggingHandler(LogLevel.INFO))
-        .handler(new XceiverClientInitializer(this.pipeline));
+        .handler(new XceiverClientInitializer(this.pipeline, semaphore));
     DatanodeID leader = this.pipeline.getLeader();

     // read port from the data node, on failure use default configured
@@ -116,8 +121,7 @@ public Pipeline getPipeline() {
   @Override
   public ContainerProtos.ContainerCommandResponseProto sendCommand(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws IOException {
+      ContainerProtos.ContainerCommandRequestProto request) throws IOException {
     try {
       if ((channel == null) || (!channel.isActive())) {
         throw new IOException("This channel is not connected.");
@@ -127,7 +131,20 @@ public ContainerProtos.ContainerCommandResponseProto sendCommand(
       return handler.sendCommand(request);
     } catch (ExecutionException | InterruptedException e) {
-      throw new IOException("Unexpected exception during execution", e);
+      /**
+       * In case the netty channel handler throws an exception,
+       * the exception thrown will be wrapped within {@link ExecutionException}.
+       * Unwrapping it here so that the original exception gets passed
+       * to the client.
+       */
+      if (e instanceof ExecutionException) {
+        Throwable cause = e.getCause();
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+      }
+      throw new IOException(
+          "Unexpected exception during execution:" + e.getMessage());
     }
   }
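The new catch block relies on the fact that when a CompletableFuture completes exceptionally, Future.get() rethrows the failure wrapped in an ExecutionException, so the patch unwraps an IOException cause before rethrowing. A standalone illustration of that contract follows, assuming nothing beyond the JDK; UnwrapExample and waitFor are hypothetical names, not part of the patch.

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

/** Sketch: surface the original IOException rather than the
 *  ExecutionException that Future.get() wraps it in. */
public class UnwrapExample {
  static String waitFor(CompletableFuture<String> future) throws IOException {
    try {
      return future.get();
    } catch (ExecutionException | InterruptedException e) {
      Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        // Pass the original failure to the caller unchanged.
        throw (IOException) cause;
      }
      throw new IOException("Unexpected exception during execution: "
          + e.getMessage(), e);
    }
  }

  public static void main(String[] args) {
    CompletableFuture<String> failed = new CompletableFuture<>();
    failed.completeExceptionally(new IOException("This channel is not connected."));
    try {
      waitFor(failed);
    } catch (IOException e) {
      System.out.println(e.getMessage()); // prints the original message, not a wrapper
    }
  }
}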


@@ -32,11 +32,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.Iterator;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;

 /**
  * Netty client handler.
@@ -51,15 +53,17 @@ public class XceiverClientHandler extends
   private final Pipeline pipeline;
   private volatile Channel channel;
   private XceiverClientMetrics metrics;
+  private final Semaphore semaphore;

   /**
    * Constructs a client that can communicate to a container server.
    */
-  public XceiverClientHandler(Pipeline pipeline) {
+  public XceiverClientHandler(Pipeline pipeline, Semaphore semaphore) {
     super(false);
     Preconditions.checkNotNull(pipeline);
     this.pipeline = pipeline;
     this.metrics = XceiverClientManager.getXceiverClientMetrics();
+    this.semaphore = semaphore;
   }

   /**
@@ -83,6 +87,7 @@ public void channelRead0(ChannelHandlerContext ctx,
     String key = msg.getTraceID();
     ResponseFuture response = responses.remove(key);
+    semaphore.release();

     if (response != null) {
       response.getFuture().complete(msg);
@@ -105,6 +110,12 @@ public void channelRegistered(ChannelHandlerContext ctx) {
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
     LOG.info("Exception in client " + cause.toString());
+    Iterator<String> keyIterator = responses.keySet().iterator();
+    while (keyIterator.hasNext()) {
+      ResponseFuture response = responses.remove(keyIterator.next());
+      response.getFuture().completeExceptionally(cause);
+      semaphore.release();
+    }
     ctx.close();
   }
@@ -133,7 +144,8 @@ public ContainerCommandResponseProto sendCommand(
    * @return CompletableFuture
    */
   public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
-      ContainerProtos.ContainerCommandRequestProto request) {
+      ContainerProtos.ContainerCommandRequestProto request)
+      throws InterruptedException {

     // Throw an exception of request doesn't have traceId
     if (StringUtils.isEmpty(request.getTraceID())) {
@@ -152,6 +164,7 @@ public CompletableFuture<ContainerCommandResponseProto> sendCommandAsync(
         = new CompletableFuture<>();
     ResponseFuture response = new ResponseFuture(future,
         Time.monotonicNowNanos());
+    semaphore.acquire();
     ResponseFuture previous = responses.putIfAbsent(
         request.getTraceID(), response);
     if (previous != null) {


@@ -27,19 +27,23 @@
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;

+import java.util.concurrent.Semaphore;

 /**
  * Setup the netty pipeline.
  */
 public class XceiverClientInitializer extends
     ChannelInitializer<SocketChannel> {
   private final Pipeline pipeline;
+  private final Semaphore semaphore;

   /**
    * Constructs an Initializer for the client pipeline.
    * @param pipeline - Pipeline.
    */
-  public XceiverClientInitializer(Pipeline pipeline) {
+  public XceiverClientInitializer(Pipeline pipeline, Semaphore semaphore) {
     this.pipeline = pipeline;
+    this.semaphore = semaphore;
   }

   /**
@@ -62,7 +66,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
     p.addLast(new ProtobufVarint32LengthFieldPrepender());
     p.addLast(new ProtobufEncoder());
-    p.addLast(new XceiverClientHandler(this.pipeline));
+    p.addLast(new XceiverClientHandler(this.pipeline, this.semaphore));
   }
 }


@@ -23,6 +23,7 @@
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.ratis.RatisHelper;
@@ -56,19 +57,24 @@ public static XceiverClientRatis newXceiverClientRatis(
     final String rpcType = ozoneConf.get(
         ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final int maxOutstandingRequests =
+        OzoneClientUtils.getMaxOutstandingRequests(ozoneConf);
     return new XceiverClientRatis(pipeline,
-        SupportedRpcType.valueOfIgnoreCase(rpcType));
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests);
   }

   private final Pipeline pipeline;
   private final RpcType rpcType;
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final int maxOutstandingRequests;

   /** Constructs a client. */
-  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType) {
+  private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
+      int maxOutStandingChunks) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
+    this.maxOutstandingRequests = maxOutStandingChunks;
   }

   /**
@@ -147,6 +153,9 @@ public void connect() throws Exception {
     LOG.debug("Connecting to pipeline:{} leader:{}",
         getPipeline().getPipelineName(),
         RatisHelper.toRaftPeerId(pipeline.getLeader()));
+    // TODO : XceiverClient ratis should pass the config value of
+    // maxOutstandingRequests so as to set the upper bound on max no of async
+    // requests to be handled by raft client
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline()))) {
       throw new IllegalStateException("Client is already connected.");


@@ -1096,6 +1096,16 @@
   </description>
 </property>

+<property>
+  <name>scm.container.client.max.outstanding.requests</name>
+  <value>100</value>
+  <tag>OZONE, PERFORMANCE</tag>
+  <description>
+    Controls the maximum number of outstanding async requests that can be
+    handled by the Standalone as well as Ratis client.
+  </description>
+</property>
+
 <property>
   <name>ozone.scm.container.creation.lease.timeout</name>
   <value>60s</value>
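For completeness, a hedged sketch of how a client could tighten the new bound instead of relying on the default of 100. It uses the key and helper introduced by this commit; the import path for ScmConfigKeys is assumed from the surrounding code rather than shown in the diff.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.scm.ScmConfigKeys; // package assumed, not shown in the diff

public class OutstandingRequestsConfigExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // With nothing set, the helper falls back to the default of 100.
    System.out.println("default bound = "
        + OzoneClientUtils.getMaxOutstandingRequests(conf));

    // Equivalent to setting scm.container.client.max.outstanding.requests in
    // ozone-site.xml; the XceiverClient semaphore is sized from this value.
    conf.setInt(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_OUTSTANDING_REQUESTS, 32);
    System.out.println("tightened bound = "
        + OzoneClientUtils.getMaxOutstandingRequests(conf));
  }
}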