From 761594549ec0c6bab50a28a7eb6c741aec7239d7 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Tue, 8 Oct 2019 14:01:44 +0530 Subject: [PATCH] HDFS-14814. RBF: RouterQuotaUpdateService supports inherited rule. Contributed by Jinglun. --- .../hdfs/server/federation/router/Quota.java | 70 ++++++++++++++- .../federation/router/RouterQuotaManager.java | 27 ++++++ .../router/RouterQuotaUpdateService.java | 49 ++++++++--- .../router/TestDisableRouterQuota.java | 10 +++ .../federation/router/TestRouterQuota.java | 88 +++++++++++++++++++ 5 files changed, 226 insertions(+), 18 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java index 48f0b96a8c..df5f319fc7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/Quota.java @@ -23,6 +23,8 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Map.Entry; +import java.util.TreeMap; import java.util.Set; import org.apache.hadoop.fs.QuotaUsage; @@ -70,13 +72,30 @@ public Quota(Router router, RouterRpcServer server) { */ public void setQuota(String path, long namespaceQuota, long storagespaceQuota, StorageType type) throws IOException { + setQuotaInternal(path, null, namespaceQuota, storagespaceQuota, type); + } + + /** + * Set quota for the federation path. + * @param path Federation path. + * @param locations Locations of the Federation path. + * @param namespaceQuota Name space quota. + * @param storagespaceQuota Storage space quota. + * @param type StorageType that the space quota is intended to be set on. + * @throws IOException If the quota system is disabled. + */ + void setQuotaInternal(String path, List locations, + long namespaceQuota, long storagespaceQuota, StorageType type) + throws IOException { rpcServer.checkOperation(OperationCategory.WRITE); if (!router.isQuotaEnabled()) { throw new IOException("The quota system is disabled in Router."); } // Set quota for current path and its children mount table path. - final List locations = getQuotaRemoteLocations(path); + if (locations == null) { + locations = getQuotaRemoteLocations(path); + } if (LOG.isDebugEnabled()) { for (RemoteLocation loc : locations) { LOG.debug("Set quota for path: nsId: {}, dest: {}.", @@ -92,12 +111,23 @@ public void setQuota(String path, long namespaceQuota, } /** - * Get quota usage for the federation path. + * Get aggregated quota usage for the federation path. * @param path Federation path. * @return Aggregated quota. * @throws IOException If the quota system is disabled. */ public QuotaUsage getQuotaUsage(String path) throws IOException { + return aggregateQuota(getEachQuotaUsage(path)); + } + + /** + * Get quota usage for the federation path. + * @param path Federation path. + * @return quota usage for each remote location. + * @throws IOException If the quota system is disabled. + */ + Map getEachQuotaUsage(String path) + throws IOException { rpcServer.checkOperation(OperationCategory.READ); if (!router.isQuotaEnabled()) { throw new IOException("The quota system is disabled in Router."); @@ -109,7 +139,39 @@ public QuotaUsage getQuotaUsage(String path) throws IOException { Map results = rpcClient.invokeConcurrent( quotaLocs, method, true, false, QuotaUsage.class); - return aggregateQuota(results); + return results; + } + + /** + * Get global quota for the federation path. + * @param path Federation path. + * @return global quota for path. + * @throws IOException If the quota system is disabled. + */ + QuotaUsage getGlobalQuota(String path) throws IOException { + if (!router.isQuotaEnabled()) { + throw new IOException("The quota system is disabled in Router."); + } + + long nQuota = HdfsConstants.QUOTA_RESET; + long sQuota = HdfsConstants.QUOTA_RESET; + RouterQuotaManager manager = this.router.getQuotaManager(); + TreeMap pts = + manager.getParentsContainingQuota(path); + Entry entry = pts.lastEntry(); + while (entry != null && (nQuota == HdfsConstants.QUOTA_RESET + || sQuota == HdfsConstants.QUOTA_RESET)) { + String ppath = entry.getKey(); + QuotaUsage quota = entry.getValue(); + if (nQuota == HdfsConstants.QUOTA_RESET) { + nQuota = quota.getQuota(); + } + if (sQuota == HdfsConstants.QUOTA_RESET) { + sQuota = quota.getSpaceQuota(); + } + entry = pts.lowerEntry(ppath); + } + return new QuotaUsage.Builder().quota(nQuota).spaceQuota(sQuota).build(); } /** @@ -157,7 +219,7 @@ private List getValidQuotaLocations(String path) * @param results Quota query result. * @return Aggregated Quota. */ - private QuotaUsage aggregateQuota(Map results) { + QuotaUsage aggregateQuota(Map results) { long nsCount = 0; long ssCount = 0; long nsQuota = HdfsConstants.QUOTA_RESET; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java index e818f5accd..f478436568 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaManager.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hdfs.server.federation.router.FederationUtil.isParentEntry; import java.util.HashSet; +import java.util.Map.Entry; import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; @@ -113,6 +114,32 @@ public Set getPaths(String parentPath) { } } + /** + * Get parent paths (including itself) and quotas of the specified federation + * path. Only parents containing quota are returned. + * @param childPath Federated path. + * @return TreeMap of parent paths and quotas. + */ + TreeMap getParentsContainingQuota( + String childPath) { + TreeMap res = new TreeMap<>(); + readLock.lock(); + try { + Entry entry = this.cache.floorEntry(childPath); + while (entry != null) { + String mountPath = entry.getKey(); + RouterQuotaUsage quota = entry.getValue(); + if (isQuotaSet(quota) && isParentEntry(childPath, mountPath)) { + res.put(mountPath, quota); + } + entry = this.cache.lowerEntry(mountPath); + } + return res; + } finally { + readLock.unlock(); + } + } + /** * Put new entity into cache. * @param path Mount table path. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java index dd21e1a7e6..7982bc92f6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterQuotaUpdateService.java @@ -18,16 +18,20 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.QuotaUsage; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; import org.apache.hadoop.hdfs.server.federation.store.MountTableStore; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; @@ -79,6 +83,7 @@ protected void periodicInvoke() { try { List updateMountTables = new LinkedList<>(); List mountTables = getQuotaSetMountTables(); + Map remoteQuotaUsage = new HashMap<>(); for (MountTable entry : mountTables) { String src = entry.getSourcePath(); RouterQuotaUsage oldQuota = entry.getQuota(); @@ -102,25 +107,17 @@ protected void periodicInvoke() { // Call RouterRpcServer#getQuotaUsage for getting current quota usage. // If any exception occurs catch it and proceed with other entries. try { - currentQuotaUsage = this.rpcServer.getQuotaModule() - .getQuotaUsage(src); + Quota quotaModule = this.rpcServer.getQuotaModule(); + Map usageMap = + quotaModule.getEachQuotaUsage(src); + currentQuotaUsage = quotaModule.aggregateQuota(usageMap); + remoteQuotaUsage.putAll(usageMap); } catch (IOException ioe) { LOG.error("Unable to get quota usage for " + src, ioe); continue; } } - // If quota is not set in some subclusters under federation path, - // set quota for this path. - if (currentQuotaUsage.getQuota() == HdfsConstants.QUOTA_RESET) { - try { - this.rpcServer.setQuota(src, nsQuota, ssQuota, null); - } catch (IOException ioe) { - LOG.error("Unable to set quota at remote location for " - + src, ioe); - } - } - RouterQuotaUsage newQuota = generateNewQuota(oldQuota, currentQuotaUsage); this.quotaManager.put(src, newQuota); @@ -139,12 +136,36 @@ protected void periodicInvoke() { } } + // Fix inconsistent quota. + for (Entry en : remoteQuotaUsage + .entrySet()) { + RemoteLocation remoteLocation = en.getKey(); + QuotaUsage currentQuota = en.getValue(); + fixGlobalQuota(remoteLocation, currentQuota); + } + updateMountTableEntries(updateMountTables); } catch (IOException e) { LOG.error("Quota cache updated error.", e); } } + private void fixGlobalQuota(RemoteLocation location, QuotaUsage remoteQuota) + throws IOException { + QuotaUsage gQuota = + this.rpcServer.getQuotaModule().getGlobalQuota(location.getSrc()); + if (remoteQuota.getQuota() != gQuota.getQuota() + || remoteQuota.getSpaceQuota() != gQuota.getSpaceQuota()) { + this.rpcServer.getQuotaModule() + .setQuotaInternal(location.getSrc(), Arrays.asList(location), + gQuota.getQuota(), gQuota.getSpaceQuota(), null); + LOG.info("[Fix Quota] src={} dst={} oldQuota={}/{} newQuota={}/{}", + location.getSrc(), location, remoteQuota.getQuota(), + remoteQuota.getSpaceQuota(), gQuota.getQuota(), + gQuota.getSpaceQuota()); + } + } + /** * Get mount table store management interface. * @return MountTableStore instance. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java index 081f604466..28d12fc2dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestDisableRouterQuota.java @@ -23,6 +23,7 @@ import java.io.IOException; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -92,4 +93,13 @@ public void testGetQuotaUsage() throws Exception { } } + @Test + public void testGetGlobalQuota() throws Exception { + LambdaTestUtils.intercept(IOException.class, + "The quota system is disabled in Router.", + "The getGlobalQuota call should fail.", () -> { + Quota quotaModule = router.getRpcServer().getQuotaModule(); + quotaModule.getGlobalQuota("/test"); + }); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java index 5e36262a14..9873a2e416 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterQuota.java @@ -868,4 +868,92 @@ public void testNoQuotaaExceptionForUnrelatedOperations() throws Exception { routerFs.listStatus(path); routerFs.getContentSummary(path); } + + @Test + public void testGetGlobalQuota() throws Exception { + long nsQuota = 5; + long ssQuota = 3 * BLOCK_SIZE; + prepareGlobalQuotaTestMountTable(nsQuota, ssQuota); + + Quota qModule = routerContext.getRouter().getRpcServer().getQuotaModule(); + QuotaUsage qu = qModule.getGlobalQuota("/dir-1"); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota, qu.getSpaceQuota()); + qu = qModule.getGlobalQuota("/dir-1/dir-2"); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota * 2, qu.getSpaceQuota()); + qu = qModule.getGlobalQuota("/dir-1/dir-2/dir-3"); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota * 2, qu.getSpaceQuota()); + qu = qModule.getGlobalQuota("/dir-4"); + assertEquals(-1, qu.getQuota()); + assertEquals(-1, qu.getSpaceQuota()); + } + + @Test + public void testFixGlobalQuota() throws Exception { + long nsQuota = 5; + long ssQuota = 3 * BLOCK_SIZE; + final FileSystem nnFs = nnContext1.getFileSystem(); + prepareGlobalQuotaTestMountTable(nsQuota, ssQuota); + + QuotaUsage qu = nnFs.getQuotaUsage(new Path("/dir-1")); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota, qu.getSpaceQuota()); + qu = nnFs.getQuotaUsage(new Path("/dir-2")); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota * 2, qu.getSpaceQuota()); + qu = nnFs.getQuotaUsage(new Path("/dir-3")); + assertEquals(nsQuota, qu.getQuota()); + assertEquals(ssQuota * 2, qu.getSpaceQuota()); + qu = nnFs.getQuotaUsage(new Path("/dir-4")); + assertEquals(-1, qu.getQuota()); + assertEquals(-1, qu.getSpaceQuota()); + } + + /** + * Add three mount tables. + * /dir-1 --> ns0---/dir-1 [nsQuota, ssQuota] + * /dir-1/dir-2 --> ns0---/dir-2 [QUOTA_UNSET, ssQuota * 2] + * /dir-1/dir-2/dir-3 --> ns0---/dir-3 [QUOTA_UNSET, QUOTA_UNSET] + * /dir-4 --> ns0---/dir-4 [QUOTA_UNSET, QUOTA_UNSET] + * + * Expect three remote locations' global quota. + * ns0---/dir-1 --> [nsQuota, ssQuota] + * ns0---/dir-2 --> [nsQuota, ssQuota * 2] + * ns0---/dir-3 --> [nsQuota, ssQuota * 2] + * ns0---/dir-4 --> [-1, -1] + */ + private void prepareGlobalQuotaTestMountTable(long nsQuota, long ssQuota) + throws IOException { + final FileSystem nnFs = nnContext1.getFileSystem(); + + // Create destination directory + nnFs.mkdirs(new Path("/dir-1")); + nnFs.mkdirs(new Path("/dir-2")); + nnFs.mkdirs(new Path("/dir-3")); + nnFs.mkdirs(new Path("/dir-4")); + + MountTable mountTable = MountTable.newInstance("/dir-1", + Collections.singletonMap("ns0", "/dir-1")); + mountTable.setQuota(new RouterQuotaUsage.Builder().quota(nsQuota) + .spaceQuota(ssQuota).build()); + addMountTable(mountTable); + mountTable = MountTable.newInstance("/dir-1/dir-2", + Collections.singletonMap("ns0", "/dir-2")); + mountTable.setQuota(new RouterQuotaUsage.Builder().spaceQuota(ssQuota * 2) + .build()); + addMountTable(mountTable); + mountTable = MountTable.newInstance("/dir-1/dir-2/dir-3", + Collections.singletonMap("ns0", "/dir-3")); + addMountTable(mountTable); + mountTable = MountTable.newInstance("/dir-4", + Collections.singletonMap("ns0", "/dir-4")); + addMountTable(mountTable); + + // Ensure setQuota RPC was invoked and mount table was updated. + RouterQuotaUpdateService updateService = routerContext.getRouter() + .getQuotaCacheUpdateService(); + updateService.periodicInvoke(); + } }