diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java index ddcf492922..67262dd466 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFS.java @@ -29,16 +29,13 @@ import static org.apache.hadoop.hdfs.server.namenode.AclTestHelpers.aclEntry; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; import java.io.IOException; import java.security.PrivilegedExceptionAction; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -46,21 +43,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.Options.Rename; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; import org.apache.hadoop.hdfs.server.namenode.AclTestHelpers; import org.apache.hadoop.hdfs.server.namenode.FSAclBaseTest; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.util.Time; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -72,28 +63,21 @@ * */ public class TestAsyncDFS { public static final Log LOG = LogFactory.getLog(TestAsyncDFS.class); - private final short replFactor = 1; - private final long blockSize = 512; - private long fileLen = blockSize * 3; - private final long seed = Time.now(); - private final Random r = new Random(seed); - private final PermissionGenerator permGenerator = new PermissionGenerator(r); - private static final int NUM_TESTS = 50; + private static final int NUM_TESTS = 1000; private static final int NUM_NN_HANDLER = 10; - private static final int ASYNC_CALL_LIMIT = 1000; + private static final int ASYNC_CALL_LIMIT = 100; private Configuration conf; private MiniDFSCluster cluster; private FileSystem fs; - private AsyncDistributedFileSystem adfs; @Before public void setup() throws IOException { conf = new HdfsConfiguration(); // explicitly turn on acl conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); - // explicitly turn on permission checking - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + // explicitly turn on ACL + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_ACLS_ENABLED_KEY, true); // set the limit of max async calls conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, ASYNC_CALL_LIMIT); @@ -102,7 +86,6 @@ public void setup() throws IOException { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); cluster.waitActive(); fs = FileSystem.get(conf); - adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); } @After @@ -147,9 +130,13 @@ public void testBatchAsyncAcl() throws Exception { final String basePath = "testBatchAsyncAcl"; final Path parent = new Path(String.format("/test/%s/", basePath)); + AsyncDistributedFileSystem adfs = cluster.getFileSystem() + .getAsyncDistributedFileSystem(); + // prepare test - final Path[] paths = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { + int count = NUM_TESTS; + final Path[] paths = new Path[count]; + for (int i = 0; i < count; i++) { paths[i] = new Path(parent, "acl" + i); FileSystem.mkdirs(fs, paths[i], FsPermission.createImmutable((short) 0750)); @@ -166,7 +153,7 @@ public void testBatchAsyncAcl() throws Exception { int start = 0, end = 0; try { // test setAcl - for (int i = 0; i < NUM_TESTS; i++) { + for (int i = 0; i < count; i++) { for (;;) { try { Future retFuture = adfs.setAcl(paths[i], aclSpec); @@ -179,12 +166,12 @@ public void testBatchAsyncAcl() throws Exception { } } } - waitForAclReturnValues(setAclRetFutures, end, NUM_TESTS); + waitForAclReturnValues(setAclRetFutures, end, count); // test getAclStatus start = 0; end = 0; - for (int i = 0; i < NUM_TESTS; i++) { + for (int i = 0; i < count; i++) { for (;;) { try { Future retFuture = adfs.getAclStatus(paths[i]); @@ -198,23 +185,13 @@ public void testBatchAsyncAcl() throws Exception { } } } - waitForAclReturnValues(getAclRetFutures, end, NUM_TESTS, paths, + waitForAclReturnValues(getAclRetFutures, end, count, paths, expectedAclSpec); } catch (Exception e) { throw e; } } - static void waitForReturnValues(final Map> retFutures, - final int start, final int end) - throws InterruptedException, ExecutionException { - LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); - for (int i = start; i < end; i++) { - LOG.info("calling Future#get #" + i); - retFutures.get(i).get(); - } - } - private void waitForAclReturnValues( final Map> aclRetFutures, final int start, final int end) throws InterruptedException, ExecutionException { @@ -289,12 +266,9 @@ public void testAsyncAPIWithException() throws Exception { final Path parent = new Path("/test/async_api_exception/"); final Path aclDir = new Path(parent, "aclDir"); - final Path src = new Path(parent, "src"); - final Path dst = new Path(parent, "dst"); - fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0700)); - fs.mkdirs(src); + fs.mkdirs(aclDir, FsPermission.createImmutable((short) 0770)); - AsyncDistributedFileSystem adfs1 = ugi1 + AsyncDistributedFileSystem adfs = ugi1 .doAs(new PrivilegedExceptionAction() { @Override public AsyncDistributedFileSystem run() throws Exception { @@ -303,36 +277,9 @@ public AsyncDistributedFileSystem run() throws Exception { }); Future retFuture; - // test rename - try { - retFuture = adfs1.rename(src, dst, Rename.OVERWRITE); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - assertTrue("Permission denied messages must carry the path parent", e - .getMessage().contains(src.getParent().toUri().getPath())); - } - - // test setPermission - FsPermission fsPerm = new FsPermission(permGenerator.next()); - try { - retFuture = adfs1.setPermission(src, fsPerm); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - } - - // test setOwner - try { - retFuture = adfs1.setOwner(src, "user1", "group2"); - retFuture.get(); - } catch (ExecutionException e) { - checkPermissionDenied(e, src, user1); - } - // test setAcl try { - retFuture = adfs1.setAcl(aclDir, + retFuture = adfs.setAcl(aclDir, Lists.newArrayList(aclEntry(ACCESS, USER, ALL))); retFuture.get(); fail("setAcl should fail with permission denied"); @@ -342,7 +289,7 @@ public AsyncDistributedFileSystem run() throws Exception { // test getAclStatus try { - Future aclRetFuture = adfs1.getAclStatus(aclDir); + Future aclRetFuture = adfs.getAclStatus(aclDir); aclRetFuture.get(); fail("getAclStatus should fail with permission denied"); } catch (ExecutionException e) { @@ -360,148 +307,4 @@ public static void checkPermissionDenied(final Exception e, final Path dir, assertTrue("Permission denied messages must carry the name of the path", e.getMessage().contains(dir.getName())); } - - - @Test(timeout = 120000) - public void testConcurrentAsyncAPI() throws Exception { - String group1 = "group1"; - String group2 = "group2"; - String user1 = "user1"; - - // create fake mapping for the groups - Map u2gMap = new HashMap(1); - u2gMap.put(user1, new String[] {group1, group2}); - DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); - - // prepare for test - final Path parent = new Path( - String.format("/test/%s/", "testConcurrentAsyncAPI")); - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - short[] permissions = new short[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); - assertTrue(fs.exists(srcs[i])); - assertTrue(fs.getFileStatus(srcs[i]).isFile()); - assertTrue(fs.exists(dsts[i])); - assertTrue(fs.getFileStatus(dsts[i]).isFile()); - permissions[i] = permGenerator.next(); - } - - Map> renameRetFutures = - new HashMap>(); - Map> permRetFutures = - new HashMap>(); - Map> ownerRetFutures = - new HashMap>(); - int start = 0, end = 0; - // test rename - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future returnFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - renameRetFutures.put(i, returnFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(renameRetFutures, start, end); - } - } - } - - // wait for completing the calls - waitForAclReturnValues(renameRetFutures, end, NUM_TESTS); - - // verify the src should not exist, dst should - for (int i = 0; i < NUM_TESTS; i++) { - assertFalse(fs.exists(srcs[i])); - assertTrue(fs.exists(dsts[i])); - } - - // test permissions - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.setPermission(dsts[i], - new FsPermission(permissions[i])); - permRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(permRetFutures, start, end); - } - } - } - // wait for completing the calls - waitForAclReturnValues(permRetFutures, end, NUM_TESTS); - - // verify the permission - for (int i = 0; i < NUM_TESTS; i++) { - assertTrue(fs.exists(dsts[i])); - FsPermission fsPerm = new FsPermission(permissions[i]); - checkAccessPermissions(fs.getFileStatus(dsts[i]), fsPerm.getUserAction()); - } - - // test setOwner - start = 0; - end = 0; - for (int i = 0; i < NUM_TESTS; i++) { - for (;;) { - try { - Future retFuture = adfs.setOwner(dsts[i], "user1", "group2"); - ownerRetFutures.put(i, retFuture); - break; - } catch (AsyncCallLimitExceededException e) { - start = end; - end = i; - waitForReturnValues(ownerRetFutures, start, end); - } - } - } - // wait for completing the calls - waitForAclReturnValues(ownerRetFutures, end, NUM_TESTS); - - // verify the owner - for (int i = 0; i < NUM_TESTS; i++) { - assertTrue(fs.exists(dsts[i])); - assertTrue("user1".equals(fs.getFileStatus(dsts[i]).getOwner())); - assertTrue("group2".equals(fs.getFileStatus(dsts[i]).getGroup())); - } - } - - static void checkAccessPermissions(FileStatus stat, FsAction mode) - throws IOException { - checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); - } - - static void checkAccessPermissions(final UserGroupInformation ugi, - FileStatus stat, FsAction mode) throws IOException { - FsPermission perm = stat.getPermission(); - String user = ugi.getShortUserName(); - List groups = Arrays.asList(ugi.getGroupNames()); - - if (user.equals(stat.getOwner())) { - if (perm.getUserAction().implies(mode)) { - return; - } - } else if (groups.contains(stat.getGroup())) { - if (perm.getGroupAction().implies(mode)) { - return; - } - } else { - if (perm.getOtherAction().implies(mode)) { - return; - } - } - throw new AccessControlException(String.format( - "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat - .getPath(), stat.getOwner(), stat.getGroup(), - stat.isDirectory() ? "d" : "-", perm)); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java index 8d3e509d9d..03c8151a45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAsyncDFSRename.java @@ -19,11 +19,14 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.Random; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -31,157 +34,521 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Options.Rename; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.TestDFSPermission.PermissionGenerator; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter; import org.apache.hadoop.ipc.AsyncCallLimitExceededException; -import org.junit.After; -import org.junit.Before; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Time; import org.junit.Test; public class TestAsyncDFSRename { public static final Log LOG = LogFactory.getLog(TestAsyncDFSRename.class); - private final short replFactor = 1; + private final long seed = Time.now(); + private final Random r = new Random(seed); + private final PermissionGenerator permGenerator = new PermissionGenerator(r); + private final short replFactor = 2; private final long blockSize = 512; private long fileLen = blockSize * 3; - private static final int NUM_TESTS = 50; - private static final int NUM_NN_HANDLER = 10; - private static final int ASYNC_CALL_LIMIT = 1000; - private Configuration conf; - private MiniDFSCluster cluster; - private FileSystem fs; - private AsyncDistributedFileSystem adfs; - - @Before - public void setup() throws IOException { - conf = new HdfsConfiguration(); - // set the limit of max async calls - conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, - ASYNC_CALL_LIMIT); - // set server handlers - conf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, NUM_NN_HANDLER); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + /** + * Check the blocks of dst file are cleaned after rename with overwrite + * Restart NN to check the rename successfully + */ + @Test(timeout = 60000) + public void testAsyncRenameWithOverwrite() throws Exception { + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes( + replFactor).build(); cluster.waitActive(); - fs = FileSystem.get(conf); - adfs = cluster.getFileSystem().getAsyncDistributedFileSystem(); - } + DistributedFileSystem dfs = cluster.getFileSystem(); + AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); - @After - public void tearDown() throws IOException { - if (fs != null) { - fs.close(); - fs = null; - } - if (cluster != null) { - cluster.shutdown(); - cluster = null; + try { + String src = "/foo/src"; + String dst = "/foo/dst"; + String src2 = "/foo/src2"; + String dst2 = "/foo/dst2"; + Path srcPath = new Path(src); + Path dstPath = new Path(dst); + Path srcPath2 = new Path(src2); + Path dstPath2 = new Path(dst2); + + DFSTestUtil.createFile(dfs, srcPath, fileLen, replFactor, 1); + DFSTestUtil.createFile(dfs, dstPath, fileLen, replFactor, 1); + DFSTestUtil.createFile(dfs, srcPath2, fileLen, replFactor, 1); + DFSTestUtil.createFile(dfs, dstPath2, fileLen, replFactor, 1); + + LocatedBlocks lbs = NameNodeAdapter.getBlockLocations( + cluster.getNameNode(), dst, 0, fileLen); + LocatedBlocks lbs2 = NameNodeAdapter.getBlockLocations( + cluster.getNameNode(), dst2, 0, fileLen); + BlockManager bm = NameNodeAdapter.getNamesystem(cluster.getNameNode()) + .getBlockManager(); + assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() + .getLocalBlock()) != null); + assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() + .getLocalBlock()) != null); + + Future retVal1 = adfs.rename(srcPath, dstPath, Rename.OVERWRITE); + Future retVal2 = adfs.rename(srcPath2, dstPath2, Rename.OVERWRITE); + retVal1.get(); + retVal2.get(); + + assertTrue(bm.getStoredBlock(lbs.getLocatedBlocks().get(0).getBlock() + .getLocalBlock()) == null); + assertTrue(bm.getStoredBlock(lbs2.getLocatedBlocks().get(0).getBlock() + .getLocalBlock()) == null); + + // Restart NN and check the rename successfully + cluster.restartNameNodes(); + assertFalse(dfs.exists(srcPath)); + assertTrue(dfs.exists(dstPath)); + assertFalse(dfs.exists(srcPath2)); + assertTrue(dfs.exists(dstPath2)); + } finally { + if (dfs != null) { + dfs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } } } @Test(timeout = 60000) public void testCallGetReturnValueMultipleTimes() throws Exception { - final Path parent = new Path("/test/testCallGetReturnValueMultipleTimes/"); - assertTrue(fs.mkdirs(parent)); + final Path renameDir = new Path( + "/test/testCallGetReturnValueMultipleTimes/"); + final Configuration conf = new HdfsConfiguration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, 200); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(2).build(); + cluster.waitActive(); + final DistributedFileSystem dfs = cluster.getFileSystem(); + final AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); + final int count = 100; + final Map> returnFutures = new HashMap>(); - // prepare test - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { - srcs[i] = new Path(parent, "src" + i); - dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); - } + assertTrue(dfs.mkdirs(renameDir)); - // concurrently invoking many rename - final Map> reFutures = - new HashMap>(); - for (int i = 0; i < NUM_TESTS; i++) { - Future retFuture = adfs.rename(srcs[i], dsts[i], - Rename.OVERWRITE); - reFutures.put(i, retFuture); - } + try { + // concurrently invoking many rename + for (int i = 0; i < count; i++) { + Path src = new Path(renameDir, "src" + i); + Path dst = new Path(renameDir, "dst" + i); + DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); + DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); + Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); + returnFutures.put(i, returnFuture); + } - assertEquals(NUM_TESTS, reFutures.size()); - - for (int i = 0; i < 5; i++) { - verifyCallGetReturnValueMultipleTimes(reFutures, srcs, dsts); + for (int i = 0; i < 5; i++) { + verifyCallGetReturnValueMultipleTimes(returnFutures, count, cluster, + renameDir, dfs); + } + } finally { + if (dfs != null) { + dfs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } } } private void verifyCallGetReturnValueMultipleTimes( - final Map> reFutures, final Path[] srcs, - final Path[] dsts) + Map> returnFutures, int count, + MiniDFSCluster cluster, Path renameDir, DistributedFileSystem dfs) throws InterruptedException, ExecutionException, IOException { - // wait for completing the calls - waitForReturnValues(reFutures, 0, NUM_TESTS); + for (int i = 0; i < count; i++) { + returnFutures.get(i).get(); + } - // verify the src dir should not exist, dst should - verifyRenames(srcs, dsts); + // Restart NN and check the rename successfully + cluster.restartNameNodes(); + + // very the src dir should not exist, dst should + for (int i = 0; i < count; i++) { + Path src = new Path(renameDir, "src" + i); + Path dst = new Path(renameDir, "dst" + i); + assertFalse(dfs.exists(src)); + assertTrue(dfs.exists(dst)); + } + } + + @Test + public void testConservativeConcurrentAsyncRenameWithOverwrite() + throws Exception { + internalTestConcurrentAsyncRenameWithOverwrite(100, + "testAggressiveConcurrentAsyncRenameWithOverwrite"); } @Test(timeout = 60000) - public void testConcurrentAsyncRename() throws Exception { - final Path parent = new Path( - String.format("/test/%s/", "testConcurrentAsyncRename")); - assertTrue(fs.mkdirs(parent)); + public void testAggressiveConcurrentAsyncRenameWithOverwrite() + throws Exception { + internalTestConcurrentAsyncRenameWithOverwrite(10000, + "testConservativeConcurrentAsyncRenameWithOverwrite"); + } - // prepare test - final Path[] srcs = new Path[NUM_TESTS]; - final Path[] dsts = new Path[NUM_TESTS]; - for (int i = 0; i < NUM_TESTS; i++) { + private void internalTestConcurrentAsyncRenameWithOverwrite( + final int asyncCallLimit, final String basePath) throws Exception { + final Path renameDir = new Path(String.format("/test/%s/", basePath)); + Configuration conf = new HdfsConfiguration(); + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, + asyncCallLimit); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2) + .build(); + cluster.waitActive(); + DistributedFileSystem dfs = cluster.getFileSystem(); + AsyncDistributedFileSystem adfs = dfs.getAsyncDistributedFileSystem(); + int count = 1000; + int start = 0, end = 0; + Map> returnFutures = new HashMap>(); + + assertTrue(dfs.mkdirs(renameDir)); + + try { + // concurrently invoking many rename + for (int i = 0; i < count; i++) { + Path src = new Path(renameDir, "src" + i); + Path dst = new Path(renameDir, "dst" + i); + DFSTestUtil.createFile(dfs, src, fileLen, replFactor, 1); + DFSTestUtil.createFile(dfs, dst, fileLen, replFactor, 1); + for (;;) { + try { + LOG.info("rename #" + i); + Future returnFuture = adfs.rename(src, dst, Rename.OVERWRITE); + returnFutures.put(i, returnFuture); + break; + } catch (AsyncCallLimitExceededException e) { + /** + * reached limit of async calls, fetch results of finished async + * calls to let follow-on calls go + */ + LOG.error(e); + start = end; + end = i; + LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); + waitForReturnValues(returnFutures, start, end); + } + } + } + + // wait for completing the calls + for (int i = start; i < count; i++) { + returnFutures.get(i).get(); + } + + // Restart NN and check the rename successfully + cluster.restartNameNodes(); + + // very the src dir should not exist, dst should + for (int i = 0; i < count; i++) { + Path src = new Path(renameDir, "src" + i); + Path dst = new Path(renameDir, "dst" + i); + assertFalse(dfs.exists(src)); + assertTrue(dfs.exists(dst)); + } + } finally { + if (dfs != null) { + dfs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } + } + + private void waitForReturnValues( + final Map> returnFutures, final int start, + final int end) throws InterruptedException, ExecutionException { + LOG.info(String.format("calling waitForReturnValues [%d, %d)", start, end)); + for (int i = start; i < end; i++) { + LOG.info("calling Future#get #" + i); + returnFutures.get(i).get(); + } + } + + @Test + public void testConservativeConcurrentAsyncAPI() throws Exception { + internalTestConcurrentAsyncAPI(100, "testConservativeConcurrentAsyncAPI"); + } + + @Test(timeout = 60000) + public void testAggressiveConcurrentAsyncAPI() throws Exception { + internalTestConcurrentAsyncAPI(10000, "testAggressiveConcurrentAsyncAPI"); + } + + private void internalTestConcurrentAsyncAPI(final int asyncCallLimit, + final String basePath) throws Exception { + Configuration conf = new HdfsConfiguration(); + String group1 = "group1"; + String group2 = "group2"; + String user1 = "user1"; + int count = 500; + + // explicitly turn on permission checking + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + // set the limit of max async calls + conf.setInt(CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, + asyncCallLimit); + + // create fake mapping for the groups + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + + // start mini cluster + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3).build(); + cluster.waitActive(); + AsyncDistributedFileSystem adfs = cluster.getFileSystem() + .getAsyncDistributedFileSystem(); + + // prepare for test + FileSystem rootFs = FileSystem.get(conf); + final Path parent = new Path(String.format("/test/%s/", basePath)); + final Path[] srcs = new Path[count]; + final Path[] dsts = new Path[count]; + short[] permissions = new short[count]; + for (int i = 0; i < count; i++) { srcs[i] = new Path(parent, "src" + i); dsts[i] = new Path(parent, "dst" + i); - DFSTestUtil.createFile(fs, srcs[i], fileLen, replFactor, 1); - DFSTestUtil.createFile(fs, dsts[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(rootFs, srcs[i], fileLen, replFactor, 1); + DFSTestUtil.createFile(rootFs, dsts[i], fileLen, replFactor, 1); + assertTrue(rootFs.exists(srcs[i])); + assertTrue(rootFs.getFileStatus(srcs[i]).isFile()); + assertTrue(rootFs.exists(dsts[i])); + assertTrue(rootFs.getFileStatus(dsts[i]).isFile()); + permissions[i] = permGenerator.next(); } - // concurrently invoking many rename - int start = 0, end = 0; - Map> retFutures = + Map> renameRetFutures = new HashMap>(); - for (int i = 0; i < NUM_TESTS; i++) { + Map> permRetFutures = + new HashMap>(); + Map> ownerRetFutures = + new HashMap>(); + int start = 0, end = 0; + // test rename + for (int i = 0; i < count; i++) { for (;;) { try { - LOG.info("rename #" + i); - Future retFuture = adfs.rename(srcs[i], dsts[i], + Future returnFuture = adfs.rename(srcs[i], dsts[i], Rename.OVERWRITE); - retFutures.put(i, retFuture); + renameRetFutures.put(i, returnFuture); break; } catch (AsyncCallLimitExceededException e) { - /** - * reached limit of async calls, fetch results of finished async calls - * to let follow-on calls go - */ - LOG.error(e); start = end; end = i; - LOG.info(String.format("start=%d, end=%d, i=%d", start, end, i)); - waitForReturnValues(retFutures, start, end); + waitForReturnValues(renameRetFutures, start, end); } } } // wait for completing the calls - waitForReturnValues(retFutures, end, NUM_TESTS); + for (int i = start; i < count; i++) { + renameRetFutures.get(i).get(); + } - // verify the src dir should not exist, dst should - verifyRenames(srcs, dsts); - } + // Restart NN and check the rename successfully + cluster.restartNameNodes(); - private void verifyRenames(final Path[] srcs, final Path[] dsts) - throws IOException { - for (int i = 0; i < NUM_TESTS; i++) { - assertFalse(fs.exists(srcs[i])); - assertTrue(fs.exists(dsts[i])); + // very the src should not exist, dst should + for (int i = 0; i < count; i++) { + assertFalse(rootFs.exists(srcs[i])); + assertTrue(rootFs.exists(dsts[i])); + } + + // test permissions + try { + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.setPermission(dsts[i], + new FsPermission(permissions[i])); + permRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(permRetFutures, start, end); + } + } + } + // wait for completing the calls + for (int i = start; i < count; i++) { + permRetFutures.get(i).get(); + } + + // Restart NN and check permission then + cluster.restartNameNodes(); + + // verify the permission + for (int i = 0; i < count; i++) { + assertTrue(rootFs.exists(dsts[i])); + FsPermission fsPerm = new FsPermission(permissions[i]); + checkAccessPermissions(rootFs.getFileStatus(dsts[i]), + fsPerm.getUserAction()); + } + + // test setOwner + start = 0; + end = 0; + for (int i = 0; i < count; i++) { + for (;;) { + try { + Future retFuture = adfs.setOwner(dsts[i], "user1", + "group2"); + ownerRetFutures.put(i, retFuture); + break; + } catch (AsyncCallLimitExceededException e) { + start = end; + end = i; + waitForReturnValues(ownerRetFutures, start, end); + } + } + } + // wait for completing the calls + for (int i = start; i < count; i++) { + ownerRetFutures.get(i).get(); + } + + // Restart NN and check owner then + cluster.restartNameNodes(); + + // verify the owner + for (int i = 0; i < count; i++) { + assertTrue(rootFs.exists(dsts[i])); + assertTrue( + "user1".equals(rootFs.getFileStatus(dsts[i]).getOwner())); + assertTrue( + "group2".equals(rootFs.getFileStatus(dsts[i]).getGroup())); + } + } catch (AccessControlException ace) { + throw ace; + } finally { + if (rootFs != null) { + rootFs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } } } - void waitForReturnValues(final Map> retFutures, - final int start, final int end) - throws InterruptedException, ExecutionException { - TestAsyncDFS.waitForReturnValues(retFutures, start, end); + static void checkAccessPermissions(FileStatus stat, FsAction mode) + throws IOException { + checkAccessPermissions(UserGroupInformation.getCurrentUser(), stat, mode); + } + + static void checkAccessPermissions(final UserGroupInformation ugi, + FileStatus stat, FsAction mode) throws IOException { + FsPermission perm = stat.getPermission(); + String user = ugi.getShortUserName(); + List groups = Arrays.asList(ugi.getGroupNames()); + + if (user.equals(stat.getOwner())) { + if (perm.getUserAction().implies(mode)) { + return; + } + } else if (groups.contains(stat.getGroup())) { + if (perm.getGroupAction().implies(mode)) { + return; + } + } else { + if (perm.getOtherAction().implies(mode)) { + return; + } + } + throw new AccessControlException(String.format( + "Permission denied: user=%s, path=\"%s\":%s:%s:%s%s", user, stat + .getPath(), stat.getOwner(), stat.getGroup(), + stat.isDirectory() ? "d" : "-", perm)); + } + + @Test(timeout = 60000) + public void testAsyncAPIWithException() throws Exception { + Configuration conf = new HdfsConfiguration(); + String group1 = "group1"; + String group2 = "group2"; + String user1 = "user1"; + UserGroupInformation ugi1; + + // explicitly turn on permission checking + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, true); + + // create fake mapping for the groups + Map u2gMap = new HashMap(1); + u2gMap.put(user1, new String[] {group1, group2}); + DFSTestUtil.updateConfWithFakeGroupMapping(conf, u2gMap); + + // Initiate all four users + ugi1 = UserGroupInformation.createUserForTesting(user1, new String[] { + group1, group2 }); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(3).build(); + cluster.waitActive(); + + FileSystem rootFs = FileSystem.get(conf); + final Path renameDir = new Path("/test/async_api_exception/"); + final Path src = new Path(renameDir, "src"); + final Path dst = new Path(renameDir, "dst"); + rootFs.mkdirs(src); + + AsyncDistributedFileSystem adfs = ugi1 + .doAs(new PrivilegedExceptionAction() { + @Override + public AsyncDistributedFileSystem run() throws Exception { + return cluster.getFileSystem().getAsyncDistributedFileSystem(); + } + }); + + Future retFuture; + try { + retFuture = adfs.rename(src, dst, Rename.OVERWRITE); + retFuture.get(); + } catch (ExecutionException e) { + TestAsyncDFS.checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the path parent", e + .getMessage().contains(src.getParent().toUri().getPath())); + } + + FsPermission fsPerm = new FsPermission(permGenerator.next()); + try { + retFuture = adfs.setPermission(src, fsPerm); + retFuture.get(); + } catch (ExecutionException e) { + TestAsyncDFS.checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the name of the path", + e.getMessage().contains(src.getName())); + } + + try { + retFuture = adfs.setOwner(src, "user1", "group2"); + retFuture.get(); + } catch (ExecutionException e) { + TestAsyncDFS.checkPermissionDenied(e, src, user1); + assertTrue("Permission denied messages must carry the name of the path", + e.getMessage().contains(src.getName())); + } finally { + if (rootFs != null) { + rootFs.close(); + } + if (cluster != null) { + cluster.shutdown(); + } + } } } \ No newline at end of file