HADOOP-12957. Limit the number of outstanding async calls. Contributed by Xiaobing Zhou

This commit is contained in:
Tsz-Wo Nicholas Sze 2016-05-02 11:15:12 -07:00
parent 2beedead72
commit 1b9f18623a
6 changed files with 450 additions and 114 deletions

View File

@ -321,6 +321,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
"ipc.client.async.calls.max";
public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ipc;
import java.io.IOException;
/**
* Signals that an AsyncCallLimitExceededException has occurred. This class is
* used to make application code using async RPC aware that limit of max async
* calls is reached, application code need to retrieve results from response of
* established async calls to avoid buffer overflow in order for follow-on async
* calls going correctly.
*/
public class AsyncCallLimitExceededException extends IOException {
private static final long serialVersionUID = 1L;
public AsyncCallLimitExceededException(String message) {
super(message);
}
}

View File

@ -159,7 +159,9 @@ public static void setCallIdAndRetryCount(int cid, int rc) {
private final boolean fallbackAllowed;
private final byte[] clientId;
private final int maxAsyncCalls;
private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@ -1288,6 +1290,9 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@ -1354,6 +1359,20 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth);
}
private void checkAsyncCall() throws IOException {
if (isAsynchronousMode()) {
if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
asyncCallCounter.decrementAndGet();
String errMsg = String.format(
"Exceeded limit of max asynchronous calls: %d, " +
"please configure %s to adjust it.",
maxAsyncCalls,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
throw new AsyncCallLimitExceededException(errMsg);
}
}
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc response.
@ -1374,24 +1393,38 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
checkAsyncCall();
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
throw new IOException("connection has been closed", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("interrupted waiting to send rpc request to server", e);
throw new IOException(e);
}
} catch(Exception e) {
if (isAsynchronousMode()) {
releaseAsyncCall();
}
throw e;
}
if (isAsynchronousMode()) {
Future<Writable> returnFuture = new AbstractFuture<Writable>() {
private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
try {
set(getRpcResponse(call, connection));
} catch (IOException ie) {
setException(ie);
if (callled.compareAndSet(false, true)) {
try {
set(getRpcResponse(call, connection));
} catch (IOException ie) {
setException(ie);
} finally {
releaseAsyncCall();
}
}
return super.get();
}
@ -1427,6 +1460,15 @@ public static void setAsynchronousMode(boolean async) {
asynchronousMode.set(async);
}
private void releaseAsyncCall() {
asyncCallCounter.decrementAndGet();
}
@VisibleForTesting
int getAsyncCallCount() {
return asyncCallCounter.get();
}
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@ -34,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@ -54,12 +56,13 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
protected static class SerialCaller extends Thread {
static class AsyncCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@ -68,11 +71,11 @@ protected static class SerialCaller extends Thread {
new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
public SerialCaller(Client client, InetSocketAddress server, int count) {
public AsyncCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
// set asynchronous mode, since SerialCaller extends Thread
// set asynchronous mode, since AsyncCaller extends Thread
Client.setAsynchronousMode(true);
}
@ -107,14 +110,111 @@ public void waitForReturnValues() throws InterruptedException,
}
}
@Test
public void testSerial() throws IOException, InterruptedException,
ExecutionException {
internalTestSerial(3, false, 2, 5, 100);
internalTestSerial(3, true, 2, 5, 10);
static class AsyncLimitlCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
private boolean failed;
Map<Integer, Future<LongWritable>> returnFutures = new HashMap<Integer, Future<LongWritable>>();
Map<Integer, Long> expectedValues = new HashMap<Integer, Long>();
int start = 0, end = 0;
int getStart() {
return start;
}
int getEnd() {
return end;
}
int getCount() {
return count;
}
public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
this(0, client, server, count);
}
final int callerId;
public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
int count) {
this.client = client;
this.server = server;
this.count = count;
// set asynchronous mode, since AsyncLimitlCaller extends Thread
Client.setAsynchronousMode(true);
this.callerId = callerId;
}
@Override
public void run() {
// in case Thread#Start is called, which will spawn new thread
Client.setAsynchronousMode(true);
for (int i = 0; i < count; i++) {
try {
final long param = TestIPC.RANDOM.nextLong();
runCall(i, param);
} catch (Exception e) {
LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
StringUtils.stringifyException(e)));
failed = true;
}
}
}
private void runCall(final int idx, final long param)
throws InterruptedException, ExecutionException, IOException {
for (;;) {
try {
doCall(idx, param);
return;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async calls
* to let follow-on calls go
*/
start = end;
end = idx;
waitForReturnValues(start, end);
}
}
}
private void doCall(final int idx, final long param) throws IOException {
TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(idx, returnFuture);
expectedValues.put(idx, param);
}
private void waitForReturnValues(final int start, final int end)
throws InterruptedException, ExecutionException {
for (int i = start; i < end; i++) {
LongWritable value = returnFutures.get(i).get();
if (expectedValues.get(i) != value.get()) {
LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
failed = true;
break;
}
}
}
}
public void internalTestSerial(int handlerCount, boolean handlerSleep,
@Test(timeout = 60000)
public void testAsyncCall() throws IOException, InterruptedException,
ExecutionException {
internalTestAsyncCall(3, false, 2, 5, 100);
internalTestAsyncCall(3, true, 2, 5, 10);
}
@Test(timeout = 60000)
public void testAsyncCallLimit() throws IOException,
InterruptedException, ExecutionException {
internalTestAsyncCallLimit(100, false, 5, 10, 500);
}
public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@ -126,9 +226,9 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep,
clients[i] = new Client(LongWritable.class, conf);
}
SerialCaller[] callers = new SerialCaller[callerCount];
AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@ -144,6 +244,75 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep,
server.stop();
}
@Test(timeout = 60000)
public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
InterruptedException, ExecutionException {
int handlerCount = 10, callCount = 100;
Server server = new TestIPC.TestServer(handlerCount, false, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final Client client = new Client(LongWritable.class, conf);
int asyncCallCount = client.getAsyncCallCount();
try {
AsyncCaller caller = new AsyncCaller(client, addr, callCount);
caller.run();
caller.waitForReturnValues();
String msg = String.format(
"First time, expected not failed for caller: %s.", caller);
assertFalse(msg, caller.failed);
caller.waitForReturnValues();
assertTrue(asyncCallCount == client.getAsyncCallCount());
msg = String.format("Second time, expected not failed for caller: %s.",
caller);
assertFalse(msg, caller.failed);
assertTrue(asyncCallCount == client.getAsyncCallCount());
} finally {
client.stop();
server.stop();
}
}
public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
Client[] clients = new Client[clientCount];
for (int i = 0; i < clientCount; i++) {
clients[i] = new Client(LongWritable.class, conf);
}
AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
callers[i].join();
callers[i].waitForReturnValues(callers[i].getStart(),
callers[i].getCount());
String msg = String.format("Expected not failed for caller-%d: %s.", i,
callers[i]);
assertFalse(msg, callers[i].failed);
}
for (int i = 0; i < clientCount; i++) {
clients[i].stop();
}
server.stop();
}
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@ -196,7 +365,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 4);
final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -235,7 +404,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -272,7 +441,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@ -313,9 +482,9 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
SerialCaller[] callers = new SerialCaller[callerCount];
AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
callers[i] = new SerialCaller(client, addr, perCallerCallCount);
callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {

View File

@ -22,6 +22,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@ -50,11 +51,14 @@ static <T> Future<T> getReturnValue() {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() {
private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
if (called.compareAndSet(false, true)) {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
}
}
return super.get();
}

View File

@ -20,7 +20,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@ -31,80 +30,25 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
final private static Configuration CONF = new HdfsConfiguration();
final private static String GROUP1_NAME = "group1";
final private static String GROUP2_NAME = "group2";
final private static String USER1_NAME = "user1";
private static final UserGroupInformation USER1;
private MiniDFSCluster gCluster;
static {
// explicitly turn on permission checking
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
// Initiate all four users
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
GROUP1_NAME, GROUP2_NAME });
}
@Before
public void setUp() throws IOException {
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
gCluster.waitActive();
}
@After
public void tearDown() throws IOException {
if (gCluster != null) {
gCluster.shutdown();
gCluster = null;
}
}
static int countLease(MiniDFSCluster cluster) {
return TestDFSRename.countLease(cluster);
}
void list(DistributedFileSystem dfs, String name) throws IOException {
FileSystem.LOG.info("\n\n" + name);
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
FileSystem.LOG.info("" + s.getPath());
}
}
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
DataOutputStream a_out = dfs.create(f);
a_out.writeBytes("something");
a_out.close();
}
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
@Test
@Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@ -169,26 +113,26 @@ public void testAsyncRenameWithOverwrite() throws Exception {
}
}
@Test
public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
@Test(timeout = 60000)
public void testCallGetReturnValueMultipleTimes() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
"/test/concurrent_reanme_with_overwrite_dir/");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
"/test/testCallGetReturnValueMultipleTimes/");
final Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(2).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
final DistributedFileSystem dfs = cluster.getFileSystem();
final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
final int count = 100;
long fileLen = blockSize * 3;
final Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try {
long fileLen = blockSize * 3;
assertTrue(dfs.mkdirs(renameDir));
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
@ -199,8 +143,104 @@ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
returnFutures.put(i, returnFuture);
}
// wait for completing the calls
for (int i = 0; i < 5; i++) {
verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
renameDir, dfs);
}
} finally {
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private void verifyCallGetReturnValueMultipleTimes(
Map<Integer, Future<Void>> returnFutures, int count,
MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
throws InterruptedException, ExecutionException, IOException {
// wait for completing the calls
for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
// Restart NN and check the rename successfully
cluster.restartNameNodes();
// very the src dir should not exist, dst should
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
assertFalse(dfs.exists(src));
assertTrue(dfs.exists(dst));
}
}
@Test(timeout = 120000)
public void testAggressiveConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(100,
"testAggressiveConcurrentAsyncRenameWithOverwrite");
}
@Test(timeout = 60000)
public void testConservativeConcurrentAsyncRenameWithOverwrite()
throws Exception {
internalTestConcurrentAsyncRenameWithOverwrite(10000,
"testConservativeConcurrentAsyncRenameWithOverwrite");
}
private void internalTestConcurrentAsyncRenameWithOverwrite(
final int asyncCallLimit, final String basePath) throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(String.format("/test/%s/", basePath));
Configuration conf = new HdfsConfiguration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
asyncCallLimit);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
long fileLen = blockSize * 3;
int start = 0, end = 0;
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
assertTrue(dfs.mkdirs(renameDir));
try {
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
for (;;) {
try {
LOG.info("rename #" + i);
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFutures.put(i, returnFuture);
break;
} catch (AsyncCallLimitExceededException e) {
/**
* reached limit of async calls, fetch results of finished async
* calls to let follow-on calls go
*/
LOG.error(e);
start = end;
end = i;
LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
waitForReturnValues(returnFutures, start, end);
}
}
}
// wait for completing the calls
for (int i = start; i < count; i++) {
returnFutures.get(i).get();
}
@ -215,26 +255,60 @@ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
assertTrue(dfs.exists(dst));
}
} finally {
dfs.delete(renameDir, true);
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
private void waitForReturnValues(
final Map<Integer, Future<Void>> returnFutures, final int start,
final int end) throws InterruptedException, ExecutionException {
LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
for (int i = start; i < end; i++) {
LOG.info("calling Future#get #" + i);
returnFutures.get(i).get();
}
}
@Test(timeout = 60000)
public void testAsyncRenameWithException() throws Exception {
FileSystem rootFs = FileSystem.get(CONF);
Configuration conf = new HdfsConfiguration();
String group1 = "group1";
String group2 = "group2";
String user1 = "user1";
UserGroupInformation ugi1;
// explicitly turn on permission checking
conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(user1, new String[] { group1, group2 });
DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
// Initiate all four users
ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
group1, group2 });
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3).build();
cluster.waitActive();
FileSystem rootFs = FileSystem.get(conf);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
AsyncDistributedFileSystem adfs = USER1
AsyncDistributedFileSystem adfs = ugi1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
return gCluster.getFileSystem().getAsyncDistributedFileSystem();
return cluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@ -242,16 +316,24 @@ public AsyncDistributedFileSystem run() throws Exception {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
checkPermissionDenied(e, src);
checkPermissionDenied(e, src, user1);
} finally {
if (rootFs != null) {
rootFs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private void checkPermissionDenied(final Exception e, final Path dir) {
private void checkPermissionDenied(final Exception e, final Path dir,
final String user) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(USER1_NAME));
.getMessage().contains(user));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}