Revert "Revert "HDFS-10224. Implement asynchronous rename for DistributedFileSystem. Contributed by Xiaobing Zhou""

This reverts commit 106234d873.
This commit is contained in:
Tsz-Wo Nicholas Sze 2016-06-06 16:28:21 +08:00
parent 106234d873
commit eded3d109e
8 changed files with 463 additions and 20 deletions

View File

@ -1252,7 +1252,6 @@ public boolean setReplication(Path src, short replication)
/**
* Renames Path src to Path dst
* <ul>
* <li
* <li>Fails if src is a file and dst is a directory.
* <li>Fails if src is a directory and dst is a file.
* <li>Fails if the parent of dst does not exist or is a file.

View File

@ -119,7 +119,8 @@ public class Client implements AutoCloseable {
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
private static final ThreadLocal<Future<?>> returnValue = new ThreadLocal<>();
private static final ThreadLocal<Future<?>>
RETURN_RPC_RESPONSE = new ThreadLocal<>();
private static final ThreadLocal<Boolean> asynchronousMode =
new ThreadLocal<Boolean>() {
@Override
@ -130,8 +131,8 @@ protected Boolean initialValue() {
@SuppressWarnings("unchecked")
@Unstable
public static <T> Future<T> getReturnValue() {
return (Future<T>) returnValue.get();
public static <T> Future<T> getReturnRpcResponse() {
return (Future<T>) RETURN_RPC_RESPONSE.get();
}
/** Set call id and retry count for the next call. */
@ -1396,7 +1397,7 @@ public Writable get() throws InterruptedException, ExecutionException {
}
};
returnValue.set(returnFuture);
RETURN_RPC_RESPONSE.set(returnFuture);
return null;
} else {
return getRpcResponse(call, connection);
@ -1410,7 +1411,7 @@ public Writable get() throws InterruptedException, ExecutionException {
* synchronous mode.
*/
@Unstable
static boolean isAsynchronousMode() {
public static boolean isAsynchronousMode() {
return asynchronousMode.get();
}

View File

@ -26,7 +26,9 @@
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.SocketFactory;
@ -35,6 +37,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputOutputStream;
import org.apache.hadoop.io.Writable;
@ -67,7 +70,9 @@
@InterfaceStability.Evolving
public class ProtobufRpcEngine implements RpcEngine {
public static final Log LOG = LogFactory.getLog(ProtobufRpcEngine.class);
private static final ThreadLocal<Callable<?>>
RETURN_MESSAGE_CALLBACK = new ThreadLocal<>();
static { // Register the rpcRequest deserializer for WritableRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcRequestWrapper.class,
@ -76,6 +81,12 @@ public class ProtobufRpcEngine implements RpcEngine {
private static final ClientCache CLIENTS = new ClientCache();
@SuppressWarnings("unchecked")
@Unstable
public static <T> Callable<T> getReturnMessageCallback() {
return (Callable<T>) RETURN_MESSAGE_CALLBACK.get();
}
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException {
@ -189,7 +200,7 @@ private RequestHeaderProto constructRpcRequestHeader(Method method) {
* the server.
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args)
public Object invoke(Object proxy, final Method method, Object[] args)
throws ServiceException {
long startTime = 0;
if (LOG.isDebugEnabled()) {
@ -251,6 +262,23 @@ public Object invoke(Object proxy, Method method, Object[] args)
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
}
if (Client.isAsynchronousMode()) {
final Future<RpcResponseWrapper> frrw = Client.getReturnRpcResponse();
Callable<Message> callback = new Callable<Message>() {
@Override
public Message call() throws Exception {
return getReturnMessage(method, frrw.get());
}
};
RETURN_MESSAGE_CALLBACK.set(callback);
return null;
} else {
return getReturnMessage(method, val);
}
}
private Message getReturnMessage(final Method method,
final RpcResponseWrapper rrw) throws ServiceException {
Message prototype = null;
try {
prototype = getReturnProtoType(method);
@ -260,7 +288,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
Message returnMessage;
try {
returnMessage = prototype.newBuilderForType()
.mergeFrom(val.theResponseRead).build();
.mergeFrom(rrw.theResponseRead).build();
if (LOG.isTraceEnabled()) {
LOG.trace(Thread.currentThread().getId() + ": Response <- " +

View File

@ -84,7 +84,7 @@ public void run() {
try {
final long param = TestIPC.RANDOM.nextLong();
TestIPC.call(client, param, server, conf);
Future<LongWritable> returnFuture = Client.getReturnValue();
Future<LongWritable> returnFuture = Client.getReturnRpcResponse();
returnFutures.put(i, returnFuture);
expectedValues.put(i, param);
} catch (Exception e) {

View File

@ -0,0 +1,110 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB;
import org.apache.hadoop.ipc.Client;
import com.google.common.util.concurrent.AbstractFuture;
/****************************************************************
* Implementation of the asynchronous distributed file system.
* This instance of this class is the way end-user code interacts
* with a Hadoop DistributedFileSystem in an asynchronous manner.
*
*****************************************************************/
@Unstable
public class AsyncDistributedFileSystem {
private final DistributedFileSystem dfs;
AsyncDistributedFileSystem(final DistributedFileSystem dfs) {
this.dfs = dfs;
}
static <T> Future<T> getReturnValue() {
final Callable<T> returnValueCallback = ClientNamenodeProtocolTranslatorPB
.getReturnValueCallback();
Future<T> returnFuture = new AbstractFuture<T>() {
public T get() throws InterruptedException, ExecutionException {
try {
set(returnValueCallback.call());
} catch (Exception e) {
setException(e);
}
return super.get();
}
};
return returnFuture;
}
/**
* Renames Path src to Path dst
* <ul>
* <li>Fails if src is a file and dst is a directory.
* <li>Fails if src is a directory and dst is a file.
* <li>Fails if the parent of dst does not exist or is a file.
* </ul>
* <p>
* If OVERWRITE option is not passed as an argument, rename fails if the dst
* already exists.
* <p>
* If OVERWRITE option is passed as an argument, rename overwrites the dst if
* it is a file or an empty directory. Rename fails if dst is a non-empty
* directory.
* <p>
* Note that atomicity of rename is dependent on the file system
* implementation. Please refer to the file system documentation for details.
* This default implementation is non atomic.
*
* @param src
* path to be renamed
* @param dst
* new path after rename
* @throws IOException
* on failure
* @return an instance of Future, #get of which is invoked to wait for
* asynchronous call being finished.
*/
public Future<Void> rename(Path src, Path dst,
final Options.Rename... options) throws IOException {
dfs.getFsStatistics().incrementWriteOps(1);
final Path absSrc = dfs.fixRelativePart(src);
final Path absDst = dfs.fixRelativePart(dst);
final boolean isAsync = Client.isAsynchronousMode();
Client.setAsynchronousMode(true);
try {
dfs.getClient().rename(dfs.getPathName(absSrc), dfs.getPathName(absDst),
options);
return getReturnValue();
} finally {
Client.setAsynchronousMode(isAsync);
}
}
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
@ -204,7 +205,7 @@ public Path getHomeDirectory() {
* @return path component of {file}
* @throws IllegalArgumentException if URI does not belong to this DFS
*/
private String getPathName(Path file) {
String getPathName(Path file) {
checkPath(file);
String result = file.toUri().getPath();
if (!DFSUtilClient.isValidName(result)) {
@ -2509,4 +2510,23 @@ public Collection<FileStatus> getTrashRoots(boolean allUsers) {
}
return ret;
}
private final AsyncDistributedFileSystem adfs =
new AsyncDistributedFileSystem(this);
/** @return an {@link AsyncDistributedFileSystem} object. */
@Unstable
public AsyncDistributedFileSystem getAsyncDistributedFileSystem() {
return adfs;
}
@Override
protected Path fixRelativePart(Path p) {
return super.fixRelativePart(p);
}
Statistics getFsStatistics() {
return statistics;
}
}

View File

@ -24,11 +24,14 @@
import java.util.List;
import com.google.common.collect.Lists;
import java.util.concurrent.Callable;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedEntries;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@ -55,6 +58,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction;
@ -135,7 +139,6 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RemoveCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Rename2RequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RenameSnapshotRequestProto;
@ -153,13 +156,15 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetQuotaRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetReplicationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetSafeModeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetTimesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.TruncateRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UnsetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdateBlockForPipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.UpdatePipelineRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SetStoragePolicyRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.*;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.CreateEncryptionZoneRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.EncryptionZoneProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.GetEZForPathRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos.ListEncryptionZonesRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.GetErasureCodingPoliciesRequestProto;
@ -177,8 +182,9 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
@ -190,12 +196,9 @@
import org.apache.hadoop.security.token.Token;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
.EncryptionZoneProto;
/**
* This class forwards NN's ClientProtocol calls as RPC calls to the NN server
* while translating from the parameter types used in ClientProtocol to the
@ -206,6 +209,8 @@
public class ClientNamenodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientProtocol, Closeable, ProtocolTranslator {
final private ClientNamenodeProtocolPB rpcProxy;
private static final ThreadLocal<Callable<?>>
RETURN_VALUE_CALLBACK = new ThreadLocal<>();
static final GetServerDefaultsRequestProto VOID_GET_SERVER_DEFAULT_REQUEST =
GetServerDefaultsRequestProto.newBuilder().build();
@ -239,6 +244,12 @@ public ClientNamenodeProtocolTranslatorPB(ClientNamenodeProtocolPB proxy) {
rpcProxy = proxy;
}
@SuppressWarnings("unchecked")
@Unstable
public static <T> Callable<T> getReturnValueCallback() {
return (Callable<T>) RETURN_VALUE_CALLBACK.get();
}
@Override
public void close() {
RPC.stopProxy(rpcProxy);
@ -475,6 +486,7 @@ public boolean rename(String src, String dst) throws IOException {
RenameRequestProto req = RenameRequestProto.newBuilder()
.setSrc(src)
.setDst(dst).build();
try {
return rpcProxy.rename(null, req).getResult();
} catch (ServiceException e) {
@ -499,7 +511,22 @@ public void rename2(String src, String dst, Rename... options)
setDst(dst).setOverwriteDest(overwrite).
build();
try {
rpcProxy.rename2(null, req);
if (Client.isAsynchronousMode()) {
rpcProxy.rename2(null, req);
final Callable<Message> returnMessageCallback = ProtobufRpcEngine
.getReturnMessageCallback();
Callable<Void> callBack = new Callable<Void>() {
@Override
public Void call() throws Exception {
returnMessageCallback.call();
return null;
}
};
RETURN_VALUE_CALLBACK.set(callBack);
} else {
rpcProxy.rename2(null, req);
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -0,0 +1,258 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestAsyncDFSRename {
final Path asyncRenameDir = new Path("/test/async_rename/");
public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class);
final private static Configuration CONF = new HdfsConfiguration();
final private static String GROUP1_NAME = "group1";
final private static String GROUP2_NAME = "group2";
final private static String USER1_NAME = "user1";
private static final UserGroupInformation USER1;
private MiniDFSCluster gCluster;
static {
// explicitly turn on permission checking
CONF.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true);
// create fake mapping for the groups
Map<String, String[]> u2g_map = new HashMap<String, String[]>(1);
u2g_map.put(USER1_NAME, new String[] { GROUP1_NAME, GROUP2_NAME });
DFSTestUtil.updateConfWithFakeGroupMapping(CONF, u2g_map);
// Initiate all four users
USER1 = UserGroupInformation.createUserForTesting(USER1_NAME, new String[] {
GROUP1_NAME, GROUP2_NAME });
}
@Before
public void setUp() throws IOException {
gCluster = new MiniDFSCluster.Builder(CONF).numDataNodes(3).build();
gCluster.waitActive();
}
@After
public void tearDown() throws IOException {
if (gCluster != null) {
gCluster.shutdown();
gCluster = null;
}
}
static int countLease(MiniDFSCluster cluster) {
return TestDFSRename.countLease(cluster);
}
void list(DistributedFileSystem dfs, String name) throws IOException {
FileSystem.LOG.info("\n\n" + name);
for (FileStatus s : dfs.listStatus(asyncRenameDir)) {
FileSystem.LOG.info("" + s.getPath());
}
}
static void createFile(DistributedFileSystem dfs, Path f) throws IOException {
DataOutputStream a_out = dfs.create(f);
a_out.writeBytes("something");
a_out.close();
}
/**
* Check the blocks of dst file are cleaned after rename with overwrite
* Restart NN to check the rename successfully
*/
@Test
public void testAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
Configuration conf = new Configuration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(
replFactor).build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
try {
long fileLen = blockSize * 3;
String src = "/foo/src";
String dst = "/foo/dst";
String src2 = "/foo/src2";
String dst2 = "/foo/dst2";
Path srcPath = new Path(src);
Path dstPath = new Path(dst);
Path srcPath2 = new Path(src2);
Path dstPath2 = new Path(dst2);
DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1);
LocatedBlocks lbs = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), dst, 0, fileLen);
LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations(
cluster.getNameNode(), dst2, 0, fileLen);
BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode())
.getBlockManager();
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
.getLocalBlock()) != null);
assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
.getLocalBlock()) != null);
Future<Void> retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE);
Future<Void> retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE);
retVal1.get();
retVal2.get();
assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock()
.getLocalBlock()) == null);
assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock()
.getLocalBlock()) == null);
// Restart NN and check the rename successfully
cluster.restartNameNodes();
assertFalse(dfs.exists(srcPath));
assertTrue(dfs.exists(dstPath));
assertFalse(dfs.exists(srcPath2));
assertTrue(dfs.exists(dstPath2));
} finally {
if (dfs != null) {
dfs.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testConcurrentAsyncRenameWithOverwrite() throws Exception {
final short replFactor = 2;
final long blockSize = 512;
final Path renameDir = new Path(
"/test/concurrent_reanme_with_overwrite_dir/");
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.build();
cluster.waitActive();
DistributedFileSystem dfs = cluster.getFileSystem();
AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem();
int count = 1000;
try {
long fileLen = blockSize * 3;
assertTrue(dfs.mkdirs(renameDir));
Map<Integer, Future<Void>> returnFutures = new HashMap<Integer, Future<Void>>();
// concurrently invoking many rename
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1);
DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1);
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFutures.put(i, returnFuture);
}
// wait for completing the calls
for (int i = 0; i < count; i++) {
returnFutures.get(i).get();
}
// Restart NN and check the rename successfully
cluster.restartNameNodes();
// very the src dir should not exist, dst should
for (int i = 0; i < count; i++) {
Path src = new Path(renameDir, "src" + i);
Path dst = new Path(renameDir, "dst" + i);
assertFalse(dfs.exists(src));
assertTrue(dfs.exists(dst));
}
} finally {
dfs.delete(renameDir, true);
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testAsyncRenameWithException() throws Exception {
FileSystem rootFs = FileSystem.get(CONF);
final Path renameDir = new Path("/test/async_rename_exception/");
final Path src = new Path(renameDir, "src");
final Path dst = new Path(renameDir, "dst");
rootFs.mkdirs(src);
AsyncDistributedFileSystem adfs = USER1
.doAs(new PrivilegedExceptionAction<AsyncDistributedFileSystem>() {
@Override
public AsyncDistributedFileSystem run() throws Exception {
return gCluster.getFileSystem().getAsyncDistributedFileSystem();
}
});
try {
Future<Void> returnFuture = adfs.rename(src, dst, Rename.OVERWRITE);
returnFuture.get();
} catch (ExecutionException e) {
checkPermissionDenied(e, src);
}
}
private void checkPermissionDenied(final Exception e, final Path dir) {
assertTrue(e.getCause() instanceof ExecutionException);
assertTrue("Permission denied messages must carry AccessControlException",
e.getMessage().contains("AccessControlException"));
assertTrue("Permission denied messages must carry the username", e
.getMessage().contains(USER1_NAME));
assertTrue("Permission denied messages must carry the path parent", e
.getMessage().contains(dir.getParent().toUri().getPath()));
}
}