diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 6693b77b36..9dfdd2fe48 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -321,6 +321,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final long HADOOP_SECURITY_UID_NAME_CACHE_TIMEOUT_DEFAULT =
4*60*60; // 4 hours
+ public static final String IPC_CLIENT_ASYNC_CALLS_MAX_KEY =
+ "ipc.client.async.calls.max";
+ public static final int IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT = 100;
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
new file mode 100644
index 0000000000..db97b6cb36
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AsyncCallLimitExceededException.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.io.IOException;
+
+/**
+ * Signals that an AsyncCallLimitExceededException has occurred. This class is
+ * used to make application code using async RPC aware that limit of max async
+ * calls is reached, application code need to retrieve results from response of
+ * established async calls to avoid buffer overflow in order for follow-on async
+ * calls going correctly.
+ */
+public class AsyncCallLimitExceededException extends IOException {
+ private static final long serialVersionUID = 1L;
+
+ public AsyncCallLimitExceededException(String message) {
+ super(message);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index d59aeb8995..9be4649378 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -159,7 +159,9 @@ public static void setCallIdAndRetryCount(int cid, int rc) {
private final boolean fallbackAllowed;
private final byte[] clientId;
-
+ private final int maxAsyncCalls;
+ private final AtomicInteger asyncCallCounter = new AtomicInteger(0);
+
/**
* Executor on which IPC calls' parameters are sent.
* Deferring the sending of parameters to a separate
@@ -1288,6 +1290,9 @@ public Client(Class extends Writable> valueClass, Configuration conf,
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
this.clientId = ClientId.getClientId();
this.sendParamsExecutor = clientExcecutorFactory.refAndGetInstance();
+ this.maxAsyncCalls = conf.getInt(
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
}
/**
@@ -1354,6 +1359,20 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
fallbackToSimpleAuth);
}
+ private void checkAsyncCall() throws IOException {
+ if (isAsynchronousMode()) {
+ if (asyncCallCounter.incrementAndGet() > maxAsyncCalls) {
+ asyncCallCounter.decrementAndGet();
+ String errMsg = String.format(
+ "Exceeded limit of max asynchronous calls: %d, " +
+ "please configure %s to adjust it.",
+ maxAsyncCalls,
+ CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY);
+ throw new AsyncCallLimitExceededException(errMsg);
+ }
+ }
+ }
+
/**
* Make a call, passing rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc response.
@@ -1374,24 +1393,38 @@ Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
final Call call = createCall(rpcKind, rpcRequest);
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
+
try {
- connection.sendRpcRequest(call); // send the rpc request
- } catch (RejectedExecutionException e) {
- throw new IOException("connection has been closed", e);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- LOG.warn("interrupted waiting to send rpc request to server", e);
- throw new IOException(e);
+ checkAsyncCall();
+ try {
+ connection.sendRpcRequest(call); // send the rpc request
+ } catch (RejectedExecutionException e) {
+ throw new IOException("connection has been closed", e);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("interrupted waiting to send rpc request to server", e);
+ throw new IOException(e);
+ }
+ } catch(Exception e) {
+ if (isAsynchronousMode()) {
+ releaseAsyncCall();
+ }
+ throw e;
}
if (isAsynchronousMode()) {
Future returnFuture = new AbstractFuture() {
+ private final AtomicBoolean callled = new AtomicBoolean(false);
@Override
public Writable get() throws InterruptedException, ExecutionException {
- try {
- set(getRpcResponse(call, connection));
- } catch (IOException ie) {
- setException(ie);
+ if (callled.compareAndSet(false, true)) {
+ try {
+ set(getRpcResponse(call, connection));
+ } catch (IOException ie) {
+ setException(ie);
+ } finally {
+ releaseAsyncCall();
+ }
}
return super.get();
}
@@ -1427,6 +1460,15 @@ public static void setAsynchronousMode(boolean async) {
asynchronousMode.set(async);
}
+ private void releaseAsyncCall() {
+ asyncCallCounter.decrementAndGet();
+ }
+
+ @VisibleForTesting
+ int getAsyncCallCount() {
+ return asyncCallCounter.get();
+ }
+
private Writable getRpcResponse(final Call call, final Connection connection)
throws IOException {
synchronized (call) {
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
index 6cf75c74c7..8ee3a2c64d 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java
@@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.InetSocketAddress;
@@ -34,6 +35,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RPC.RpcKind;
@@ -54,12 +56,13 @@ public class TestAsyncIPC {
@Before
public void setupConf() {
conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 10000);
Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
// set asynchronous mode for main thread
Client.setAsynchronousMode(true);
}
- protected static class SerialCaller extends Thread {
+ static class AsyncCaller extends Thread {
private Client client;
private InetSocketAddress server;
private int count;
@@ -68,11 +71,11 @@ protected static class SerialCaller extends Thread {
new HashMap>();
Map expectedValues = new HashMap();
- public SerialCaller(Client client, InetSocketAddress server, int count) {
+ public AsyncCaller(Client client, InetSocketAddress server, int count) {
this.client = client;
this.server = server;
this.count = count;
- // set asynchronous mode, since SerialCaller extends Thread
+ // set asynchronous mode, since AsyncCaller extends Thread
Client.setAsynchronousMode(true);
}
@@ -107,14 +110,111 @@ public void waitForReturnValues() throws InterruptedException,
}
}
- @Test
- public void testSerial() throws IOException, InterruptedException,
- ExecutionException {
- internalTestSerial(3, false, 2, 5, 100);
- internalTestSerial(3, true, 2, 5, 10);
+ static class AsyncLimitlCaller extends Thread {
+ private Client client;
+ private InetSocketAddress server;
+ private int count;
+ private boolean failed;
+ Map> returnFutures = new HashMap>();
+ Map expectedValues = new HashMap();
+ int start = 0, end = 0;
+
+ int getStart() {
+ return start;
+ }
+
+ int getEnd() {
+ return end;
+ }
+
+ int getCount() {
+ return count;
+ }
+
+ public AsyncLimitlCaller(Client client, InetSocketAddress server, int count) {
+ this(0, client, server, count);
+ }
+
+ final int callerId;
+
+ public AsyncLimitlCaller(int callerId, Client client, InetSocketAddress server,
+ int count) {
+ this.client = client;
+ this.server = server;
+ this.count = count;
+ // set asynchronous mode, since AsyncLimitlCaller extends Thread
+ Client.setAsynchronousMode(true);
+ this.callerId = callerId;
+ }
+
+ @Override
+ public void run() {
+ // in case Thread#Start is called, which will spawn new thread
+ Client.setAsynchronousMode(true);
+ for (int i = 0; i < count; i++) {
+ try {
+ final long param = TestIPC.RANDOM.nextLong();
+ runCall(i, param);
+ } catch (Exception e) {
+ LOG.fatal(String.format("Caller-%d Call-%d caught: %s", callerId, i,
+ StringUtils.stringifyException(e)));
+ failed = true;
+ }
+ }
+ }
+
+ private void runCall(final int idx, final long param)
+ throws InterruptedException, ExecutionException, IOException {
+ for (;;) {
+ try {
+ doCall(idx, param);
+ return;
+ } catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async calls
+ * to let follow-on calls go
+ */
+ start = end;
+ end = idx;
+ waitForReturnValues(start, end);
+ }
+ }
+ }
+
+ private void doCall(final int idx, final long param) throws IOException {
+ TestIPC.call(client, param, server, conf);
+ Future returnFuture = Client.getReturnRpcResponse();
+ returnFutures.put(idx, returnFuture);
+ expectedValues.put(idx, param);
+ }
+
+ private void waitForReturnValues(final int start, final int end)
+ throws InterruptedException, ExecutionException {
+ for (int i = start; i < end; i++) {
+ LongWritable value = returnFutures.get(i).get();
+ if (expectedValues.get(i) != value.get()) {
+ LOG.fatal(String.format("Caller-%d Call-%d failed!", callerId, i));
+ failed = true;
+ break;
+ }
+ }
+ }
}
- public void internalTestSerial(int handlerCount, boolean handlerSleep,
+ @Test(timeout = 60000)
+ public void testAsyncCall() throws IOException, InterruptedException,
+ ExecutionException {
+ internalTestAsyncCall(3, false, 2, 5, 100);
+ internalTestAsyncCall(3, true, 2, 5, 10);
+ }
+
+ @Test(timeout = 60000)
+ public void testAsyncCallLimit() throws IOException,
+ InterruptedException, ExecutionException {
+ internalTestAsyncCallLimit(100, false, 5, 10, 500);
+ }
+
+ public void internalTestAsyncCall(int handlerCount, boolean handlerSleep,
int clientCount, int callerCount, int callCount) throws IOException,
InterruptedException, ExecutionException {
Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
@@ -126,9 +226,9 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep,
clients[i] = new Client(LongWritable.class, conf);
}
- SerialCaller[] callers = new SerialCaller[callerCount];
+ AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; i++) {
- callers[i] = new SerialCaller(clients[i % clientCount], addr, callCount);
+ callers[i] = new AsyncCaller(clients[i % clientCount], addr, callCount);
callers[i].start();
}
for (int i = 0; i < callerCount; i++) {
@@ -144,6 +244,75 @@ public void internalTestSerial(int handlerCount, boolean handlerSleep,
server.stop();
}
+ @Test(timeout = 60000)
+ public void testCallGetReturnRpcResponseMultipleTimes() throws IOException,
+ InterruptedException, ExecutionException {
+ int handlerCount = 10, callCount = 100;
+ Server server = new TestIPC.TestServer(handlerCount, false, conf);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+ final Client client = new Client(LongWritable.class, conf);
+
+ int asyncCallCount = client.getAsyncCallCount();
+
+ try {
+ AsyncCaller caller = new AsyncCaller(client, addr, callCount);
+ caller.run();
+
+ caller.waitForReturnValues();
+ String msg = String.format(
+ "First time, expected not failed for caller: %s.", caller);
+ assertFalse(msg, caller.failed);
+
+ caller.waitForReturnValues();
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ msg = String.format("Second time, expected not failed for caller: %s.",
+ caller);
+ assertFalse(msg, caller.failed);
+
+ assertTrue(asyncCallCount == client.getAsyncCallCount());
+ } finally {
+ client.stop();
+ server.stop();
+ }
+ }
+
+ public void internalTestAsyncCallLimit(int handlerCount, boolean handlerSleep,
+ int clientCount, int callerCount, int callCount) throws IOException,
+ InterruptedException, ExecutionException {
+ Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 100);
+ Client.setPingInterval(conf, TestIPC.PING_INTERVAL);
+
+ Server server = new TestIPC.TestServer(handlerCount, handlerSleep, conf);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ Client[] clients = new Client[clientCount];
+ for (int i = 0; i < clientCount; i++) {
+ clients[i] = new Client(LongWritable.class, conf);
+ }
+
+ AsyncLimitlCaller[] callers = new AsyncLimitlCaller[callerCount];
+ for (int i = 0; i < callerCount; i++) {
+ callers[i] = new AsyncLimitlCaller(i, clients[i % clientCount], addr,
+ callCount);
+ callers[i].start();
+ }
+ for (int i = 0; i < callerCount; i++) {
+ callers[i].join();
+ callers[i].waitForReturnValues(callers[i].getStart(),
+ callers[i].getCount());
+ String msg = String.format("Expected not failed for caller-%d: %s.", i,
+ callers[i]);
+ assertFalse(msg, callers[i].failed);
+ }
+ for (int i = 0; i < clientCount; i++) {
+ clients[i].stop();
+ }
+ server.stop();
+ }
+
/**
* Test if (1) the rpc server uses the call id/retry provided by the rpc
* client, and (2) the rpc client receives the same call id/retry from the rpc
@@ -196,7 +365,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 4);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 4);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -235,7 +404,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 10);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -272,7 +441,7 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- final SerialCaller caller = new SerialCaller(client, addr, 10);
+ final AsyncCaller caller = new AsyncCaller(client, addr, 10);
caller.run();
caller.waitForReturnValues();
String msg = String.format("Expected not failed for caller: %s.", caller);
@@ -313,9 +482,9 @@ public void run() {
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
- SerialCaller[] callers = new SerialCaller[callerCount];
+ AsyncCaller[] callers = new AsyncCaller[callerCount];
for (int i = 0; i < callerCount; ++i) {
- callers[i] = new SerialCaller(client, addr, perCallerCallCount);
+ callers[i] = new AsyncCaller(client, addr, perCallerCallCount);
callers[i].start();
}
for (int i = 0; i < callerCount; ++i) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
index 37899aa703..356ae3ff56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/AsyncDistributedFileSystem.java
@@ -22,6 +22,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
@@ -50,11 +51,14 @@ static Future getReturnValue() {
final Callable returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future returnFuture = new AbstractFuture() {
+ private final AtomicBoolean called = new AtomicBoolean(false);
public T get() throws InterruptedException, ExecutionException {
- try {
- set(returnValueCallback.call());
- } catch (Exception e) {
- setException(e);
+ if (called.compareAndSet(false, true)) {
+ try {
+ set(returnValueCallback.call());
+ } catch (Exception e) {
+ setException(e);
+ }
}
return super.get();
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
index 9322e1afbb..d129299bf5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java
@@ -20,7 +20,6 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
-import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
@@ -31,80 +30,25 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.security.UserGroupInformation;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
- final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
- final private static Configuration CONF = new HdfsConfiguration();
-
- final private static String GROUP1_NAME = "group1";
- final private static String GROUP2_NAME = "group2";
- final private static String USER1_NAME = "user1";
- private static final UserGroupInformation USER1;
-
- private MiniDFSCluster gCluster;
-
- static {
- // explicitly turn on permission checking
- CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
-
- // create fake mapping for the groups
- Map u2g_map = new HashMap(1);
- u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
- DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
-
- // Initiate all four users
- USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
- GROUP1_NAME, GROUP2_NAME });
- }
-
- @Before
- public void setUp() throws IOException {
- gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
- gCluster.waitActive();
- }
-
- @After
- public void tearDown() throws IOException {
- if (gCluster != null) {
- gCluster.shutdown();
- gCluster = null;
- }
- }
-
- static int countLease(MiniDFSCluster cluster) {
- return TestDFSRename.countLease(cluster);
- }
-
- void list(DistributedFileSystem dfs, String name) throws IOException {
- FileSystem.LOG.info("\n\n" + name);
- for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
- FileSystem.LOG.info("" + s.getPath());
- }
- }
-
- static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
- DataOutputStream a_out = dfs.create(f);
- a_out.writeBytes("something");
- a_out.close();
- }
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
- @Test
+ @Test(timeout = 60000)
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
@@ -169,26 +113,26 @@ public void testAsyncRenameWithOverwrite() throws Exception {
}
}
- @Test
- public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
+ @Test(timeout = 60000)
+ public void testCallGetReturnValueMultipleTimes() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
- "/test/concurrent_reanme_with_overwrite_dir/");
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
- .build();
+ "/test/testCallGetReturnValueMultipleTimes/");
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200);
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(2).build();
cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
- int count = 1000;
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ final int count = 100;
+ long fileLen = blockSize * 3;
+ final Map> returnFutures = new HashMap>();
+
+ assertTrue(dfs.mkdirs(renameDir));
try {
- long fileLen = blockSize * 3;
- assertTrue(dfs.mkdirs(renameDir));
-
- Map> returnFutures = new HashMap>();
-
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
@@ -199,8 +143,104 @@ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
returnFutures.put(i, returnFuture);
}
- // wait for completing the calls
+ for (int i = 0; i < 5; i++) {
+ verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster,
+ renameDir, dfs);
+ }
+ } finally {
+ if (dfs != null) {
+ dfs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void verifyCallGetReturnValueMultipleTimes(
+ Map> returnFutures, int count,
+ MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs)
+ throws InterruptedException, ExecutionException, IOException {
+ // wait for completing the calls
+ for (int i = 0; i < count; i++) {
+ returnFutures.get(i).get();
+ }
+
+ // Restart NN and check the rename successfully
+ cluster.restartNameNodes();
+
+ // very the src dir should not exist, dst should
+ for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ assertFalse(dfs.exists(src));
+ assertTrue(dfs.exists(dst));
+ }
+ }
+
+ @Test(timeout = 120000)
+ public void testAggressiveConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(100,
+ "testAggressiveConcurrentAsyncRenameWithOverwrite");
+ }
+
+ @Test(timeout = 60000)
+ public void testConservativeConcurrentAsyncRenameWithOverwrite()
+ throws Exception {
+ internalTestConcurrentAsyncRenameWithOverwrite(10000,
+ "testConservativeConcurrentAsyncRenameWithOverwrite");
+ }
+
+ private void internalTestConcurrentAsyncRenameWithOverwrite(
+ final int asyncCallLimit, final String basePath) throws Exception {
+ final short replFactor = 2;
+ final long blockSize = 512;
+ final Path renameDir = new Path(String.format("/test/%s/", basePath));
+ Configuration conf = new HdfsConfiguration();
+ conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
+ asyncCallLimit);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
+ .build();
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
+ int count = 1000;
+ long fileLen = blockSize * 3;
+ int start = 0, end = 0;
+ Map> returnFutures = new HashMap>();
+
+ assertTrue(dfs.mkdirs(renameDir));
+
+ try {
+ // concurrently invoking many rename
for (int i = 0; i < count; i++) {
+ Path src = new Path(renameDir, "src" + i);
+ Path dst = new Path(renameDir, "dst" + i);
+ DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
+ DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
+ for (;;) {
+ try {
+ LOG.info("rename #" + i);
+ Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
+ returnFutures.put(i, returnFuture);
+ break;
+ } catch (AsyncCallLimitExceededException e) {
+ /**
+ * reached limit of async calls, fetch results of finished async
+ * calls to let follow-on calls go
+ */
+ LOG.error(e);
+ start = end;
+ end = i;
+ LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i));
+ waitForReturnValues(returnFutures, start, end);
+ }
+ }
+ }
+
+ // wait for completing the calls
+ for (int i = start; i < count; i++) {
returnFutures.get(i).get();
}
@@ -215,26 +255,60 @@ public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
assertTrue(dfs.exists(dst));
}
} finally {
- dfs.delete(renameDir, true);
+ if (dfs != null) {
+ dfs.close();
+ }
if (cluster != null) {
cluster.shutdown();
}
}
}
- @Test
+ private void waitForReturnValues(
+ final Map> returnFutures, final int start,
+ final int end) throws InterruptedException, ExecutionException {
+ LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end));
+ for (int i = start; i < end; i++) {
+ LOG.info("calling Future#get #" + i);
+ returnFutures.get(i).get();
+ }
+ }
+
+ @Test(timeout = 60000)
public void testAsyncRenameWithException() throws Exception {
- FileSystem rootFs = FileSystem.get(CONF);
+ Configuration conf = new HdfsConfiguration();
+ String group1 = "group1";
+ String group2 = "group2";
+ String user1 = "user1";
+ UserGroupInformation ugi1;
+
+ // explicitly turn on permission checking
+ conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
+
+ // create fake mapping for the groups
+ Map u2g_map = new HashMap(1);
+ u2g_map.put(user1, new String[] { group1, group2 });
+ DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2g_map);
+
+ // Initiate all four users
+ ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] {
+ group1, group2 });
+
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(3).build();
+ cluster.waitActive();
+
+ FileSystem rootFs = FileSystem.get(conf);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
- AsyncDistributedFileSystem adfs = USER1
+ AsyncDistributedFileSystem adfs = ugi1
.doAs(new PrivilegedExceptionAction() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
- return gCluster.getFileSystem().getAsyncDistributedFileSystem();
+ return cluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
@@ -242,16 +316,24 @@ public AsyncDistributedFileSystem run() throws Exception {
Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
- checkPermissionDenied(e, src);
+ checkPermissionDenied(e, src, user1);
+ } finally {
+ if (rootFs != null) {
+ rootFs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
}
}
- private void checkPermissionDenied(final Exception e, final Path dir) {
+ private void checkPermissionDenied(final Exception e, final Path dir,
+ final String user) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
- .getMessage().contains(USER1_NAME));
+ .getMessage().contains(user));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}