diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index dbb6ffa0fd..413a4e1d09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -199,7 +199,7 @@ private List getQuotaRemoteLocations(String path) if (manager != null) { Set childrenPaths = manager.getPaths(path); for (String childPath : childrenPaths) { - locations.addAll(rpcServer.getLocationsForPath(childPath, true)); + locations.addAll(rpcServer.getLocationsForPath(childPath, true, false)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java index 9fc93c1550..506e2ee2c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.QuotaUsage; import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; @@ -83,13 +84,40 @@ protected void periodicInvoke() { RouterQuotaUsage oldQuota = entry.getQuota(); long nsQuota = oldQuota.getQuota(); long ssQuota = oldQuota.getSpaceQuota(); - // Call RouterRpcServer#getQuotaUsage for getting current quota usage. - QuotaUsage currentQuotaUsage = this.rpcServer.getQuotaModule() - .getQuotaUsage(src); + + QuotaUsage currentQuotaUsage = null; + + // Check whether destination path exists in filesystem. If destination + // is not present, reset the usage. For other mount entry get current + // quota usage + HdfsFileStatus ret = this.rpcServer.getFileInfo(src); + if (ret == null) { + currentQuotaUsage = new RouterQuotaUsage.Builder() + .fileAndDirectoryCount(0) + .quota(nsQuota) + .spaceConsumed(0) + .spaceQuota(ssQuota).build(); + } else { + // Call RouterRpcServer#getQuotaUsage for getting current quota usage. + // If any exception occurs catch it and proceed with other entries. + try { + currentQuotaUsage = this.rpcServer.getQuotaModule() + .getQuotaUsage(src); + } catch (IOException ioe) { + LOG.error("Unable to get quota usage for " + src, ioe); + continue; + } + } + // If quota is not set in some subclusters under federation path, // set quota for this path. if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_DONT_SET) { - this.rpcServer.setQuota(src, nsQuota, ssQuota, null); + try { + this.rpcServer.setQuota(src, nsQuota, ssQuota, null); + } catch (IOException ioe) { + LOG.error("Unable to set quota at remote location for " + + src, ioe); + } } RouterQuotaUsage newQuota = generateNewQuota(oldQuota, @@ -221,7 +249,12 @@ private void updateMountTableEntries(List updateMountTables) for (MountTable entry : updateMountTables) { UpdateMountTableEntryRequest updateRequest = UpdateMountTableEntryRequest .newInstance(entry); - getMountTableStore().updateMountTableEntry(updateRequest); + try { + getMountTableStore().updateMountTableEntry(updateRequest); + } catch (IOException e) { + LOG.error("Quota update error for mount entry " + + entry.getSourcePath(), e); + } } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java index c331c6bdb2..431b394796 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; import java.io.IOException; import java.util.Collections; @@ -410,8 +411,7 @@ public void testQuotaUpdating() throws Exception { updateService.periodicInvoke(); // verify initial quota value - List results = getMountTable(path); - MountTable updatedMountTable = !results.isEmpty() ? results.get(0) : null; + MountTable updatedMountTable = getMountTable(path); RouterQuotaUsage quota = updatedMountTable.getQuota(); assertEquals(nsQuota, quota.getQuota()); assertEquals(ssQuota, quota.getSpaceQuota()); @@ -426,8 +426,7 @@ public void testQuotaUpdating() throws Exception { appendData(path + "/file", routerClient, BLOCK_SIZE); updateService.periodicInvoke(); - results = getMountTable(path); - updatedMountTable = !results.isEmpty() ? results.get(0) : null; + updatedMountTable = getMountTable(path); quota = updatedMountTable.getQuota(); // verify if quota has been updated in state store @@ -443,17 +442,18 @@ public void testQuotaUpdating() throws Exception { * @return If it was successfully got. * @throws IOException Problems getting entries. */ - private List getMountTable(String path) throws IOException { + private MountTable getMountTable(String path) throws IOException { // Reload the Router cache resolver.loadCache(true); RouterClient client = routerContext.getAdminClient(); MountTableManager mountTableManager = client.getMountTableManager(); GetMountTableEntriesRequest getRequest = GetMountTableEntriesRequest .newInstance(path); - GetMountTableEntriesResponse removeResponse = mountTableManager + GetMountTableEntriesResponse response = mountTableManager .getMountTableEntries(getRequest); + List results = response.getEntries(); - return removeResponse.getEntries(); + return !results.isEmpty() ? results.get(0) : null; } @Test @@ -493,4 +493,200 @@ public void testQuotaSynchronization() throws IOException { assertEquals(updateNsQuota, realQuota.getQuota()); assertEquals(updateSsQuota, realQuota.getSpaceQuota()); } -} + + @Test + public void testQuotaRefreshAfterQuotaExceed() throws Exception { + long nsQuota = 3; + long ssQuota = 100; + final FileSystem nnFs1 = nnContext1.getFileSystem(); + final FileSystem nnFs2 = nnContext2.getFileSystem(); + + // Add two mount tables: + // /setquota1 --> ns0---testdir11 + // /setquota2 --> ns1---testdir12 + nnFs1.mkdirs(new Path("/testdir11")); + nnFs2.mkdirs(new Path("/testdir12")); + MountTable mountTable1 = MountTable.newInstance("/setquota1", + Collections.singletonMap("ns0", "/testdir11")); + mountTable1 + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable1); + + MountTable mountTable2 = MountTable.newInstance("/setquota2", + Collections.singletonMap("ns1", "/testdir12")); + mountTable2 + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable2); + + final FileSystem routerFs = routerContext.getFileSystem(); + // Create directory to make directory count equals to nsQuota + routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); + routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); + + // create one more directory to exceed the nsQuota + routerFs.mkdirs(new Path("/setquota1/" + UUID.randomUUID())); + + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + // Call RouterQuotaUpdateService#periodicInvoke to update quota cache + updateService.periodicInvoke(); + // Reload the Router cache + resolver.loadCache(true); + + RouterQuotaManager quotaManager = + routerContext.getRouter().getQuotaManager(); + ClientProtocol client1 = nnContext1.getClient().getNamenode(); + ClientProtocol client2 = nnContext2.getClient().getNamenode(); + QuotaUsage quota1 = client1.getQuotaUsage("/testdir11"); + QuotaUsage quota2 = client2.getQuotaUsage("/testdir12"); + QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setquota1"); + QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setquota2"); + + // Verify quota usage + assertEquals(4, quota1.getFileAndDirectoryCount()); + assertEquals(4, cacheQuota1.getFileAndDirectoryCount()); + assertEquals(1, quota2.getFileAndDirectoryCount()); + assertEquals(1, cacheQuota2.getFileAndDirectoryCount()); + + try { + // create new directory to trigger NSQuotaExceededException + routerFs.mkdirs(new Path("/testdir11/" + UUID.randomUUID())); + fail("Mkdir should be failed under dir /testdir11."); + } catch (NSQuotaExceededException ignored) { + } + + // Create directory under the other mount point + routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID())); + routerFs.mkdirs(new Path("/setquota2/" + UUID.randomUUID())); + + // Call RouterQuotaUpdateService#periodicInvoke to update quota cache + updateService.periodicInvoke(); + + quota1 = client1.getQuotaUsage("/testdir11"); + cacheQuota1 = quotaManager.getQuotaUsage("/setquota1"); + quota2 = client2.getQuotaUsage("/testdir12"); + cacheQuota2 = quotaManager.getQuotaUsage("/setquota2"); + + // Verify whether quota usage cache is update by periodicInvoke(). + assertEquals(4, quota1.getFileAndDirectoryCount()); + assertEquals(4, cacheQuota1.getFileAndDirectoryCount()); + assertEquals(3, quota2.getFileAndDirectoryCount()); + assertEquals(3, cacheQuota2.getFileAndDirectoryCount()); + } + + /** + * Verify whether mount table and quota usage cache is updated properly. + * {@link RouterQuotaUpdateService#periodicInvoke()} should be able to update + * the cache and the mount table even if the destination directory for some + * mount entry is not present in the filesystem. + */ + @Test + public void testQuotaRefreshWhenDestinationNotPresent() throws Exception { + long nsQuota = 5; + long ssQuota = 3*BLOCK_SIZE; + final FileSystem nnFs = nnContext1.getFileSystem(); + + // Add three mount tables: + // /setdir1 --> ns0---testdir13 + // /setdir2 --> ns0---testdir14 + // Create destination directory + nnFs.mkdirs(new Path("/testdir13")); + nnFs.mkdirs(new Path("/testdir14")); + + MountTable mountTable = MountTable.newInstance("/setdir1", + Collections.singletonMap("ns0", "/testdir13")); + mountTable + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable); + + mountTable = MountTable.newInstance("/setdir2", + Collections.singletonMap("ns0", "/testdir14")); + mountTable + .setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable); + + final DFSClient routerClient = routerContext.getClient(); + // Create file + routerClient.create("/setdir1/file1", true).close(); + routerClient.create("/setdir2/file2", true).close(); + // append data to the file + appendData("/setdir1/file1", routerClient, BLOCK_SIZE); + appendData("/setdir2/file2", routerClient, BLOCK_SIZE); + + RouterQuotaUpdateService updateService = + routerContext.getRouter().getQuotaCacheUpdateService(); + // Update quota cache + updateService.periodicInvoke(); + // Reload the Router cache + resolver.loadCache(true); + + ClientProtocol client1 = nnContext1.getClient().getNamenode(); + RouterQuotaManager quotaManager = + routerContext.getRouter().getQuotaManager(); + QuotaUsage quota1 = client1.getQuotaUsage("/testdir13"); + QuotaUsage quota2 = client1.getQuotaUsage("/testdir14"); + QuotaUsage cacheQuota1 = quotaManager.getQuotaUsage("/setdir1"); + QuotaUsage cacheQuota2 = quotaManager.getQuotaUsage("/setdir2"); + + // Get quota details in mount table + MountTable updatedMountTable = getMountTable("/setdir1"); + RouterQuotaUsage mountQuota1 = updatedMountTable.getQuota(); + updatedMountTable = getMountTable("/setdir2"); + RouterQuotaUsage mountQuota2 = updatedMountTable.getQuota(); + + // Verify quota usage + assertEquals(2, quota1.getFileAndDirectoryCount()); + assertEquals(2, cacheQuota1.getFileAndDirectoryCount()); + assertEquals(2, mountQuota1.getFileAndDirectoryCount()); + assertEquals(2, quota2.getFileAndDirectoryCount()); + assertEquals(2, cacheQuota2.getFileAndDirectoryCount()); + assertEquals(2, mountQuota2.getFileAndDirectoryCount()); + assertEquals(BLOCK_SIZE, quota1.getSpaceConsumed()); + assertEquals(BLOCK_SIZE, cacheQuota1.getSpaceConsumed()); + assertEquals(BLOCK_SIZE, mountQuota1.getSpaceConsumed()); + assertEquals(BLOCK_SIZE, quota2.getSpaceConsumed()); + assertEquals(BLOCK_SIZE, cacheQuota2.getSpaceConsumed()); + assertEquals(BLOCK_SIZE, mountQuota2.getSpaceConsumed()); + + FileSystem routerFs = routerContext.getFileSystem(); + // Remove destination directory for the mount entry + routerFs.delete(new Path("/setdir1"), true); + + // Create file + routerClient.create("/setdir2/file3", true).close(); + // append data to the file + appendData("/setdir2/file3", routerClient, BLOCK_SIZE); + int updatedSpace = BLOCK_SIZE + BLOCK_SIZE; + + // Update quota cache + updateService.periodicInvoke(); + + quota2 = client1.getQuotaUsage("/testdir14"); + cacheQuota1 = quotaManager.getQuotaUsage("/setdir1"); + cacheQuota2 = quotaManager.getQuotaUsage("/setdir2"); + + // Get quota details in mount table + updatedMountTable = getMountTable("/setdir1"); + mountQuota1 = updatedMountTable.getQuota(); + updatedMountTable = getMountTable("/setdir2"); + mountQuota2 = updatedMountTable.getQuota(); + + // If destination is not present the quota usage should be reset to 0 + assertEquals(0, cacheQuota1.getFileAndDirectoryCount()); + assertEquals(0, mountQuota1.getFileAndDirectoryCount()); + assertEquals(0, cacheQuota1.getSpaceConsumed()); + assertEquals(0, mountQuota1.getSpaceConsumed()); + + // Verify current quota usage for other mount entries + assertEquals(3, quota2.getFileAndDirectoryCount()); + assertEquals(3, cacheQuota2.getFileAndDirectoryCount()); + assertEquals(3, mountQuota2.getFileAndDirectoryCount()); + assertEquals(updatedSpace, quota2.getSpaceConsumed()); + assertEquals(updatedSpace, cacheQuota2.getSpaceConsumed()); + assertEquals(updatedSpace, mountQuota2.getSpaceConsumed()); + } +} \ No newline at end of file