HDFS-13250. RBF: Router to manage requests across multiple subclusters. Contributed by Inigo Goiri.

This commit is contained in:
Yiqun Lin 2018-03-21 11:32:05 +08:00
parent 69fe4407eb
commit 2caba999bb
4 changed files with 598 additions and 13 deletions

View File

@ -768,6 +768,66 @@ private static boolean isExpectedValue(Object expectedValue, Object value) {
}
}
/**
* Invoke method in all locations and return success if any succeeds.
*
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @return If the call succeeds in any location.
* @throws IOException If any of the calls return an exception.
*/
public <T extends RemoteLocationContext, R> boolean invokeAll(
final Collection<T> locations, final RemoteMethod method)
throws IOException {
boolean anyResult = false;
Map<T, Boolean> results =
invokeConcurrent(locations, method, false, false, Boolean.class);
for (Boolean value : results.values()) {
boolean result = value.booleanValue();
if (result) {
anyResult = true;
}
}
return anyResult;
}
/**
* Invoke multiple concurrent proxy calls to different clients. Returns an
* array of results.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote location.
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @throws IOException If all the calls throw an exception.
*/
public <T extends RemoteLocationContext, R> void invokeConcurrent(
final Collection<T> locations, final RemoteMethod method)
throws IOException {
invokeConcurrent(locations, method, void.class);
}
/**
* Invoke multiple concurrent proxy calls to different clients. Returns an
* array of results.
*
* Re-throws exceptions generated by the remote RPC call as either
* RemoteException or IOException.
*
* @param <T> The type of the remote location.
* @param locations List of remote locations to call concurrently.
* @param method The remote method and parameters to invoke.
* @return Result of invoking the method per subcluster: nsId -> result.
* @throws IOException If all the calls throw an exception.
*/
public <T extends RemoteLocationContext, R> Map<T, R> invokeConcurrent(
final Collection<T> locations, final RemoteMethod method, Class<R> clazz)
throws IOException {
return invokeConcurrent(locations, method, false, false, clazz);
}
/**
* Invoke multiple concurrent proxy calls to different clients. Returns an
* array of results.

View File

@ -510,6 +510,18 @@ public HdfsFileStatus create(String src, FsPermission masked,
throws IOException {
checkOperation(OperationCategory.WRITE);
if (createParent && isPathAll(src)) {
int index = src.lastIndexOf(Path.SEPARATOR);
String parent = src.substring(0, index);
LOG.debug("Creating {} requires creating parent {}", src, parent);
FsPermission parentPermissions = getParentPermission(masked);
boolean success = mkdirs(parent, parentPermissions, createParent);
if (!success) {
// This shouldn't happen as mkdirs returns true or exception
LOG.error("Couldn't create parents for {}", src);
}
}
RemoteLocation createLocation = getCreateLocation(src);
RemoteMethod method = new RemoteMethod("create",
new Class<?>[] {String.class, FsPermission.class, String.class,
@ -521,6 +533,32 @@ public HdfsFileStatus create(String src, FsPermission masked,
return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
}
/**
* Get the permissions for the parent of a child with given permissions. If
* the child has r--, we will set it to r-x.
* @param mask The permission mask of the child.
* @return The permission mask of the parent.
*/
private static FsPermission getParentPermission(final FsPermission mask) {
FsPermission ret = new FsPermission(
applyExecute(mask.getUserAction()),
applyExecute(mask.getGroupAction()),
applyExecute(mask.getOtherAction()));
return ret;
}
/**
* Apply the execute permissions if it can be read.
* @param action Input permission.
* @return Output permission.
*/
private static FsAction applyExecute(final FsAction action) {
if (action.and(FsAction.READ) == FsAction.READ) {
return action.or(FsAction.EXECUTE);
}
return action;
}
/**
* Get the location to create a file. It checks if the file already existed
* in one of the locations.
@ -580,7 +618,7 @@ public LastBlockWithStatus append(String src, final String clientName,
RemoteMethod method = new RemoteMethod("append",
new Class<?>[] {String.class, String.class, EnumSetWritable.class},
new RemoteParam(), clientName, flag);
return (LastBlockWithStatus) rpcClient.invokeSequential(
return rpcClient.invokeSequential(
locations, method, LastBlockWithStatus.class, null);
}
@ -643,7 +681,11 @@ public void setPermission(String src, FsPermission permissions)
RemoteMethod method = new RemoteMethod("setPermission",
new Class<?>[] {String.class, FsPermission.class},
new RemoteParam(), permissions);
rpcClient.invokeSequential(locations, method);
if (isPathAll(src)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
@Override // ClientProtocol
@ -655,7 +697,11 @@ public void setOwner(String src, String username, String groupname)
RemoteMethod method = new RemoteMethod("setOwner",
new Class<?>[] {String.class, String.class, String.class},
new RemoteParam(), username, groupname);
rpcClient.invokeSequential(locations, method);
if (isPathAll(src)) {
rpcClient.invokeConcurrent(locations, method);
} else {
rpcClient.invokeSequential(locations, method);
}
}
/**
@ -933,8 +979,12 @@ public boolean delete(String src, boolean recursive) throws IOException {
RemoteMethod method = new RemoteMethod("delete",
new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
recursive);
return ((Boolean) rpcClient.invokeSequential(locations, method,
Boolean.class, Boolean.TRUE)).booleanValue();
if (isPathAll(src)) {
return rpcClient.invokeAll(locations, method);
} else {
return rpcClient.invokeSequential(locations, method,
Boolean.class, Boolean.TRUE).booleanValue();
}
}
@Override // ClientProtocol
@ -943,6 +993,15 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
checkOperation(OperationCategory.WRITE);
final List<RemoteLocation> locations = getLocationsForPath(src, true);
RemoteMethod method = new RemoteMethod("mkdirs",
new Class<?>[] {String.class, FsPermission.class, boolean.class},
new RemoteParam(), masked, createParent);
// Create in all locations
if (isPathAll(src)) {
return rpcClient.invokeAll(locations, method);
}
if (locations.size() > 1) {
// Check if this directory already exists
try {
@ -959,9 +1018,6 @@ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
}
RemoteLocation firstLocation = locations.get(0);
RemoteMethod method = new RemoteMethod("mkdirs",
new Class<?>[] {String.class, FsPermission.class, boolean.class},
new RemoteParam(), masked, createParent);
return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
.booleanValue();
}
@ -1077,8 +1133,16 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, false);
RemoteMethod method = new RemoteMethod("getFileInfo",
new Class<?>[] {String.class}, new RemoteParam());
HdfsFileStatus ret = (HdfsFileStatus) rpcClient.invokeSequential(
locations, method, HdfsFileStatus.class, null);
HdfsFileStatus ret = null;
// If it's a directory, we check in all locations
if (isPathAll(src)) {
ret = getFileInfoAll(locations, method);
} else {
// Check for file information sequentially
ret = (HdfsFileStatus) rpcClient.invokeSequential(
locations, method, HdfsFileStatus.class, null);
}
// If there is no real path, check mount points
if (ret == null) {
@ -1096,6 +1160,37 @@ public HdfsFileStatus getFileInfo(String src) throws IOException {
return ret;
}
/**
* Get the file info from all the locations.
*
* @param locations Locations to check.
* @param method The file information method to run.
* @return The first file info if it's a file, the directory if it's
* everywhere.
* @throws IOException If all the locations throw an exception.
*/
private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
final RemoteMethod method) throws IOException {
// Get the file info from everybody
Map<RemoteLocation, HdfsFileStatus> results =
rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
// We return the first file
HdfsFileStatus dirStatus = null;
for (RemoteLocation loc : locations) {
HdfsFileStatus fileStatus = results.get(loc);
if (fileStatus != null) {
if (!fileStatus.isDirectory()) {
return fileStatus;
} else if (dirStatus == null) {
dirStatus = fileStatus;
}
}
}
return dirStatus;
}
@Override // ClientProtocol
public boolean isFileClosed(String src) throws IOException {
checkOperation(OperationCategory.READ);
@ -2071,6 +2166,27 @@ protected List<RemoteLocation> getLocationsForPath(
}
}
/**
* Check if a path should be in all subclusters.
*
* @param path Path to check.
* @return If a path should be in all subclusters.
*/
private boolean isPathAll(final String path) {
if (subclusterResolver instanceof MountTableResolver) {
try {
MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
MountTable entry = mountTable.getMountPoint(path);
if (entry != null) {
return entry.isAll();
}
} catch (IOException e) {
LOG.error("Cannot get mount point: {}", e.getMessage());
}
}
return false;
}
/**
* Check if a path is in a read only mount point.
*

View File

@ -37,8 +37,6 @@
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Data schema for
@ -50,7 +48,6 @@
*/
public abstract class MountTable extends BaseRecord {
private static final Logger LOG = LoggerFactory.getLogger(MountTable.class);
public static final String ERROR_MSG_NO_SOURCE_PATH =
"Invalid entry, no source path specified ";
public static final String ERROR_MSG_MUST_START_WITH_BACK_SLASH =
@ -417,6 +414,16 @@ public boolean equals(Object obj) {
return false;
}
/**
* Check if a mount table spans all locations.
* @return If the mount table spreads across all locations.
*/
public boolean isAll() {
DestinationOrder order = getDestOrder();
return order == DestinationOrder.HASH_ALL ||
order == DestinationOrder.RANDOM;
}
/**
* Normalize a path for that filesystem.
*

View File

@ -0,0 +1,402 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.federation.router;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.NamenodeContext;
import org.apache.hadoop.hdfs.server.federation.RouterDFSCluster.RouterContext;
import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
import org.apache.hadoop.hdfs.server.federation.store.StateStoreService;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesRequest;
import org.apache.hadoop.hdfs.server.federation.store.protocol.GetMountTableEntriesResponse;
import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests the use of the resolvers that write in all subclusters from the
* Router. It supports:
* <li>HashResolver
* <li>RandomResolver.
*/
public class TestRouterAllResolver {
/** Directory that will be in a HASH_ALL mount point. */
private static final String TEST_DIR_HASH_ALL = "/hashall";
/** Directory that will be in a HASH_ALL mount point. */
private static final String TEST_DIR_RANDOM = "/random";
/** Number of namespaces. */
private static final int NUM_NAMESPACES = 2;
/** Mini HDFS clusters with Routers and State Store. */
private static StateStoreDFSCluster cluster;
/** Router for testing. */
private static RouterContext routerContext;
/** Router/federated filesystem. */
private static FileSystem routerFs;
/** Filesystem for each namespace. */
private static List<FileSystem> nsFss = new LinkedList<>();
@Before
public void setup() throws Exception {
// 2 nameservices with 1 namenode each (no HA needed for this test)
cluster = new StateStoreDFSCluster(
false, NUM_NAMESPACES, MultipleDestinationMountTableResolver.class);
// Start NNs and DNs and wait until ready
cluster.startCluster();
// Build and start a Router with: State Store + Admin + RPC
Configuration routerConf = new RouterConfigBuilder()
.stateStore()
.admin()
.rpc()
.build();
cluster.addRouterOverrides(routerConf);
cluster.startRouters();
routerContext = cluster.getRandomRouter();
// Register and verify all NNs with all routers
cluster.registerNamenodes();
cluster.waitNamenodeRegistration();
// Setup the test mount point
createMountTableEntry(TEST_DIR_HASH_ALL, DestinationOrder.HASH_ALL);
createMountTableEntry(TEST_DIR_RANDOM, DestinationOrder.RANDOM);
// Get filesystems for federated and each namespace
routerFs = routerContext.getFileSystem();
for (String nsId : cluster.getNameservices()) {
List<NamenodeContext> nns = cluster.getNamenodes(nsId);
for (NamenodeContext nn : nns) {
FileSystem nnFs = nn.getFileSystem();
nsFss.add(nnFs);
}
}
assertEquals(NUM_NAMESPACES, nsFss.size());
}
@After
public void cleanup() {
cluster.shutdown();
cluster = null;
routerContext = null;
routerFs = null;
nsFss.clear();
}
@Test
public void testHashAll() throws Exception {
testAll(TEST_DIR_HASH_ALL);
}
@Test
public void testRandomAll() throws Exception {
testAll(TEST_DIR_RANDOM);
}
/**
* Tests that the resolver spreads files across subclusters in the whole
* tree.
* @throws Exception If the resolver is not working.
*/
private void testAll(final String path) throws Exception {
// Create directories in different levels
routerFs.mkdirs(new Path(path + "/dir0"));
routerFs.mkdirs(new Path(path + "/dir1"));
routerFs.mkdirs(new Path(path + "/dir2/dir20"));
routerFs.mkdirs(new Path(path + "/dir2/dir21"));
routerFs.mkdirs(new Path(path + "/dir2/dir22"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir220"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir221"));
routerFs.mkdirs(new Path(path + "/dir2/dir22/dir222"));
assertDirsEverywhere(path, 9);
// Create 14 files at different levels of the tree
createTestFile(routerFs, path + "/dir0/file1.txt");
createTestFile(routerFs, path + "/dir0/file2.txt");
createTestFile(routerFs, path + "/dir1/file2.txt");
createTestFile(routerFs, path + "/dir1/file3.txt");
createTestFile(routerFs, path + "/dir2/dir20/file4.txt");
createTestFile(routerFs, path + "/dir2/dir20/file5.txt");
createTestFile(routerFs, path + "/dir2/dir21/file6.txt");
createTestFile(routerFs, path + "/dir2/dir21/file7.txt");
createTestFile(routerFs, path + "/dir2/dir22/file8.txt");
createTestFile(routerFs, path + "/dir2/dir22/file9.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file10.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file11.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file12.txt");
createTestFile(routerFs, path + "/dir2/dir22/dir220/file13.txt");
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 14);
// Test append
String testFile = path + "/dir2/dir22/dir220/file-append.txt";
createTestFile(routerFs, testFile);
Path testFilePath = new Path(testFile);
assertTrue("Created file is too small",
routerFs.getFileStatus(testFilePath).getLen() > 50);
appendTestFile(routerFs, testFile);
assertTrue("Append file is too small",
routerFs.getFileStatus(testFilePath).getLen() > 110);
assertDirsEverywhere(path, 9);
assertFilesDistributed(path, 15);
// Removing a directory should remove it from every subcluster
routerFs.delete(new Path(path + "/dir2/dir22/dir220"), true);
assertDirsEverywhere(path, 8);
assertFilesDistributed(path, 10);
// Removing all sub directories
routerFs.delete(new Path(path + "/dir0"), true);
routerFs.delete(new Path(path + "/dir1"), true);
routerFs.delete(new Path(path + "/dir2"), true);
assertDirsEverywhere(path, 0);
assertFilesDistributed(path, 0);
}
/**
* Directories in HASH_ALL mount points must be in every namespace.
* @param path Path to check under.
* @param expectedNumDirs Expected number of directories.
* @throws IOException If it cannot check the directories.
*/
private void assertDirsEverywhere(String path, int expectedNumDirs)
throws IOException {
// Check for the directories in each filesystem
List<FileStatus> files = listRecursive(routerFs, path);
int numDirs = 0;
for (FileStatus file : files) {
if (file.isDirectory()) {
numDirs++;
Path dirPath = file.getPath();
Path checkPath = getRelativePath(dirPath);
for (FileSystem nsFs : nsFss) {
FileStatus fileStatus1 = nsFs.getFileStatus(checkPath);
assertTrue(file + " should be a directory",
fileStatus1.isDirectory());
}
}
}
assertEquals(expectedNumDirs, numDirs);
}
/**
* Check that the files are somewhat spread across namespaces.
* @param path Path to check under.
* @param expectedNumFiles Number of files expected.
* @throws IOException If the files cannot be checked.
*/
private void assertFilesDistributed(String path, int expectedNumFiles)
throws IOException {
// Check where the files went
List<FileStatus> routerFiles = listRecursive(routerFs, path);
List<List<FileStatus>> nssFiles = new LinkedList<>();
for (FileSystem nsFs : nsFss) {
List<FileStatus> nsFiles = listRecursive(nsFs, path);
nssFiles.add(nsFiles);
}
// We should see all the files in the federated view
int numRouterFiles = getNumTxtFiles(routerFiles);
assertEquals(numRouterFiles, expectedNumFiles);
// All the files should be spread somewhat evenly across subclusters
List<Integer> numNsFiles = new LinkedList<>();
int sumNsFiles = 0;
for (int i = 0; i < NUM_NAMESPACES; i++) {
List<FileStatus> nsFiles = nssFiles.get(i);
int numFiles = getNumTxtFiles(nsFiles);
numNsFiles.add(numFiles);
sumNsFiles += numFiles;
}
assertEquals(numRouterFiles, sumNsFiles);
if (expectedNumFiles > 0) {
for (int numFiles : numNsFiles) {
assertTrue("Files not distributed: " + numNsFiles, numFiles > 0);
}
}
}
/**
* Create a test file in the filesystem and check if it was written.
* @param fs Filesystem.
* @param filename Name of the file to create.
* @throws IOException If it cannot create the file.
*/
private static void createTestFile(
final FileSystem fs, final String filename)throws IOException {
final Path path = new Path(filename);
// Write the data
FSDataOutputStream os = fs.create(path);
os.writeUTF("Test data " + filename);
os.close();
// Read the data and check
FSDataInputStream is = fs.open(path);
String read = is.readUTF();
assertEquals("Test data " + filename, read);
is.close();
}
/**
* Append to a test file in the filesystem and check if we appended.
* @param fs Filesystem.
* @param filename Name of the file to append to.
* @throws IOException
*/
private static void appendTestFile(
final FileSystem fs, final String filename) throws IOException {
final Path path = new Path(filename);
// Write the data
FSDataOutputStream os = fs.append(path);
os.writeUTF("Test append data " + filename);
os.close();
// Read the data previous data
FSDataInputStream is = fs.open(path);
String read = is.readUTF();
assertEquals(read, "Test data " + filename);
// Read the new data and check
read = is.readUTF();
assertEquals(read, "Test append data " + filename);
is.close();
}
/**
* Count the number of text files in a list.
* @param files File list.
* @return Number of .txt files.
*/
private static int getNumTxtFiles(final List<FileStatus> files) {
int numFiles = 0;
for (FileStatus file : files) {
if (file.getPath().getName().endsWith(".txt")) {
numFiles++;
}
}
return numFiles;
}
/**
* Get the relative path within a filesystem (removes the filesystem prefix).
* @param path Path to check.
* @return File within the filesystem.
*/
private static Path getRelativePath(final Path path) {
URI uri = path.toUri();
String uriPath = uri.getPath();
return new Path(uriPath);
}
/**
* Get the list the files/dirs under a path.
* @param fs Filesystem to check in.
* @param path Path to check for.
* @return List of files.
* @throws IOException If it cannot list the files.
*/
private List<FileStatus> listRecursive(
final FileSystem fs, final String path) throws IOException {
List<FileStatus> ret = new LinkedList<>();
List<Path> temp = new LinkedList<>();
temp.add(new Path(path));
while (!temp.isEmpty()) {
Path p = temp.remove(0);
for (FileStatus fileStatus : fs.listStatus(p)) {
ret.add(fileStatus);
if (fileStatus.isDirectory()) {
temp.add(fileStatus.getPath());
}
}
}
return ret;
}
/**
* Add a mount table entry in all nameservices and wait until it is
* available in all routers.
* @param mountPoint Name of the mount point.
* @param order Order of the mount table entry.
* @throws Exception If the entry could not be created.
*/
private void createMountTableEntry(
final String mountPoint, final DestinationOrder order) throws Exception {
RouterClient admin = routerContext.getAdminClient();
MountTableManager mountTable = admin.getMountTableManager();
Map<String, String> destMap = new HashMap<>();
for (String nsId : cluster.getNameservices()) {
destMap.put(nsId, mountPoint);
}
MountTable newEntry = MountTable.newInstance(mountPoint, destMap);
newEntry.setDestOrder(order);
AddMountTableEntryRequest addRequest =
AddMountTableEntryRequest.newInstance(newEntry);
AddMountTableEntryResponse addResponse =
mountTable.addMountTableEntry(addRequest);
boolean created = addResponse.getStatus();
assertTrue(created);
// Refresh the caches to get the mount table
Router router = routerContext.getRouter();
StateStoreService stateStore = router.getStateStore();
stateStore.refreshCaches(true);
// Check for the path
GetMountTableEntriesRequest getRequest =
GetMountTableEntriesRequest.newInstance(mountPoint);
GetMountTableEntriesResponse getResponse =
mountTable.getMountTableEntries(getRequest);
List<MountTable> entries = getResponse.getEntries();
assertEquals(1, entries.size());
assertEquals(mountPoint, entries.get(0).getSourcePath());
}
}