HDFS-14454. RBF: getContentSummary() should allow non-existing folders. Contributed by Inigo Goiri.

This commit is contained in:
Ayush Saxena 2019-05-03 04:54:09 +05:30 committed by Brahma Reddy Battula
parent b522b52bb1
commit 203664e6b2
7 changed files with 670 additions and 163 deletions

View File

@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import java.io.IOException;
/**
* Result from a remote location.
* It includes the exception if there was any error.
* @param <T> Type of the remote location.
* @param <R> Type of the result.
*/
public class RemoteResult<T extends RemoteLocationContext, R> {
/** The remote location. */
private final T loc;
/** The result from the remote location. */
private final R result;
/** If the result is set; used for void types. */
private final boolean resultSet;
/** The exception if we couldn't get the result. */
private final IOException ioe;
public RemoteResult(T location, R r) {
this.loc = location;
this.result = r;
this.resultSet = true;
this.ioe = null;
}
public RemoteResult(T location, IOException e) {
this.loc = location;
this.result = null;
this.resultSet = false;
this.ioe = e;
}
public T getLocation() {
return loc;
}
public boolean hasResult() {
return resultSet;
}
public R getResult() {
return result;
}
public boolean hasException() {
return getException() != null;
}
public IOException getException() {
return ioe;
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder()
.append("loc=").append(getLocation());
if (hasResult()) {
sb.append(" result=").append(getResult());
}
if (hasException()) {
sb.append(" exception=").append(getException());
}
return sb.toString();
}
}

View File

@ -728,9 +728,9 @@ public DirectoryListing getListing(String src, byte[] startAfter,
RemoteMethod method = new RemoteMethod("getListing",
new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
new RemoteParam(), startAfter, needLocation);
Map<RemoteLocation, DirectoryListing> listings =
rpcClient.invokeConcurrent(locations, method,
!this.allowPartialList, false, DirectoryListing.class);
final List<RemoteResult<RemoteLocation, DirectoryListing>> listings =
rpcClient.invokeConcurrent(
locations, method, false, -1, DirectoryListing.class);
Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
int totalRemainingEntries = 0;
@ -739,13 +739,17 @@ public DirectoryListing getListing(String src, byte[] startAfter,
if (listings != null) {
// Check the subcluster listing with the smallest name
String lastName = null;
for (Map.Entry<RemoteLocation, DirectoryListing> entry :
listings.entrySet()) {
RemoteLocation location = entry.getKey();
DirectoryListing listing = entry.getValue();
if (listing == null) {
LOG.debug("Cannot get listing from {}", location);
} else {
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
if (result.hasException()) {
IOException ioe = result.getException();
if (ioe instanceof FileNotFoundException) {
RemoteLocation location = result.getLocation();
LOG.debug("Cannot get listing from {}", location);
} else if (!allowPartialList) {
throw ioe;
}
} else if (result.getResult() != null) {
DirectoryListing listing = result.getResult();
totalRemainingEntries += listing.getRemainingEntries();
HdfsFileStatus[] partialListing = listing.getPartialListing();
int length = partialListing.length;
@ -760,13 +764,14 @@ public DirectoryListing getListing(String src, byte[] startAfter,
}
// Add existing entries
for (Object value : listings.values()) {
DirectoryListing listing = (DirectoryListing) value;
for (RemoteResult<RemoteLocation, DirectoryListing> result : listings) {
DirectoryListing listing = result.getResult();
if (listing != null) {
namenodeListingExists = true;
for (HdfsFileStatus file : listing.getPartialListing()) {
String filename = file.getLocalName();
if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) {
if (totalRemainingEntries > 0 &&
filename.compareTo(lastName) > 0) {
// Discarding entries further than the lastName
remainingEntries++;
} else {
@ -1110,19 +1115,26 @@ public ContentSummary getContentSummary(String path) throws IOException {
rpcServer.checkOperation(NameNode.OperationCategory.READ);
// Get the summaries from regular files
Collection<ContentSummary> summaries = new LinkedList<>();
final Collection<ContentSummary> summaries = new ArrayList<>();
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(path, false);
final RemoteMethod method = new RemoteMethod("getContentSummary",
new Class<?>[] {String.class}, new RemoteParam());
final List<RemoteResult<RemoteLocation, ContentSummary>> results =
rpcClient.invokeConcurrent(locations, method,
false, -1, ContentSummary.class);
FileNotFoundException notFoundException = null;
try {
final List<RemoteLocation> locations =
rpcServer.getLocationsForPath(path, false);
RemoteMethod method = new RemoteMethod("getContentSummary",
new Class<?>[] {String.class}, new RemoteParam());
Map<RemoteLocation, ContentSummary> results =
rpcClient.invokeConcurrent(locations, method,
!this.allowPartialList, false, ContentSummary.class);
summaries.addAll(results.values());
} catch (FileNotFoundException e) {
notFoundException = e;
for (RemoteResult<RemoteLocation, ContentSummary> result : results) {
if (result.hasException()) {
IOException ioe = result.getException();
if (ioe instanceof FileNotFoundException) {
notFoundException = (FileNotFoundException)ioe;
} else if (!allowPartialList) {
throw ioe;
}
} else if (result.getResult() != null) {
summaries.add(result.getResult());
}
}
// Add mount points at this level in the tree
@ -1131,7 +1143,8 @@ public ContentSummary getContentSummary(String path) throws IOException {
for (String child : children) {
Path childPath = new Path(path, child);
try {
ContentSummary mountSummary = getContentSummary(childPath.toString());
ContentSummary mountSummary = getContentSummary(
childPath.toString());
if (mountSummary != null) {
summaries.add(mountSummary);
}

View File

@ -1083,11 +1083,58 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
* @throws IOException If requiredResponse=true and any of the calls throw an
* exception.
*/
@SuppressWarnings("unchecked")
public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
final Collection<T> locations, final RemoteMethod method,
boolean requireResponse, boolean standby, long timeOutMs, Class<R> clazz)
throws IOException {
final List<RemoteResult<T, R>> results = invokeConcurrent(
locations, method, standby, timeOutMs, clazz);
final Map<T, R> ret = new TreeMap<>();
for (final RemoteResult<T, R> result : results) {
// Response from all servers required, use this error.
if (requireResponse && result.hasException()) {
throw result.getException();
}
if (result.hasResult()) {
ret.put(result.getLocation(), result.getResult());
}
}
// Throw the exception for the first location if there are no results
if (ret.isEmpty()) {
final RemoteResult<T, R> result = results.get(0);
if (result.hasException()) {
throw result.getException();
}
}
return ret;
}
/**
* Invokes multiple concurrent proxy calls to different clients. Returns an
* array of results.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote location.
* @param <R> The type of the remote method return
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @param standby If the requests should go to the standby namenodes too.
* @param timeOutMs Timeout for each individual call.
* @param clazz Type of the remote return type.
* @return Result of invoking the method per subcluster (list of results).
* This includes the exception for each remote location.
* @throws IOException If there are errors invoking the method.
*/
@SuppressWarnings("unchecked")
public <T extends RemoteLocationContext, R> List<RemoteResult<T, R>>
invokeConcurrent(final Collection<T> locations,
final RemoteMethod method, boolean standby, long timeOutMs,
Class<R> clazz) throws IOException {
final UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
final Method m = method.getMethod();
@ -1103,8 +1150,9 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
try {
Class<?> proto = method.getProtocol();
Object[] paramList = method.getParams(location);
Object result = invokeMethod(ugi, namenodes, proto, m, paramList);
return Collections.singletonMap(location, (R) result);
R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList);
RemoteResult<T, R> remoteResult = new RemoteResult<>(location, result);
return Collections.singletonList(remoteResult);
} catch (IOException ioe) {
// Localize the exception
throw processException(ioe, location);
@ -1151,21 +1199,20 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
} else {
futures = executorService.invokeAll(callables);
}
Map<T, R> results = new TreeMap<>();
Map<T, IOException> exceptions = new TreeMap<>();
List<RemoteResult<T, R>> results = new ArrayList<>();
for (int i=0; i<futures.size(); i++) {
T location = orderedLocations.get(i);
try {
Future<Object> future = futures.get(i);
Object result = future.get();
results.put(location, (R) result);
R result = (R) future.get();
results.add(new RemoteResult<>(location, result));
} catch (CancellationException ce) {
T loc = orderedLocations.get(i);
String msg = "Invocation to \"" + loc + "\" for \""
+ method.getMethodName() + "\" timed out";
LOG.error(msg);
IOException ioe = new SubClusterTimeoutException(msg);
exceptions.put(location, ioe);
results.add(new RemoteResult<>(location, ioe));
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();
LOG.debug("Canot execute {} in {}: {}",
@ -1180,22 +1227,8 @@ public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
m.getName() + ": " + cause.getMessage(), cause);
}
// Response from all servers required, use this error.
if (requireResponse) {
throw ioe;
}
// Store the exceptions
exceptions.put(location, ioe);
}
}
// Throw the exception for the first location if there are no results
if (results.isEmpty()) {
T location = orderedLocations.get(0);
IOException ioe = exceptions.get(location);
if (ioe != null) {
throw ioe;
results.add(new RemoteResult<>(location, ioe));
}
}

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.spy;
@ -31,8 +32,14 @@
import java.lang.management.ManagementFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import javax.management.JMX;
@ -40,6 +47,7 @@
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -48,12 +56,18 @@
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext;
import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.router.ConnectionManager;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.federation.router.RouterClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcClient;
import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -61,6 +75,12 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.federation.store.RouterStore;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.federation.store.records.RouterState;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.AccessControlException;
@ -412,4 +432,112 @@ public static void transitionClusterNSToActive(
listNamenodeContext.get(index).getNamenodeId());
}
}
/**
* Get the file system for HDFS in an RPC port.
* @param rpcPort RPC port.
* @return HDFS file system.
* @throws IOException If it cannot create the file system.
*/
public static FileSystem getFileSystem(int rpcPort) throws IOException {
Configuration conf = new HdfsConfiguration();
URI uri = URI.create("hdfs://localhost:" + rpcPort);
return DistributedFileSystem.get(uri, conf);
}
/**
* Get the file system for HDFS for a Router.
* @param router Router.
* @return HDFS file system.
* @throws IOException If it cannot create the file system.
*/
public static FileSystem getFileSystem(final Router router)
throws IOException {
InetSocketAddress rpcAddress = router.getRpcServerAddress();
int rpcPort = rpcAddress.getPort();
return getFileSystem(rpcPort);
}
/**
* Get the admin interface for a Router.
* @param router Router.
* @return Admin interface.
* @throws IOException If it cannot create the admin interface.
*/
public static RouterClient getAdminClient(
final Router router) throws IOException {
Configuration conf = new HdfsConfiguration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
return new RouterClient(routerSocket, conf);
}
/**
* Add a mount table entry in some name services and wait until it is
* available.
* @param router Router to change.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
public static void createMountTableEntry(
final Router router,
final String mountPoint, final DestinationOrder order,
Collection<String> nsIds) throws Exception {
createMountTableEntry(
Collections.singletonList(router), mountPoint, order, nsIds);
}
/**
* Add a mount table entry in some name services and wait until it is
* available.
* @param routers List of routers.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
public static void createMountTableEntry(
final List<Router> routers,
final String mountPoint,
final DestinationOrder order,
final Collection<String> nsIds) throws Exception {
Router router = routers.get(0);
RouterClient admin = getAdminClient(router);
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
for (String nsId : nsIds) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
refreshRoutersCaches(routers);
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals("Too many entries: " + entries, 1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
/**
* Refresh the caches of a set of Routers.
* @param routers List of Routers.
*/
public static void refreshRoutersCaches(final List<Router> routers) {
for (final Router router : routers) {
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
}
}
}

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.federation;
import static java.util.Collections.emptySet;
import static java.util.Collections.singletonList;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ADMIN;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
@ -31,14 +33,19 @@
import java.net.ConnectException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceStatus;
@ -67,6 +74,9 @@
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.router.Router;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
@ -311,6 +321,9 @@ public void addFileSystemMock() throws IOException {
when(l).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} getListing({})", nsId, src);
if (fs.get(src) == null) {
throw new FileNotFoundException("File does not exist " + src);
}
if (!src.endsWith("/")) {
src += "/";
}
@ -338,6 +351,15 @@ public void addFileSystemMock() throws IOException {
when(c).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} create({})", nsId, src);
boolean createParent = (boolean)invocation.getArgument(4);
if (createParent) {
Path path = new Path(src).getParent();
while (!path.isRoot()) {
LOG.info("{} create parent {}", nsId, path);
fs.put(path.toString(), "DIRECTORY");
path = path.getParent();
}
}
fs.put(src, "FILE");
return getMockHdfsFileStatus(src, "FILE");
});
@ -375,6 +397,15 @@ public void addFileSystemMock() throws IOException {
when(m).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} mkdirs({})", nsId, src);
boolean createParent = (boolean)invocation.getArgument(2);
if (createParent) {
Path path = new Path(src).getParent();
while (!path.isRoot()) {
LOG.info("{} mkdir parent {}", nsId, path);
fs.put(path.toString(), "DIRECTORY");
path = path.getParent();
}
}
fs.put(src, "DIRECTORY");
return true;
});
@ -386,6 +417,39 @@ public void addFileSystemMock() throws IOException {
when(defaults.getKeyProviderUri()).thenReturn(nsId);
return defaults;
});
when(mockNn.getContentSummary(anyString())).thenAnswer(invocation -> {
String src = getSrc(invocation);
LOG.info("{} getContentSummary({})", nsId, src);
if (fs.get(src) == null) {
throw new FileNotFoundException("File does not exist " + src);
}
if (!src.endsWith("/")) {
src += "/";
}
Map<String, String> files =
fs.subMap(src, src + Character.MAX_VALUE);
int numFiles = 0;
int numDirs = 0;
int length = 0;
for (Entry<String, String> entry : files.entrySet()) {
String file = entry.getKey();
if (file.substring(src.length()).indexOf('/') < 0) {
String type = entry.getValue();
if ("DIRECTORY".equals(type)) {
numDirs++;
} else if ("FILE".equals(type)) {
numFiles++;
length += 100;
}
}
}
return new ContentSummary.Builder()
.fileCount(numFiles)
.directoryCount(numDirs)
.length(length)
.erasureCodingPolicy("")
.build();
});
}
private static String getSrc(InvocationOnMock invocation) {
@ -445,4 +509,49 @@ private static LocatedBlock getMockLocatedBlock(final String nsId) {
when(lb.getBlockToken()).thenReturn(tok);
return lb;
}
/**
* Register a set of NameNodes in a Router.
* @param router Router to register to.
* @param namenodes Set of NameNodes.
* @throws IOException If it cannot register them.
*/
public static void registerSubclusters(Router router,
Collection<MockNamenode> namenodes) throws IOException {
registerSubclusters(singletonList(router), namenodes, emptySet());
}
/**
* Register a set of NameNodes in a set of Routers.
* @param routers Set of Routers.
* @param namenodes Set of NameNodes.
* @param unavailableSubclusters Set of unavailable subclusters.
* @throws IOException If it cannot register them.
*/
public static void registerSubclusters(List<Router> routers,
Collection<MockNamenode> namenodes,
Set<String> unavailableSubclusters) throws IOException {
for (final Router router : routers) {
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
for (final MockNamenode nn : namenodes) {
String nsId = nn.getNameserviceId();
String rpcAddress = "localhost:" + nn.getRPCPort();
String httpAddress = "localhost:" + nn.getHTTPPort();
NamenodeStatusReport report = new NamenodeStatusReport(
nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
if (unavailableSubclusters.contains(nsId)) {
LOG.info("Register {} as UNAVAILABLE", nsId);
report.setRegistrationValid(false);
} else {
LOG.info("Register {} as ACTIVE", nsId);
report.setRegistrationValid(true);
}
report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
resolver.registerNamenode(report);
}
resolver.loadCache(true);
}
}
}

View File

@ -18,6 +18,11 @@
package org.apache.hadoop.hdfs.server.federation.router;
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getAdminClient;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.refreshRoutersCaches;
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
@ -26,8 +31,8 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
@ -36,21 +41,21 @@
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
@ -59,17 +64,12 @@
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.NamenodeStatusReport;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.UpdateMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.After;
@ -150,7 +150,8 @@ public void setup() throws Exception {
}
LOG.info("Registering the subclusters in the Routers");
registerSubclusters(Collections.singleton("ns1"));
registerSubclusters(
routers, namenodes.values(), Collections.singleton("ns1"));
LOG.info("Stop ns1 to simulate an unavailable subcluster");
namenodes.get("ns1").stop();
@ -158,36 +159,6 @@ public void setup() throws Exception {
service = Executors.newFixedThreadPool(10);
}
/**
* Register the subclusters in all Routers.
* @param unavailableSubclusters Set of unavailable subclusters.
* @throws IOException If it cannot register a subcluster.
*/
private void registerSubclusters(Set<String> unavailableSubclusters)
throws IOException {
for (final Router router : routers) {
MembershipNamenodeResolver resolver =
(MembershipNamenodeResolver) router.getNamenodeResolver();
for (final MockNamenode nn : namenodes.values()) {
String nsId = nn.getNameserviceId();
String rpcAddress = "localhost:" + nn.getRPCPort();
String httpAddress = "localhost:" + nn.getHTTPPort();
NamenodeStatusReport report = new NamenodeStatusReport(
nsId, null, rpcAddress, rpcAddress, rpcAddress, httpAddress);
if (unavailableSubclusters.contains(nsId)) {
LOG.info("Register {} as UNAVAILABLE", nsId);
report.setRegistrationValid(false);
} else {
LOG.info("Register {} as ACTIVE", nsId);
report.setRegistrationValid(true);
}
report.setNamespaceInfo(new NamespaceInfo(0, nsId, nsId, 0));
resolver.registerNamenode(report);
}
resolver.loadCache(true);
}
}
@After
public void cleanup() throws Exception {
LOG.info("Stopping the cluster");
@ -205,45 +176,6 @@ public void cleanup() throws Exception {
}
}
/**
* Add a mount table entry in some name services and wait until it is
* available.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @param nsIds Name service identifiers.
* @throws Exception If the entry could not be created.
*/
private void createMountTableEntry(
final String mountPoint, final DestinationOrder order,
Collection<String> nsIds) throws Exception {
Router router = getRandomRouter();
RouterClient admin = getAdminClient(router);
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
for (String nsId : nsIds) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
refreshRoutersCaches();
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals("Too many entries: " + entries, 1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
/**
* Update a mount table entry to be fault tolerant.
* @param mountPoint Mount point to update.
@ -266,17 +198,7 @@ private void updateMountPointFaultTolerant(final String mountPoint)
mountTable.updateMountTableEntry(updateRequest);
assertTrue(updateResponse.getStatus());
refreshRoutersCaches();
}
/**
* Refresh the caches of all Routers (to get the mount table).
*/
private void refreshRoutersCaches() {
for (final Router router : routers) {
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
}
refreshRoutersCaches(routers);
}
/**
@ -320,8 +242,8 @@ private void testWriteWithFailedSubcluster(final DestinationOrder order)
final String mountPoint = "/" + order + "-failsubcluster";
final Path mountPath = new Path(mountPoint);
LOG.info("Setup {} with order {}", mountPoint, order);
createMountTableEntry(mountPoint, order, namenodes.keySet());
createMountTableEntry(
getRandomRouter(), mountPoint, order, namenodes.keySet());
LOG.info("Write in {} should succeed writing in ns0 and fail for ns1",
mountPath);
@ -383,7 +305,14 @@ private void checkDirectoriesFaultTolerant(
tasks.add(getListFailTask(router0Fs, mountPoint));
int filesExpected = dirs0.length + results.getSuccess();
tasks.add(getListSuccessTask(router1Fs, mountPoint, filesExpected));
assertEquals(2, collectResults("List " + mountPoint, tasks).getSuccess());
results = collectResults("List " + mountPoint, tasks);
assertEquals("Failed listing", 2, results.getSuccess());
tasks.add(getContentSummaryFailTask(router0Fs, mountPoint));
tasks.add(getContentSummarySuccessTask(
router1Fs, mountPoint, filesExpected));
results = collectResults("Content summary " + mountPoint, tasks);
assertEquals("Failed content summary", 2, results.getSuccess());
}
/**
@ -422,6 +351,12 @@ private void checkFilesFaultTolerant(
tasks.add(getListFailTask(router0Fs, dir0));
tasks.add(getListSuccessTask(router1Fs, dir0, results.getSuccess()));
assertEquals(2, collectResults("List " + dir0, tasks).getSuccess());
tasks.add(getContentSummaryFailTask(router0Fs, dir0));
tasks.add(getContentSummarySuccessTask(
router1Fs, dir0, results.getSuccess()));
results = collectResults("Content summary " + dir0, tasks);
assertEquals(2, results.getSuccess());
}
/**
@ -534,6 +469,42 @@ private static Callable<Boolean> getListSuccessTask(
};
}
/**
* Task that lists a directory and expects to fail.
* @param fs File system to check.
* @param path Path to try to list.
* @return If the listing failed as expected.
*/
private static Callable<Boolean> getContentSummaryFailTask(
FileSystem fs, Path path) {
return () -> {
try {
fs.getContentSummary(path);
return false;
} catch (RemoteException re) {
return true;
}
};
}
/**
* Task that lists a directory and succeeds.
* @param fs File system to check.
* @param path Path to list.
* @param expected Number of files to expect to find.
* @return If the listing succeeds.
*/
private static Callable<Boolean> getContentSummarySuccessTask(
FileSystem fs, Path path, int expected) {
return () -> {
ContentSummary summary = fs.getContentSummary(path);
assertEquals("Wrong summary for " + path,
expected, summary.getFileAndDirectoryCount());
return true;
};
}
/**
* Invoke a set of tasks and collect their outputs.
* The tasks should do assertions.
@ -556,7 +527,14 @@ private TaskResults collectResults(final String tag,
results.incrFailure();
}
} catch (Exception e) {
fail(e.getMessage());
StringWriter stackTrace = new StringWriter();
PrintWriter writer = new PrintWriter(stackTrace);
if (e instanceof ExecutionException) {
e.getCause().printStackTrace(writer);
} else {
e.printStackTrace(writer);
}
fail("Failed to run \"" + tag + "\": " + stackTrace);
}
});
tasks.clear();
@ -631,24 +609,4 @@ private FileSystem getRandomRouterFileSystem() throws Exception {
return userUgi.doAs(
(PrivilegedExceptionAction<FileSystem>) () -> getFileSystem(router));
}
private static FileSystem getFileSystem(int rpcPort) throws IOException {
Configuration conf = new HdfsConfiguration();
URI uri = URI.create("hdfs://localhost:" + rpcPort);
return DistributedFileSystem.get(uri, conf);
}
private static FileSystem getFileSystem(final Router router)
throws IOException {
InetSocketAddress rpcAddress = router.getRpcServerAddress();
int rpcPort = rpcAddress.getPort();
return getFileSystem(rpcPort);
}
private static RouterClient getAdminClient(
final Router router) throws IOException {
Configuration conf = new HdfsConfiguration();
InetSocketAddress routerSocket = router.getAdminServerAddress();
return new RouterClient(routerSocket, conf);
}
}

View File

@ -0,0 +1,182 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static java.util.Arrays.asList;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.createMountTableEntry;
import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.getFileSystem;
import static org.apache.hadoop.hdfs.server.federation.MockNamenode.registerSubclusters;
import static org.apache.hadoop.hdfs.server.federation.store.FederationStateStoreTestUtils.getStateStoreConfiguration;
import static org.junit.Assert.assertEquals;
import java.io.FileNotFoundException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.federation.MockNamenode;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test the behavior when listing a mount point mapped to multiple subclusters
* and one of the subclusters is missing it.
*/
public class TestRouterMissingFolderMulti {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterMissingFolderMulti.class);
/** Number of files to create for testing. */
private static final int NUM_FILES = 10;
/** Namenodes for the test per name service id (subcluster). */
private Map<String, MockNamenode> namenodes = new HashMap<>();
/** Routers for the test. */
private Router router;
@Before
public void setup() throws Exception {
LOG.info("Start the Namenodes");
Configuration nnConf = new HdfsConfiguration();
nnConf.setInt(DFSConfigKeys.DFS_NAMENODE_HANDLER_COUNT_KEY, 10);
for (final String nsId : asList("ns0", "ns1")) {
MockNamenode nn = new MockNamenode(nsId, nnConf);
nn.transitionToActive();
nn.addFileSystemMock();
namenodes.put(nsId, nn);
}
LOG.info("Start the Routers");
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
routerConf.set(RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_HTTP_ADDRESS_KEY, "0.0.0.0:0");
routerConf.set(RBFConfigKeys.DFS_ROUTER_ADMIN_ADDRESS_KEY, "0.0.0.0:0");
Configuration stateStoreConf = getStateStoreConfiguration();
stateStoreConf.setClass(
RBFConfigKeys.FEDERATION_NAMENODE_RESOLVER_CLIENT_CLASS,
MembershipNamenodeResolver.class, ActiveNamenodeResolver.class);
stateStoreConf.setClass(
RBFConfigKeys.FEDERATION_FILE_RESOLVER_CLIENT_CLASS,
MultipleDestinationMountTableResolver.class,
FileSubclusterResolver.class);
routerConf.addResource(stateStoreConf);
routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_ALLOW_PARTIAL_LIST, false);
router = new Router();
router.init(routerConf);
router.start();
LOG.info("Registering the subclusters in the Routers");
registerSubclusters(router, namenodes.values());
}
@After
public void cleanup() throws Exception {
LOG.info("Stopping the cluster");
for (final MockNamenode nn : namenodes.values()) {
nn.stop();
}
namenodes.clear();
if (router != null) {
router.stop();
router = null;
}
}
@Test
public void testSuccess() throws Exception {
FileSystem fs = getFileSystem(router);
String mountPoint = "/test-success";
createMountTableEntry(router, mountPoint,
DestinationOrder.HASH_ALL, namenodes.keySet());
Path folder = new Path(mountPoint, "folder-all");
for (int i = 0; i < NUM_FILES; i++) {
Path file = new Path(folder, "file-" + i + ".txt");
FSDataOutputStream os = fs.create(file);
os.close();
}
FileStatus[] files = fs.listStatus(folder);
assertEquals(NUM_FILES, files.length);
ContentSummary contentSummary = fs.getContentSummary(folder);
assertEquals(NUM_FILES, contentSummary.getFileCount());
}
@Test
public void testFileNotFound() throws Exception {
FileSystem fs = getFileSystem(router);
String mountPoint = "/test-non-existing";
createMountTableEntry(router,
mountPoint, DestinationOrder.HASH_ALL, namenodes.keySet());
Path path = new Path(mountPoint, "folder-all");
LambdaTestUtils.intercept(FileNotFoundException.class,
() -> fs.listStatus(path));
LambdaTestUtils.intercept(FileNotFoundException.class,
() -> fs.getContentSummary(path));
}
@Test
public void testOneMissing() throws Exception {
FileSystem fs = getFileSystem(router);
String mountPoint = "/test-one-missing";
createMountTableEntry(router, mountPoint,
DestinationOrder.HASH_ALL, namenodes.keySet());
// Create the folders directly in only one of the Namenodes
MockNamenode nn = namenodes.get("ns0");
int nnRpcPort = nn.getRPCPort();
FileSystem nnFs = getFileSystem(nnRpcPort);
Path folder = new Path(mountPoint, "folder-all");
for (int i = 0; i < NUM_FILES; i++) {
Path file = new Path(folder, "file-" + i + ".txt");
FSDataOutputStream os = nnFs.create(file);
os.close();
}
FileStatus[] files = fs.listStatus(folder);
assertEquals(NUM_FILES, files.length);
ContentSummary summary = fs.getContentSummary(folder);
assertEquals(NUM_FILES, summary.getFileAndDirectoryCount());
}
}