From 203664e6b258b642239651fa6a17fd2561b903d2 Mon Sep 17 00:00:00 2001
From: Ayush Saxena
Date: Fri, 3 May 2019 04:54:09 +0530
Subject: [PATCH] HDFS-14454. RBF: getContentSummary() should allow
 non-existing folders. Contributed by Inigo Goiri.

---
 .../federation/router/RemoteResult.java       |  84 ++++++++
 .../router/RouterClientProtocol.java          |  65 +++---
 .../federation/router/RouterRpcClient.java    |  79 +++++---
 .../federation/FederationTestUtils.java       | 128 ++++++++++++
 .../hdfs/server/federation/MockNamenode.java  | 109 ++++++++++
 .../router/TestRouterFaultTolerant.java       | 186 +++++++-----------
 .../router/TestRouterMissingFolderMulti.java  | 182 +++++++++++++++++
 7 files changed, 670 insertions(+), 163 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteResult.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMissingFolderMulti.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteResult.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteResult.java
new file mode 100644
index 0000000000..2fbcf42612
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RemoteResult.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import java.io.IOException;
+
+/**
+ * Result from a remote location.
+ * It includes the exception if there was any error.
+ * @param <T> Type of the remote location.
+ * @param <R> Type of the result.
+ */
+public class RemoteResult<T extends RemoteLocationContext, R> {
+  /** The remote location. */
+  private final T loc;
+  /** The result from the remote location. */
+  private final R result;
+  /** If the result is set; used for void types. */
+  private final boolean resultSet;
+  /** The exception if we couldn't get the result.
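+   * Callers should check hasException() before reading getResult().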
*/ + private final IOException ioe; + + public RemoteResult(T location, R r) { + this.loc = location; + this.result = r; + this.resultSet = true; + this.ioe = null; + } + + public RemoteResult(T location, IOException e) { + this.loc = location; + this.result = null; + this.resultSet = false; + this.ioe = e; + } + + public T getLocation() { + return loc; + } + + public boolean hasResult() { + return resultSet; + } + + public R getResult() { + return result; + } + + public boolean hasException() { + return getException() != null; + } + + public IOException getException() { + return ioe; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder() + .append("loc=").append(getLocation()); + if (hasResult()) { + sb.append(" result=").append(getResult()); + } + if (hasException()) { + sb.append(" exception=").append(getException()); + } + return sb.toString(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 6039083a73..f1f1c420b4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -728,9 +728,9 @@ public DirectoryListing getListing(String src, byte[] startAfter, RemoteMethod method = new RemoteMethod("getListing", new Class[] {String.class, startAfter.getClass(), boolean.class}, new RemoteParam(), startAfter, needLocation); - Map listings = - rpcClient.invokeConcurrent(locations, method, - !this.allowPartialList, false, DirectoryListing.class); + final List> listings = + rpcClient.invokeConcurrent( + locations, method, false, -1, DirectoryListing.class); Map nnListing = new TreeMap<>(); int totalRemainingEntries = 0; @@ -739,13 +739,17 @@ public DirectoryListing getListing(String src, byte[] startAfter, if (listings != null) { // Check the subcluster listing with the smallest name String lastName = null; - for (Map.Entry entry : - listings.entrySet()) { - RemoteLocation location = entry.getKey(); - DirectoryListing listing = entry.getValue(); - if (listing == null) { - LOG.debug("Cannot get listing from {}", location); - } else { + for (RemoteResult result : listings) { + if (result.hasException()) { + IOException ioe = result.getException(); + if (ioe instanceof FileNotFoundException) { + RemoteLocation location = result.getLocation(); + LOG.debug("Cannot get listing from {}", location); + } else if (!allowPartialList) { + throw ioe; + } + } else if (result.getResult() != null) { + DirectoryListing listing = result.getResult(); totalRemainingEntries += listing.getRemainingEntries(); HdfsFileStatus[] partialListing = listing.getPartialListing(); int length = partialListing.length; @@ -760,13 +764,14 @@ public DirectoryListing getListing(String src, byte[] startAfter, } // Add existing entries - for (Object value : listings.values()) { - DirectoryListing listing = (DirectoryListing) value; + for (RemoteResult result : listings) { + DirectoryListing listing = result.getResult(); if (listing != null) { namenodeListingExists = true; for (HdfsFileStatus file : listing.getPartialListing()) { String filename = file.getLocalName(); - if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) { + if 
(totalRemainingEntries > 0 && + filename.compareTo(lastName) > 0) { // Discarding entries further than the lastName remainingEntries++; } else { @@ -1110,19 +1115,26 @@ public ContentSummary getContentSummary(String path) throws IOException { rpcServer.checkOperation(NameNode.OperationCategory.READ); // Get the summaries from regular files - Collection summaries = new LinkedList<>(); + final Collection summaries = new ArrayList<>(); + final List locations = + rpcServer.getLocationsForPath(path, false); + final RemoteMethod method = new RemoteMethod("getContentSummary", + new Class[] {String.class}, new RemoteParam()); + final List> results = + rpcClient.invokeConcurrent(locations, method, + false, -1, ContentSummary.class); FileNotFoundException notFoundException = null; - try { - final List locations = - rpcServer.getLocationsForPath(path, false); - RemoteMethod method = new RemoteMethod("getContentSummary", - new Class[] {String.class}, new RemoteParam()); - Map results = - rpcClient.invokeConcurrent(locations, method, - !this.allowPartialList, false, ContentSummary.class); - summaries.addAll(results.values()); - } catch (FileNotFoundException e) { - notFoundException = e; + for (RemoteResult result : results) { + if (result.hasException()) { + IOException ioe = result.getException(); + if (ioe instanceof FileNotFoundException) { + notFoundException = (FileNotFoundException)ioe; + } else if (!allowPartialList) { + throw ioe; + } + } else if (result.getResult() != null) { + summaries.add(result.getResult()); + } } // Add mount points at this level in the tree @@ -1131,7 +1143,8 @@ public ContentSummary getContentSummary(String path) throws IOException { for (String child : children) { Path childPath = new Path(path, child); try { - ContentSummary mountSummary = getContentSummary(childPath.toString()); + ContentSummary mountSummary = getContentSummary( + childPath.toString()); if (mountSummary != null) { summaries.add(mountSummary); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 730952b9db..19aa13ac7e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -1083,11 +1083,58 @@ public Map invokeConcurrent( * @throws IOException If requiredResponse=true and any of the calls throw an * exception. */ - @SuppressWarnings("unchecked") public Map invokeConcurrent( final Collection locations, final RemoteMethod method, boolean requireResponse, boolean standby, long timeOutMs, Class clazz) throws IOException { + final List> results = invokeConcurrent( + locations, method, standby, timeOutMs, clazz); + + final Map ret = new TreeMap<>(); + for (final RemoteResult result : results) { + // Response from all servers required, use this error. 
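+ // The first failed location throws and aborts the whole call.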
+ if (requireResponse && result.hasException()) { + throw result.getException(); + } + if (result.hasResult()) { + ret.put(result.getLocation(), result.getResult()); + } + } + + // Throw the exception for the first location if there are no results + if (ret.isEmpty()) { + final RemoteResult result = results.get(0); + if (result.hasException()) { + throw result.getException(); + } + } + + return ret; + } + + /** + * Invokes multiple concurrent proxy calls to different clients. Returns an + * array of results. + * + * Re-throws exceptions generated by the remote RPC call as either + * RemoteException or IOException. + * + * @param The type of the remote location. + * @param The type of the remote method return + * @param locations List of remote locations to call concurrently. + * @param method The remote method and parameters to invoke. + * @param standby If the requests should go to the standby namenodes too. + * @param timeOutMs Timeout for each individual call. + * @param clazz Type of the remote return type. + * @return Result of invoking the method per subcluster (list of results). + * This includes the exception for each remote location. + * @throws IOException If there are errors invoking the method. + */ + @SuppressWarnings("unchecked") + public List> + invokeConcurrent(final Collection locations, + final RemoteMethod method, boolean standby, long timeOutMs, + Class clazz) throws IOException { final UserGroupInformation ugi = RouterRpcServer.getRemoteUser(); final Method m = method.getMethod(); @@ -1103,8 +1150,9 @@ public Map invokeConcurrent( try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); - Object result = invokeMethod(ugi, namenodes, proto, m, paramList); - return Collections.singletonMap(location, (R) result); + R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList); + RemoteResult remoteResult = new RemoteResult<>(location, result); + return Collections.singletonList(remoteResult); } catch (IOException ioe) { // Localize the exception throw processException(ioe, location); @@ -1151,21 +1199,20 @@ public Map invokeConcurrent( } else { futures = executorService.invokeAll(callables); } - Map results = new TreeMap<>(); - Map exceptions = new TreeMap<>(); + List> results = new ArrayList<>(); for (int i=0; i future = futures.get(i); - Object result = future.get(); - results.put(location, (R) result); + R result = (R) future.get(); + results.add(new RemoteResult<>(location, result)); } catch (CancellationException ce) { T loc = orderedLocations.get(i); String msg = "Invocation to \"" + loc + "\" for \"" + method.getMethodName() + "\" timed out"; LOG.error(msg); IOException ioe = new SubClusterTimeoutException(msg); - exceptions.put(location, ioe); + results.add(new RemoteResult<>(location, ioe)); } catch (ExecutionException ex) { Throwable cause = ex.getCause(); LOG.debug("Canot execute {} in {}: {}", @@ -1180,22 +1227,8 @@ public Map invokeConcurrent( m.getName() + ": " + cause.getMessage(), cause); } - // Response from all servers required, use this error. 
- if (requireResponse) { - throw ioe; - } - // Store the exceptions - exceptions.put(location, ioe); - } - } - - // Throw the exception for the first location if there are no results - if (results.isEmpty()) { - T location = orderedLocations.get(0); - IOException ioe = exceptions.get(location); - if (ioe != null) { - throw ioe; + results.add(new RemoteResult<>(location, ioe)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 54342240f1..fd5b23ba85 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.spy; @@ -31,8 +32,14 @@ import java.lang.management.ManagementFactory; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.Collection; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import javax.management.JMX; @@ -40,6 +47,7 @@ import javax.management.MalformedObjectNameException; import javax.management.ObjectName; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -48,12 +56,18 @@ import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager; +import org.apache.hadoop.hdfs.server.federation.router.Router; +import org.apache.hadoop.hdfs.server.federation.router.RouterClient; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; @@ -61,6 +75,12 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.federation.store.RouterStore; +import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; +import 
org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; +import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; +import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; import org.apache.hadoop.hdfs.server.federation.store.records.RouterState; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.security.AccessControlException; @@ -412,4 +432,112 @@ public static void transitionClusterNSToActive( listNamenodeContext.get(index).getNamenodeId()); } } + + /** + * Get the file system for HDFS in an RPC port. + * @param rpcPort RPC port. + * @return HDFS file system. + * @throws IOException If it cannot create the file system. + */ + public static FileSystem getFileSystem(int rpcPort) throws IOException { + Configuration conf = new HdfsConfiguration(); + URI uri = URI.create("hdfs://localhost:" + rpcPort); + return DistributedFileSystem.get(uri, conf); + } + + /** + * Get the file system for HDFS for a Router. + * @param router Router. + * @return HDFS file system. + * @throws IOException If it cannot create the file system. + */ + public static FileSystem getFileSystem(final Router router) + throws IOException { + InetSocketAddress rpcAddress = router.getRpcServerAddress(); + int rpcPort = rpcAddress.getPort(); + return getFileSystem(rpcPort); + } + + /** + * Get the admin interface for a Router. + * @param router Router. + * @return Admin interface. + * @throws IOException If it cannot create the admin interface. + */ + public static RouterClient getAdminClient( + final Router router) throws IOException { + Configuration conf = new HdfsConfiguration(); + InetSocketAddress routerSocket = router.getAdminServerAddress(); + return new RouterClient(routerSocket, conf); + } + + /** + * Add a mount table entry in some name services and wait until it is + * available. + * @param router Router to change. + * @param mountPoint Name of the mount point. + * @param order Order of the mount table entry. + * @param nsIds Name service identifiers. + * @throws Exception If the entry could not be created. + */ + public static void createMountTableEntry( + final Router router, + final String mountPoint, final DestinationOrder order, + Collection nsIds) throws Exception { + createMountTableEntry( + Collections.singletonList(router), mountPoint, order, nsIds); + } + + /** + * Add a mount table entry in some name services and wait until it is + * available. + * @param routers List of routers. + * @param mountPoint Name of the mount point. + * @param order Order of the mount table entry. + * @param nsIds Name service identifiers. + * @throws Exception If the entry could not be created. 
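+ * (The entry is added through the first Router and every Router's cache is refreshed before returning.)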
+ */ + public static void createMountTableEntry( + final List routers, + final String mountPoint, + final DestinationOrder order, + final Collection nsIds) throws Exception { + Router router = routers.get(0); + RouterClient admin = getAdminClient(router); + MountTableManager mountTable = admin.getMountTableManager(); + Map destMap = new HashMap<>(); + for (String nsId : nsIds) { + destMap.put(nsId, mountPoint); + } + MountTable newEntry = MountTable.newInstance(mountPoint, destMap); + newEntry.setDestOrder(order); + AddMountTableEntryRequest addRequest = + AddMountTableEntryRequest.newInstance(newEntry); + AddMountTableEntryResponse addResponse = + mountTable.addMountTableEntry(addRequest); + boolean created = addResponse.getStatus(); + assertTrue(created); + + refreshRoutersCaches(routers); + + // Check for the path + GetMountTableEntriesRequest getRequest = + GetMountTableEntriesRequest.newInstance(mountPoint); + GetMountTableEntriesResponse getResponse = + mountTable.getMountTableEntries(getRequest); + List entries = getResponse.getEntries(); + assertEquals("Too many entries: " + entries, 1, entries.size()); + assertEquals(mountPoint, entries.get(0).getSourcePath()); + } + + /** + * Refresh the caches of a set of Routers. + * @param routers List of Routers. + */ + public static void refreshRoutersCaches(final List routers) { + for (final Router router : routers) { + StateStoreService stateStore = router.getStateStore(); + stateStore.refreshCaches(true); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java index d8dffeedd1..bfa56a2b5d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockNamenode.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.federation; +import static java.util.Collections.emptySet; +import static java.util.Collections.singletonList; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; @@ -31,14 +33,19 @@ import java.net.ConnectException; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.Map.Entry; import java.util.SortedMap; import java.util.concurrent.ConcurrentSkipListMap; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceStatus; @@ -67,6 +74,9 @@ import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; +import org.apache.hadoop.hdfs.server.federation.router.Router; import 
org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; @@ -311,6 +321,9 @@ public void addFileSystemMock() throws IOException { when(l).thenAnswer(invocation -> { String src = getSrc(invocation); LOG.info("{} getListing({})", nsId, src); + if (fs.get(src) == null) { + throw new FileNotFoundException("File does not exist " + src); + } if (!src.endsWith("/")) { src += "/"; } @@ -338,6 +351,15 @@ public void addFileSystemMock() throws IOException { when(c).thenAnswer(invocation -> { String src = getSrc(invocation); LOG.info("{} create({})", nsId, src); + boolean createParent = (boolean)invocation.getArgument(4); + if (createParent) { + Path path = new Path(src).getParent(); + while (!path.isRoot()) { + LOG.info("{} create parent {}", nsId, path); + fs.put(path.toString(), "DIRECTORY"); + path = path.getParent(); + } + } fs.put(src, "FILE"); return getMockHdfsFileStatus(src, "FILE"); }); @@ -375,6 +397,15 @@ public void addFileSystemMock() throws IOException { when(m).thenAnswer(invocation -> { String src = getSrc(invocation); LOG.info("{} mkdirs({})", nsId, src); + boolean createParent = (boolean)invocation.getArgument(2); + if (createParent) { + Path path = new Path(src).getParent(); + while (!path.isRoot()) { + LOG.info("{} mkdir parent {}", nsId, path); + fs.put(path.toString(), "DIRECTORY"); + path = path.getParent(); + } + } fs.put(src, "DIRECTORY"); return true; }); @@ -386,6 +417,39 @@ public void addFileSystemMock() throws IOException { when(defaults.getKeyProviderUri()).thenReturn(nsId); return defaults; }); + when(mockNn.getContentSummary(anyString())).thenAnswer(invocation -> { + String src = getSrc(invocation); + LOG.info("{} getContentSummary({})", nsId, src); + if (fs.get(src) == null) { + throw new FileNotFoundException("File does not exist " + src); + } + if (!src.endsWith("/")) { + src += "/"; + } + Map files = + fs.subMap(src, src + Character.MAX_VALUE); + int numFiles = 0; + int numDirs = 0; + int length = 0; + for (Entry entry : files.entrySet()) { + String file = entry.getKey(); + if (file.substring(src.length()).indexOf('/') < 0) { + String type = entry.getValue(); + if ("DIRECTORY".equals(type)) { + numDirs++; + } else if ("FILE".equals(type)) { + numFiles++; + length += 100; + } + } + } + return new ContentSummary.Builder() + .fileCount(numFiles) + .directoryCount(numDirs) + .length(length) + .erasureCodingPolicy("") + .build(); + }); } private static String getSrc(InvocationOnMock invocation) { @@ -445,4 +509,49 @@ private static LocatedBlock getMockLocatedBlock(final String nsId) { when(lb.getBlockToken()).thenReturn(tok); return lb; } + + /** + * Register a set of NameNodes in a Router. + * @param router Router to register to. + * @param namenodes Set of NameNodes. + * @throws IOException If it cannot register them. + */ + public static void registerSubclusters(Router router, + Collection namenodes) throws IOException { + registerSubclusters(singletonList(router), namenodes, emptySet()); + } + + /** + * Register a set of NameNodes in a set of Routers. + * @param routers Set of Routers. + * @param namenodes Set of NameNodes. + * @param unavailableSubclusters Set of unavailable subclusters. + * @throws IOException If it cannot register them. 
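+ * (Each registration is applied through the Router's MembershipNamenodeResolver and its cache is reloaded.)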
+ */ + public static void registerSubclusters(List routers, + Collection namenodes, + Set unavailableSubclusters) throws IOException { + + for (final Router router : routers) { + MembershipNamenodeResolver resolver = + (MembershipNamenodeResolver) router.getNamenodeResolver(); + for (final MockNamenode nn : namenodes) { + String nsId = nn.getNameserviceId(); + String rpcAddress = "localhost:" + nn.getRPCPort(); + String httpAddress = "localhost:" + nn.getHTTPPort(); + NamenodeStatusReport report = new NamenodeStatusReport( + nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress); + if (unavailableSubclusters.contains(nsId)) { + LOG.info("Register {} as UNAVAILABLE", nsId); + report.setRegistrationValid(false); + } else { + LOG.info("Register {} as ACTIVE", nsId); + report.setRegistrationValid(true); + } + report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0)); + resolver.registerNamenode(report); + } + resolver.loadCache(true); + } + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java index c8f96c659c..39d9561395 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFaultTolerant.java @@ -18,6 +18,11 @@ package org.apache.hadoop.hdfs.server.federation.router; import static java.util.Arrays.asList; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refreshRoutersCaches; +import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters; import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -26,8 +31,8 @@ import java.io.FileNotFoundException; import java.io.IOException; -import java.net.InetSocketAddress; -import java.net.URI; +import java.io.PrintWriter; +import java.io.StringWriter; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; import java.util.Collection; @@ -36,21 +41,21 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.Set; import java.util.UUID; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.federation.MockNamenode; import 
org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; @@ -59,17 +64,12 @@ import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager; import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; -import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport; import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; -import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest; -import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest; import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse; import org.apache.hadoop.hdfs.server.federation.store.records.MountTable; -import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.UserGroupInformation; import org.junit.After; @@ -150,7 +150,8 @@ public void setup() throws Exception { } LOG.info("Registering the subclusters in the Routers"); - registerSubclusters(Collections.singleton("ns1")); + registerSubclusters( + routers, namenodes.values(), Collections.singleton("ns1")); LOG.info("Stop ns1 to simulate an unavailable subcluster"); namenodes.get("ns1").stop(); @@ -158,36 +159,6 @@ public void setup() throws Exception { service = Executors.newFixedThreadPool(10); } - /** - * Register the subclusters in all Routers. - * @param unavailableSubclusters Set of unavailable subclusters. - * @throws IOException If it cannot register a subcluster. - */ - private void registerSubclusters(Set unavailableSubclusters) - throws IOException { - for (final Router router : routers) { - MembershipNamenodeResolver resolver = - (MembershipNamenodeResolver) router.getNamenodeResolver(); - for (final MockNamenode nn : namenodes.values()) { - String nsId = nn.getNameserviceId(); - String rpcAddress = "localhost:" + nn.getRPCPort(); - String httpAddress = "localhost:" + nn.getHTTPPort(); - NamenodeStatusReport report = new NamenodeStatusReport( - nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress); - if (unavailableSubclusters.contains(nsId)) { - LOG.info("Register {} as UNAVAILABLE", nsId); - report.setRegistrationValid(false); - } else { - LOG.info("Register {} as ACTIVE", nsId); - report.setRegistrationValid(true); - } - report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0)); - resolver.registerNamenode(report); - } - resolver.loadCache(true); - } - } - @After public void cleanup() throws Exception { LOG.info("Stopping the cluster"); @@ -205,45 +176,6 @@ public void cleanup() throws Exception { } } - /** - * Add a mount table entry in some name services and wait until it is - * available. - * @param mountPoint Name of the mount point. - * @param order Order of the mount table entry. - * @param nsIds Name service identifiers. - * @throws Exception If the entry could not be created. 
- */ - private void createMountTableEntry( - final String mountPoint, final DestinationOrder order, - Collection nsIds) throws Exception { - Router router = getRandomRouter(); - RouterClient admin = getAdminClient(router); - MountTableManager mountTable = admin.getMountTableManager(); - Map destMap = new HashMap<>(); - for (String nsId : nsIds) { - destMap.put(nsId, mountPoint); - } - MountTable newEntry = MountTable.newInstance(mountPoint, destMap); - newEntry.setDestOrder(order); - AddMountTableEntryRequest addRequest = - AddMountTableEntryRequest.newInstance(newEntry); - AddMountTableEntryResponse addResponse = - mountTable.addMountTableEntry(addRequest); - boolean created = addResponse.getStatus(); - assertTrue(created); - - refreshRoutersCaches(); - - // Check for the path - GetMountTableEntriesRequest getRequest = - GetMountTableEntriesRequest.newInstance(mountPoint); - GetMountTableEntriesResponse getResponse = - mountTable.getMountTableEntries(getRequest); - List entries = getResponse.getEntries(); - assertEquals("Too many entries: " + entries, 1, entries.size()); - assertEquals(mountPoint, entries.get(0).getSourcePath()); - } - /** * Update a mount table entry to be fault tolerant. * @param mountPoint Mount point to update. @@ -266,17 +198,7 @@ private void updateMountPointFaultTolerant(final String mountPoint) mountTable.updateMountTableEntry(updateRequest); assertTrue(updateResponse.getStatus()); - refreshRoutersCaches(); - } - - /** - * Refresh the caches of all Routers (to get the mount table). - */ - private void refreshRoutersCaches() { - for (final Router router : routers) { - StateStoreService stateStore = router.getStateStore(); - stateStore.refreshCaches(true); - } + refreshRoutersCaches(routers); } /** @@ -320,8 +242,8 @@ private void testWriteWithFailedSubcluster(final DestinationOrder order) final String mountPoint = "/" + order + "-failsubcluster"; final Path mountPath = new Path(mountPoint); LOG.info("Setup {} with order {}", mountPoint, order); - createMountTableEntry(mountPoint, order, namenodes.keySet()); - + createMountTableEntry( + getRandomRouter(), mountPoint, order, namenodes.keySet()); LOG.info("Write in {} should succeed writing in ns0 and fail for ns1", mountPath); @@ -383,7 +305,14 @@ private void checkDirectoriesFaultTolerant( tasks.add(getListFailTask(router0Fs, mountPoint)); int filesExpected = dirs0.length + results.getSuccess(); tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected)); - assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess()); + results = collectResults("List " + mountPoint, tasks); + assertEquals("Failed listing", 2, results.getSuccess()); + + tasks.add(getContentSummaryFailTask(router0Fs, mountPoint)); + tasks.add(getContentSummarySuccessTask( + router1Fs, mountPoint, filesExpected)); + results = collectResults("Content summary " + mountPoint, tasks); + assertEquals("Failed content summary", 2, results.getSuccess()); } /** @@ -422,6 +351,12 @@ private void checkFilesFaultTolerant( tasks.add(getListFailTask(router0Fs, dir0)); tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess())); assertEquals(2, collectResults("List " + dir0, tasks).getSuccess()); + + tasks.add(getContentSummaryFailTask(router0Fs, dir0)); + tasks.add(getContentSummarySuccessTask( + router1Fs, dir0, results.getSuccess())); + results = collectResults("Content summary " + dir0, tasks); + assertEquals(2, results.getSuccess()); } /** @@ -534,6 +469,42 @@ private static Callable getListSuccessTask( }; } + + /** + * Task that 
lists a directory and expects to fail. + * @param fs File system to check. + * @param path Path to try to list. + * @return If the listing failed as expected. + */ + private static Callable getContentSummaryFailTask( + FileSystem fs, Path path) { + return () -> { + try { + fs.getContentSummary(path); + return false; + } catch (RemoteException re) { + return true; + } + }; + } + + /** + * Task that lists a directory and succeeds. + * @param fs File system to check. + * @param path Path to list. + * @param expected Number of files to expect to find. + * @return If the listing succeeds. + */ + private static Callable getContentSummarySuccessTask( + FileSystem fs, Path path, int expected) { + return () -> { + ContentSummary summary = fs.getContentSummary(path); + assertEquals("Wrong summary for " + path, + expected, summary.getFileAndDirectoryCount()); + return true; + }; + } + /** * Invoke a set of tasks and collect their outputs. * The tasks should do assertions. @@ -556,7 +527,14 @@ private TaskResults collectResults(final String tag, results.incrFailure(); } } catch (Exception e) { - fail(e.getMessage()); + StringWriter stackTrace = new StringWriter(); + PrintWriter writer = new PrintWriter(stackTrace); + if (e instanceof ExecutionException) { + e.getCause().printStackTrace(writer); + } else { + e.printStackTrace(writer); + } + fail("Failed to run \"" + tag + "\": " + stackTrace); } }); tasks.clear(); @@ -631,24 +609,4 @@ private FileSystem getRandomRouterFileSystem() throws Exception { return userUgi.doAs( (PrivilegedExceptionAction) () -> getFileSystem(router)); } - - private static FileSystem getFileSystem(int rpcPort) throws IOException { - Configuration conf = new HdfsConfiguration(); - URI uri = URI.create("hdfs://localhost:" + rpcPort); - return DistributedFileSystem.get(uri, conf); - } - - private static FileSystem getFileSystem(final Router router) - throws IOException { - InetSocketAddress rpcAddress = router.getRpcServerAddress(); - int rpcPort = rpcAddress.getPort(); - return getFileSystem(rpcPort); - } - - private static RouterClient getAdminClient( - final Router router) throws IOException { - Configuration conf = new HdfsConfiguration(); - InetSocketAddress routerSocket = router.getAdminServerAddress(); - return new RouterClient(routerSocket, conf); - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMissingFolderMulti.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMissingFolderMulti.java new file mode 100644 index 0000000000..8ce4eb6423 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterMissingFolderMulti.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static java.util.Arrays.asList; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry; +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem; +import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters; +import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration; +import static org.junit.Assert.assertEquals; + +import java.io.FileNotFoundException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.federation.MockNamenode; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver; +import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Test the behavior when listing a mount point mapped to multiple subclusters + * and one of the subclusters is missing it. + */ +public class TestRouterMissingFolderMulti { + + private static final Logger LOG = + LoggerFactory.getLogger(TestRouterMissingFolderMulti.class); + + /** Number of files to create for testing. */ + private static final int NUM_FILES = 10; + + /** Namenodes for the test per name service id (subcluster). */ + private Map namenodes = new HashMap<>(); + /** Routers for the test. 
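+ * A single Router is enough for this test.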
*/ + private Router router; + + + @Before + public void setup() throws Exception { + LOG.info("Start the Namenodes"); + Configuration nnConf = new HdfsConfiguration(); + nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10); + for (final String nsId : asList("ns0", "ns1")) { + MockNamenode nn = new MockNamenode(nsId, nnConf); + nn.transitionToActive(); + nn.addFileSystemMock(); + namenodes.put(nsId, nn); + } + + LOG.info("Start the Routers"); + Configuration routerConf = new RouterConfigBuilder() + .stateStore() + .admin() + .rpc() + .build(); + routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0"); + routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0"); + routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0"); + + Configuration stateStoreConf = getStateStoreConfiguration(); + stateStoreConf.setClass( + RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS, + MembershipNamenodeResolver.class, ActiveNamenodeResolver.class); + stateStoreConf.setClass( + RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS, + MultipleDestinationMountTableResolver.class, + FileSubclusterResolver.class); + routerConf.addResource(stateStoreConf); + + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, false); + + router = new Router(); + router.init(routerConf); + router.start(); + + LOG.info("Registering the subclusters in the Routers"); + registerSubclusters(router, namenodes.values()); + } + + @After + public void cleanup() throws Exception { + LOG.info("Stopping the cluster"); + for (final MockNamenode nn : namenodes.values()) { + nn.stop(); + } + namenodes.clear(); + + if (router != null) { + router.stop(); + router = null; + } + } + + @Test + public void testSuccess() throws Exception { + FileSystem fs = getFileSystem(router); + String mountPoint = "/test-success"; + createMountTableEntry(router, mountPoint, + DestinationOrder.HASH_ALL, namenodes.keySet()); + Path folder = new Path(mountPoint, "folder-all"); + for (int i = 0; i < NUM_FILES; i++) { + Path file = new Path(folder, "file-" + i + ".txt"); + FSDataOutputStream os = fs.create(file); + os.close(); + } + FileStatus[] files = fs.listStatus(folder); + assertEquals(NUM_FILES, files.length); + ContentSummary contentSummary = fs.getContentSummary(folder); + assertEquals(NUM_FILES, contentSummary.getFileCount()); + } + + @Test + public void testFileNotFound() throws Exception { + FileSystem fs = getFileSystem(router); + String mountPoint = "/test-non-existing"; + createMountTableEntry(router, + mountPoint, DestinationOrder.HASH_ALL, namenodes.keySet()); + Path path = new Path(mountPoint, "folder-all"); + LambdaTestUtils.intercept(FileNotFoundException.class, + () -> fs.listStatus(path)); + LambdaTestUtils.intercept(FileNotFoundException.class, + () -> fs.getContentSummary(path)); + } + + @Test + public void testOneMissing() throws Exception { + FileSystem fs = getFileSystem(router); + String mountPoint = "/test-one-missing"; + createMountTableEntry(router, mountPoint, + DestinationOrder.HASH_ALL, namenodes.keySet()); + + // Create the folders directly in only one of the Namenodes + MockNamenode nn = namenodes.get("ns0"); + int nnRpcPort = nn.getRPCPort(); + FileSystem nnFs = getFileSystem(nnRpcPort); + Path folder = new Path(mountPoint, "folder-all"); + for (int i = 0; i < NUM_FILES; i++) { + Path file = new Path(folder, "file-" + i + ".txt"); + FSDataOutputStream os = nnFs.create(file); + os.close(); + } + + FileStatus[] files = fs.listStatus(folder); + assertEquals(NUM_FILES, 
files.length); + ContentSummary summary = fs.getContentSummary(folder); + assertEquals(NUM_FILES, summary.getFileAndDirectoryCount()); + } +}
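
The pattern this patch introduces in getListing() and getContentSummary() is: invoke concurrently, inspect one RemoteResult per location, tolerate FileNotFoundException from individual subclusters, and re-throw anything else unless partial results are allowed. A minimal sketch of that error handling as a standalone helper follows; the name invokeTolerant is hypothetical and not part of this patch, and the sketch assumes it sits in RouterClientProtocol next to the rpcClient field used above:

  // Hypothetical helper distilling the per-location error handling above;
  // a sketch against the new list-returning invokeConcurrent(), not code
  // from this patch.
  private <R> List<R> invokeTolerant(final List<RemoteLocation> locations,
      final RemoteMethod method, final Class<R> clazz,
      final boolean allowPartialList) throws IOException {
    final List<R> values = new ArrayList<>();
    final List<RemoteResult<RemoteLocation, R>> results =
        rpcClient.invokeConcurrent(locations, method, false, -1, clazz);
    for (RemoteResult<RemoteLocation, R> result : results) {
      if (result.hasException()) {
        IOException ioe = result.getException();
        // A subcluster that is missing the path is expected and tolerated
        if (!(ioe instanceof FileNotFoundException) && !allowPartialList) {
          throw ioe;
        }
      } else if (result.getResult() != null) {
        values.add(result.getResult());
      }
    }
    return values;
  }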
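
Under the MockNamenode accounting added above (only direct children are counted and each file reports 100 bytes), the client-visible effect of the fix works out to concrete numbers. A sketch, assuming the testOneMissing() layout (a HASH_ALL mount over ns0 and ns1 where only ns0 holds folder-all with ten files) and with routerFs standing for the file system returned by getFileSystem(router):

  // ns1 throws FileNotFoundException for the folder; the Router now
  // tolerates it and aggregates only the successful ns0 response.
  ContentSummary summary = routerFs.getContentSummary(
      new Path("/test-one-missing/folder-all"));
  assertEquals(10, summary.getFileCount());
  assertEquals(1000, summary.getLength()); // 10 files x 100 bytes in the mock
  // Before this change, the single FileNotFoundException from ns1 made the
  // whole call fail even though ns0 could answer.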