HDFS-15306. Make mount-table to read from central place ( Let's say from HDFS). Contributed by Uma Maheswara Rao G.

This commit is contained in:
Uma Maheswara Rao G 2020-05-14 17:29:35 -07:00 committed by GitHub
parent d08b9e94e3
commit ac4a2e11d9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 808 additions and 144 deletions

View File

@ -30,6 +30,11 @@ public interface Constants {
* Prefix for the config variable prefix for the ViewFs mount-table
*/
public static final String CONFIG_VIEWFS_PREFIX = "fs.viewfs.mounttable";
/**
* Prefix for the config variable for the ViewFs mount-table path.
*/
String CONFIG_VIEWFS_MOUNTTABLE_PATH = CONFIG_VIEWFS_PREFIX + ".path";
/**
* Prefix for the home dir for the mount table - if not specified

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* An implementation for Apache Hadoop compatible file system based mount-table
* file loading.
*/
public class HCFSMountTableConfigLoader implements MountTableConfigLoader {
private static final String REGEX_DOT = "[.]";
private static final Logger LOGGER =
LoggerFactory.getLogger(HCFSMountTableConfigLoader.class);
private Path mountTable = null;
/**
* Loads the mount-table configuration from hadoop compatible file system and
* add the configuration items to given configuration. Mount-table
* configuration format should be suffixed with version number.
* Format: mount-table.<versionNumber>.xml
* Example: mount-table.1.xml
* When user wants to update mount-table, the expectation is to upload new
* mount-table configuration file with monotonically increasing integer as
* version number. This API loads the highest version number file. We can
* also configure single file path directly.
*
* @param mountTableConfigPath : A directory path where mount-table files
* stored or a mount-table file path. We recommend to configure
* directory with the mount-table version files.
* @param conf : to add the mount table as resource.
*/
@Override
public void load(String mountTableConfigPath, Configuration conf)
throws IOException {
this.mountTable = new Path(mountTableConfigPath);
String scheme = mountTable.toUri().getScheme();
ViewFileSystem.FsGetter fsGetter =
new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
RemoteIterator<LocatedFileStatus> listFiles =
fs.listFiles(mountTable, false);
LocatedFileStatus lfs = null;
int higherVersion = -1;
while (listFiles.hasNext()) {
LocatedFileStatus curLfs = listFiles.next();
String cur = curLfs.getPath().getName();
String[] nameParts = cur.split(REGEX_DOT);
if (nameParts.length < 2) {
logInvalidFileNameFormat(cur);
continue; // invalid file name
}
int curVersion = higherVersion;
try {
curVersion = Integer.parseInt(nameParts[nameParts.length - 2]);
} catch (NumberFormatException nfe) {
logInvalidFileNameFormat(cur);
continue;
}
if (curVersion > higherVersion) {
higherVersion = curVersion;
lfs = curLfs;
}
}
if (lfs == null) {
// No valid mount table file found.
// TODO: Should we fail? Currently viewfs init will fail if no mount
// links anyway.
LOGGER.warn("No valid mount-table file exist at: {}. At least one "
+ "mount-table file should present with the name format: "
+ "mount-table.<versionNumber>.xml", mountTableConfigPath);
return;
}
// Latest version file.
Path latestVersionMountTable = lfs.getPath();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Loading the mount-table {} into configuration.",
latestVersionMountTable);
}
try (FSDataInputStream open = fs.open(latestVersionMountTable)) {
Configuration newConf = new Configuration(false);
newConf.addResource(open);
// This will add configuration props as resource, instead of stream
// itself. So, that stream can be closed now.
conf.addResource(newConf);
}
}
}
private void logInvalidFileNameFormat(String cur) {
LOGGER.warn("Invalid file name format for mount-table version file: {}. "
+ "The valid file name format is mount-table-name.<versionNumber>.xml",
cur);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
* An interface for loading mount-table configuration. This class can have more
* APIs like refreshing mount tables automatically etc.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface MountTableConfigLoader {
/**
* Loads the mount-table configuration into given configuration.
*
* @param mountTableConfigPath - Path of the mount table. It can be a file or
* a directory in the case of multiple versions of mount-table
* files(Recommended option).
* @param conf - Configuration object to add mount table.
*/
void load(String mountTableConfigPath, Configuration conf)
throws IOException;
}

View File

@ -98,107 +98,133 @@ public String getScheme() {
@Override
public void initialize(URI theUri, Configuration conf) throws IOException {
this.myUri = theUri;
if (LOG.isDebugEnabled()) {
LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
+ theUri);
}
this.myUri = theUri;
String mountTableConfigPath =
conf.get(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH);
if (null != mountTableConfigPath) {
MountTableConfigLoader loader = new HCFSMountTableConfigLoader();
loader.load(mountTableConfigPath, conf);
} else {
// TODO: Should we fail here.?
if (LOG.isDebugEnabled()) {
LOG.debug(
"Missing configuration for fs.viewfs.mounttable.path. Proceeding"
+ "with core-site.xml mount-table information if avaialable.");
}
}
super.initialize(theUri, conf);
}
/**
* This method is overridden because in ViewFileSystemOverloadScheme if
* overloaded cheme matches with mounted target fs scheme, file system should
* be created without going into fs.<scheme>.impl based resolution. Otherwise
* it will end up in an infinite loop as the target will be resolved again
* to ViewFileSystemOverloadScheme as fs.<scheme>.impl points to
* ViewFileSystemOverloadScheme. So, below method will initialize the
* overloaded scheme matches with mounted target fs scheme, file system
* should be created without going into fs.<scheme>.impl based resolution.
* Otherwise it will end up in an infinite loop as the target will be
* resolved again to ViewFileSystemOverloadScheme as fs.<scheme>.impl points
* to ViewFileSystemOverloadScheme. So, below method will initialize the
* fs.viewfs.overload.scheme.target.<scheme>.impl. Other schemes can
* follow fs.newInstance
*/
@Override
protected FsGetter fsGetter() {
return new FsGetter() {
@Override
public FileSystem getNewInstance(URI uri, Configuration conf)
throws IOException {
if (uri.getScheme().equals(getScheme())) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"The file system initialized uri scheme is matching with the "
+ "given target uri scheme. The target uri is: " + uri);
}
/*
* Avoid looping when target fs scheme is matching to overloaded
* scheme.
*/
return createFileSystem(uri, conf);
} else {
return FileSystem.newInstance(uri, conf);
}
}
/**
* When ViewFileSystemOverloadScheme scheme and target uri scheme are
* matching, it will not take advantage of FileSystem cache as it will
* create instance directly. For caching needs please set
* "fs.viewfs.enable.inner.cache" to true.
*/
@Override
public FileSystem get(URI uri, Configuration conf) throws IOException {
if (uri.getScheme().equals(getScheme())) {
// Avoid looping when target fs scheme is matching to overloaded
// scheme.
if (LOG.isDebugEnabled()) {
LOG.debug(
"The file system initialized uri scheme is matching with the "
+ "given target uri scheme. So, the target file system "
+ "instances will not be cached. To cache fs instances, "
+ "please set fs.viewfs.enable.inner.cache to true. "
+ "The target uri is: " + uri);
}
return createFileSystem(uri, conf);
} else {
return FileSystem.get(uri, conf);
}
}
private FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
final String fsImplConf = String.format(
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
uri.getScheme());
Class<?> clazz = conf.getClass(fsImplConf, null);
if (clazz == null) {
throw new UnsupportedFileSystemException(
String.format("%s=null: %s: %s", fsImplConf,
"No overload scheme fs configured", uri.getScheme()));
}
FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
fs.initialize(uri, conf);
return fs;
}
private <T> T newInstance(Class<T> theClass, URI uri,
Configuration conf) {
T result;
try {
Constructor<T> meth = theClass.getConstructor();
meth.setAccessible(true);
result = meth.newInstance();
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
};
return new ChildFsGetter(getScheme());
}
/**
* This class checks whether the rooScheme is same as URI scheme. If both are
* same, then it will initialize file systems by using the configured
* fs.viewfs.overload.scheme.target.<scheme>.impl class.
*/
static class ChildFsGetter extends FsGetter {
private final String rootScheme;
ChildFsGetter(String rootScheme) {
this.rootScheme = rootScheme;
}
@Override
public FileSystem getNewInstance(URI uri, Configuration conf)
throws IOException {
if (uri.getScheme().equals(this.rootScheme)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"The file system initialized uri scheme is matching with the "
+ "given target uri scheme. The target uri is: " + uri);
}
/*
* Avoid looping when target fs scheme is matching to overloaded scheme.
*/
return createFileSystem(uri, conf);
} else {
return FileSystem.newInstance(uri, conf);
}
}
/**
* When ViewFileSystemOverloadScheme scheme and target uri scheme are
* matching, it will not take advantage of FileSystem cache as it will
* create instance directly. For caching needs please set
* "fs.viewfs.enable.inner.cache" to true.
*/
@Override
public FileSystem get(URI uri, Configuration conf) throws IOException {
if (uri.getScheme().equals(this.rootScheme)) {
// Avoid looping when target fs scheme is matching to overloaded
// scheme.
if (LOG.isDebugEnabled()) {
LOG.debug(
"The file system initialized uri scheme is matching with the "
+ "given target uri scheme. So, the target file system "
+ "instances will not be cached. To cache fs instances, "
+ "please set fs.viewfs.enable.inner.cache to true. "
+ "The target uri is: " + uri);
}
return createFileSystem(uri, conf);
} else {
return FileSystem.get(uri, conf);
}
}
private FileSystem createFileSystem(URI uri, Configuration conf)
throws IOException {
final String fsImplConf = String.format(
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
uri.getScheme());
Class<?> clazz = conf.getClass(fsImplConf, null);
if (clazz == null) {
throw new UnsupportedFileSystemException(
String.format("%s=null: %s: %s", fsImplConf,
"No overload scheme fs configured", uri.getScheme()));
}
FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
fs.initialize(uri, conf);
return fs;
}
private <T> T newInstance(Class<T> theClass, URI uri, Configuration conf) {
T result;
try {
Constructor<T> meth = theClass.getConstructor();
meth.setAccessible(true);
result = meth.newInstance();
} catch (InvocationTargetException e) {
Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
} else {
throw new RuntimeException(cause);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return result;
}
}
}

View File

@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* ViewFileSystem and ViewFileSystemOverloadScheme classes.
*/
@InterfaceAudience.LimitedPrivate({"MapReduce", "HBase", "Hive" })
@InterfaceStability.Stable
package org.apache.hadoop.fs.viewfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests the mount table loading.
*/
public class TestHCFSMountTableConfigLoader {
private static final String DOT = ".";
private static final String TARGET_TWO = "/tar2";
private static final String TARGET_ONE = "/tar1";
private static final String SRC_TWO = "/src2";
private static final String SRC_ONE = "/src1";
private static final String TABLE_NAME = "test";
private MountTableConfigLoader loader = new HCFSMountTableConfigLoader();
private static FileSystem fsTarget;
private static Configuration conf;
private static Path targetTestRoot;
private static FileSystemTestHelper fileSystemTestHelper =
new FileSystemTestHelper();
private static File oldVersionMountTableFile;
private static File newVersionMountTableFile;
private static final String MOUNT_LINK_KEY_SRC_ONE =
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(DOT)
.append(TABLE_NAME).append(DOT).append(Constants.CONFIG_VIEWFS_LINK)
.append(DOT).append(SRC_ONE).toString();
private static final String MOUNT_LINK_KEY_SRC_TWO =
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(DOT)
.append(TABLE_NAME).append(DOT).append(Constants.CONFIG_VIEWFS_LINK)
.append(DOT).append(SRC_TWO).toString();
@BeforeClass
public static void init() throws Exception {
fsTarget = new LocalFileSystem();
fsTarget.initialize(new URI("file:///"), new Configuration());
targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
fsTarget.delete(targetTestRoot, true);
fsTarget.mkdirs(targetTestRoot);
}
@Before
public void setUp() throws Exception {
conf = new Configuration();
conf.set(String.format(
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, "file"),
LocalFileSystem.class.getName());
oldVersionMountTableFile =
new File(new URI(targetTestRoot.toString() + "/table.1.xml"));
oldVersionMountTableFile.createNewFile();
newVersionMountTableFile =
new File(new URI(targetTestRoot.toString() + "/table.2.xml"));
newVersionMountTableFile.createNewFile();
}
@Test
public void testMountTableFileLoadingWhenMultipleFilesExist()
throws Exception {
ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
new String[] {SRC_ONE, SRC_TWO }, new String[] {TARGET_ONE,
TARGET_TWO },
new Path(newVersionMountTableFile.toURI()), conf);
loader.load(targetTestRoot.toString(), conf);
Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_TWO), TARGET_TWO);
Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_ONE), TARGET_ONE);
}
@Test
public void testMountTableFileWithInvalidFormat() throws Exception {
Path path = new Path(new URI(
targetTestRoot.toString() + "/testMountTableFileWithInvalidFormat/"));
fsTarget.mkdirs(path);
File invalidMountFileName =
new File(new URI(path.toString() + "/table.InvalidVersion.xml"));
invalidMountFileName.createNewFile();
// Adding mount links to make sure it will not read it.
ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
new String[] {SRC_ONE, SRC_TWO }, new String[] {TARGET_ONE,
TARGET_TWO },
new Path(invalidMountFileName.toURI()), conf);
// Pass mount table directory
loader.load(path.toString(), conf);
Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_TWO));
Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_ONE));
invalidMountFileName.delete();
}
@Test
public void testMountTableFileWithInvalidFormatWithNoDotsInName()
throws Exception {
Path path = new Path(new URI(targetTestRoot.toString()
+ "/testMountTableFileWithInvalidFormatWithNoDots/"));
fsTarget.mkdirs(path);
File invalidMountFileName =
new File(new URI(path.toString() + "/tableInvalidVersionxml"));
invalidMountFileName.createNewFile();
// Pass mount table directory
loader.load(path.toString(), conf);
Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_TWO));
Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_ONE));
invalidMountFileName.delete();
}
@Test(expected = FileNotFoundException.class)
public void testLoadWithMountFile() throws Exception {
loader.load(new URI(targetTestRoot.toString() + "/Non-Existent-File.xml")
.toString(), conf);
}
@Test
public void testLoadWithNonExistentMountFile() throws Exception {
ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
new String[] {SRC_ONE, SRC_TWO },
new String[] {TARGET_ONE, TARGET_TWO },
new Path(oldVersionMountTableFile.toURI()), conf);
loader.load(oldVersionMountTableFile.toURI().toString(), conf);
Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_TWO), TARGET_TWO);
Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_ONE), TARGET_ONE);
}
@AfterClass
public static void tearDown() throws IOException {
fsTarget.delete(targetTestRoot, true);
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
/**
* Test the TestViewFSOverloadSchemeCentralMountTableConfig with mount-table
* configuration files in configured fs location.
*/
public class TestViewFSOverloadSchemeCentralMountTableConfig
extends TestViewFileSystemOverloadSchemeLocalFileSystem {
private Path oldMountTablePath;
private Path latestMountTablepath;
@Before
public void setUp() throws Exception {
super.setUp();
// Mount table name format: mount-table.<versionNumber>.xml
String mountTableFileName1 = "mount-table.1.xml";
String mountTableFileName2 = "mount-table.2.xml";
oldMountTablePath =
new Path(getTestRoot() + File.separator + mountTableFileName1);
latestMountTablepath =
new Path(getTestRoot() + File.separator + mountTableFileName2);
getConf().set(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH,
getTestRoot().toString());
File f = new File(oldMountTablePath.toUri());
f.createNewFile(); // Just creating empty mount-table file.
File f2 = new File(latestMountTablepath.toUri());
latestMountTablepath = new Path(f2.toURI());
f2.createNewFile();
}
/**
* This method saves the mount links in a local files.
*/
@Override
void addMountLinks(String mountTable, String[] sources, String[] targets,
Configuration conf) throws IOException, URISyntaxException {
// we don't use conf here, instead we use config paths to store links.
// Mount-table old version file mount-table-<version>.xml
try (BufferedWriter out = new BufferedWriter(
new FileWriter(new File(oldMountTablePath.toUri())))) {
out.write("<configuration>\n");
// Invalid tag. This file should not be read.
out.write("</\\\name//\\>");
out.write("</configuration>\n");
out.flush();
}
ViewFsTestSetup.addMountLinksToFile(mountTable, sources, targets,
latestMountTablepath, conf);
}
}

View File

@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -66,16 +67,26 @@ public void setUp() throws Exception {
fsTarget.mkdirs(targetTestRoot);
}
/**
* Adds the given mount links to config. sources contains mount link src and
* the respective index location in targets contains the target uri.
*/
void addMountLinks(String mountTable, String[] sources, String[] targets,
Configuration config) throws IOException, URISyntaxException {
ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
}
/**
* Tests write file and read file with ViewFileSystemOverloadScheme.
*/
@Test
public void testLocalTargetLinkWriteSimple() throws IOException {
public void testLocalTargetLinkWriteSimple()
throws IOException, URISyntaxException {
LOG.info("Starting testLocalTargetLinkWriteSimple");
final String testString = "Hello Local!...";
final Path lfsRoot = new Path("/lfsRoot");
ConfigUtil.addLink(conf, lfsRoot.toString(),
URI.create(targetTestRoot + "/local"));
addMountLinks(null, new String[] {lfsRoot.toString() },
new String[] {targetTestRoot + "/local" }, conf);
try (FileSystem lViewFs = FileSystem.get(URI.create("file:///"), conf)) {
final Path testPath = new Path(lfsRoot, "test.txt");
try (FSDataOutputStream fsDos = lViewFs.create(testPath)) {
@ -94,8 +105,8 @@ public void testLocalTargetLinkWriteSimple() throws IOException {
@Test
public void testLocalFsCreateAndDelete() throws Exception {
LOG.info("Starting testLocalFsCreateAndDelete");
ConfigUtil.addLink(conf, "mt", "/lfsroot",
URI.create(targetTestRoot + "/wd2"));
addMountLinks("mt", new String[] {"/lfsroot" },
new String[] {targetTestRoot + "/wd2" }, conf);
final URI mountURI = URI.create("file://mt/");
try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
Path testPath = new Path(mountURI.toString() + "/lfsroot/test");
@ -113,8 +124,9 @@ public void testLocalFsCreateAndDelete() throws Exception {
@Test
public void testLocalFsLinkSlashMerge() throws Exception {
LOG.info("Starting testLocalFsLinkSlashMerge");
ConfigUtil.addLinkMergeSlash(conf, "mt",
URI.create(targetTestRoot + "/wd2"));
addMountLinks("mt",
new String[] {Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH },
new String[] {targetTestRoot + "/wd2" }, conf);
final URI mountURI = URI.create("file://mt/");
try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
Path fileOnRoot = new Path(mountURI.toString() + "/NewFile");
@ -130,10 +142,9 @@ public void testLocalFsLinkSlashMerge() throws Exception {
@Test(expected = IOException.class)
public void testLocalFsLinkSlashMergeWithOtherMountLinks() throws Exception {
LOG.info("Starting testLocalFsLinkSlashMergeWithOtherMountLinks");
ConfigUtil.addLink(conf, "mt", "/lfsroot",
URI.create(targetTestRoot + "/wd2"));
ConfigUtil.addLinkMergeSlash(conf, "mt",
URI.create(targetTestRoot + "/wd2"));
addMountLinks("mt",
new String[] {"/lfsroot", Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH },
new String[] {targetTestRoot + "/wd2", targetTestRoot + "/wd2" }, conf);
final URI mountURI = URI.create("file://mt/");
FileSystem.get(mountURI, conf);
Assert.fail("A merge slash cannot be configured with other mount links.");
@ -146,4 +157,18 @@ public void tearDown() throws Exception {
fsTarget.close();
}
}
/**
* Returns the test root dir.
*/
public Path getTestRoot() {
return this.targetTestRoot;
}
/**
* Returns the conf.
*/
public Configuration getConf() {
return this.conf;
}
}

View File

@ -17,14 +17,18 @@
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileContextTestHelper;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
import org.apache.hadoop.util.Shell;
import org.eclipse.jetty.util.log.Log;
@ -132,4 +136,69 @@ static void linkUpFirstComponents(Configuration conf, String path,
+ firstComponent + "->" + linkTarget);
}
/**
* Adds the given mount links to the given Hadoop compatible file system path.
* Mount link mappings are in sources, targets at their respective index
* locations.
*/
static void addMountLinksToFile(String mountTable, String[] sources,
String[] targets, Path mountTableConfPath, Configuration conf)
throws IOException, URISyntaxException {
ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
mountTableConfPath.toUri().getScheme());
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
String prefix =
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
.append((mountTable == null
? Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE
: mountTable))
.append(".").toString();
out.writeBytes("<configuration>");
for (int i = 0; i < sources.length; i++) {
String src = sources[i];
String target = targets[i];
out.writeBytes("<property><name>");
if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
out.writeBytes("</name>");
} else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
out.writeBytes("</name>");
} else {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
out.writeBytes("</name>");
}
out.writeBytes("<value>");
out.writeBytes(target);
out.writeBytes("</value></property>");
out.flush();
}
out.writeBytes(("</configuration>"));
out.flush();
}
}
}
/**
* Adds the given mount links to the configuration. Mount link mappings are
* in sources, targets at their respective index locations.
*/
static void addMountLinksToConf(String mountTable, String[] sources,
String[] targets, Configuration config) throws URISyntaxException {
for (int i = 0; i < sources.length; i++) {
String src = sources[i];
String target = targets[i];
String mountTableName = mountTable == null ?
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));
} else {
ConfigUtil.addLink(config, mountTableName, src, new URI(target));
}
}
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Before;
/**
* Tests ViewFileSystemOverloadScheme with configured mount links.
*/
public class TestViewFSOverloadSchemeWithMountTableConfigInHDFS
extends TestViewFileSystemOverloadSchemeWithHdfsScheme {
private Path oldVersionMountTablePath;
private Path newVersionMountTablePath;
@Before
@Override
public void startCluster() throws IOException {
super.startCluster();
String mountTableDir =
URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY))
.toString() + "/MountTable/";
getConf().set(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH, mountTableDir);
FileSystem fs = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs")
.getNewInstance(new Path(mountTableDir).toUri(), getConf());
fs.mkdirs(new Path(mountTableDir));
String oldVersionMountTable = "mount-table.30.xml";
String newVersionMountTable = "mount-table.31.xml";
oldVersionMountTablePath = new Path(mountTableDir, oldVersionMountTable);
newVersionMountTablePath = new Path(mountTableDir, newVersionMountTable);
fs.createNewFile(oldVersionMountTablePath);
fs.createNewFile(newVersionMountTablePath);
}
/**
* This method saves the mount links in a hdfs file newVersionMountTable.
* Since this file has highest version, this should be loaded by
* ViewFSOverloadScheme.
*/
@Override
void addMountLinks(String mountTable, String[] sources, String[] targets,
Configuration config) throws IOException, URISyntaxException {
ViewFsTestSetup.addMountLinksToFile(mountTable, sources, targets,
newVersionMountTablePath, getConf());
}
}

View File

@ -20,6 +20,7 @@
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -54,6 +55,9 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
private static final String HDFS_USER_FOLDER = "/HDFSUser";
private static final String LOCAL_FOLDER = "/local";
/**
* Sets up the configurations and starts the MiniDFSCluster.
*/
@Before
public void startCluster() throws IOException {
conf = new Configuration();
@ -80,16 +84,13 @@ public void tearDown() throws IOException {
}
}
private void createLinks(boolean needFalbackLink, Path hdfsTargetPath,
Path localTragetPath) {
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER,
hdfsTargetPath.toUri());
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER,
localTragetPath.toUri());
if (needFalbackLink) {
ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
hdfsTargetPath.toUri());
}
/**
* Adds the given mount links to config. sources contains mount link src and
* the respective index location in targets contains the target uri.
*/
void addMountLinks(String mountTable, String[] sources, String[] targets,
Configuration config) throws IOException, URISyntaxException {
ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
}
/**
@ -105,7 +106,11 @@ public void testMountLinkWithLocalAndHDFS() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(false, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() },
conf);
// /HDFSUser/testfile
Path hdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
@ -156,8 +161,8 @@ public void testMountLinkWithNonExistentLink() throws Exception {
* Below addLink will create following mount points
* hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
*/
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), userFolder,
nonExistTargetPath.toUri());
addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder },
new String[] {nonExistTargetPath.toUri().toString() }, conf);
FileSystem.get(conf);
Assert.fail("Expected to fail with non existent link");
}
@ -171,9 +176,11 @@ public void testMountLinkWithNonExistentLink() throws Exception {
@Test(timeout = 30000)
public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(false, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() },
conf);
try (FileSystem fs = FileSystem.get(conf)) {
FileStatus[] ls = fs.listStatus(new Path("/"));
@ -198,9 +205,11 @@ public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
@Test(expected = IOException.class, timeout = 30000)
public void testListStatusOnNonMountedPath() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(false, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() },
conf);
try (FileSystem fs = FileSystem.get(conf)) {
fs.listStatus(new Path("/nonMount"));
@ -218,9 +227,13 @@ public void testListStatusOnNonMountedPath() throws Exception {
@Test(timeout = 30000)
public void testWithLinkFallBack() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(true, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
Constants.CONFIG_VIEWFS_LINK_FALLBACK },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString(),
hdfsTargetPath.toUri().toString() },
conf);
try (FileSystem fs = FileSystem.get(conf)) {
fs.createNewFile(new Path("/nonMount/myfile"));
@ -243,10 +256,11 @@ public void testWithLinkFallBack() throws Exception {
public void testCreateOnRootShouldFailWhenMountLinkConfigured()
throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(false, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() },
conf);
try (FileSystem fs = FileSystem.get(conf)) {
fs.createNewFile(new Path("/newFileOnRoot"));
Assert.fail("It should fail as root is read only in viewFS.");
@ -265,8 +279,13 @@ public void testCreateOnRootShouldFailWhenMountLinkConfigured()
public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(true, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
Constants.CONFIG_VIEWFS_LINK_FALLBACK },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString(),
hdfsTargetPath.toUri().toString() },
conf);
try (FileSystem fs = FileSystem.get(conf)) {
fs.createNewFile(new Path("/onRootWhenFallBack"));
Assert.fail(
@ -290,9 +309,14 @@ public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
@Test(expected = UnsupportedFileSystemException.class, timeout = 30000)
public void testInvalidOverloadSchemeTargetFS() throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
conf = new Configuration();
createLinks(true, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
Constants.CONFIG_VIEWFS_LINK_FALLBACK },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString(),
hdfsTargetPath.toUri().toString() },
conf);
conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
defaultFSURI.toString());
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
@ -315,8 +339,11 @@ public void testInvalidOverloadSchemeTargetFS() throws Exception {
public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path localTragetPath = new Path(localTargetDir.toURI());
createLinks(false, hdfsTargetPath, localTragetPath);
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString(), },
conf);
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
try (FileSystem fs = FileSystem.get(conf)) {
Path testFile = new Path(HDFS_USER_FOLDER + "/testFile");
@ -337,10 +364,11 @@ public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
public void testViewFsOverloadSchemeWithInnerCache()
throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
hdfsTargetPath.toUri());
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
hdfsTargetPath.toUri());
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
new String[] {hdfsTargetPath.toUri().toString(),
hdfsTargetPath.toUri().toString() },
conf);
// 1. Only 1 hdfs child file system should be there with cache.
try (ViewFileSystemOverloadScheme vfs =
@ -368,10 +396,11 @@ public void testViewFsOverloadSchemeWithInnerCache()
public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
hdfsTargetPath.toUri());
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
hdfsTargetPath.toUri());
addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
new String[] {hdfsTargetPath.toUri().toString(),
hdfsTargetPath.toUri().toString() },
conf);
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
// Two hdfs file systems should be there if no cache.
@ -394,10 +423,11 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
throws Exception {
final Path localTragetPath = new Path(localTargetDir.toURI());
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 0,
localTragetPath.toUri());
ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 1,
localTragetPath.toUri());
addMountLinks(defaultFSURI.getAuthority(),
new String[] {LOCAL_FOLDER + 0, LOCAL_FOLDER + 1 },
new String[] {localTragetPath.toUri().toString(),
localTragetPath.toUri().toString() },
conf);
// Only one local file system should be there if no InnerCache, but fs
// cache should work.
@ -407,4 +437,11 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
Assert.assertEquals(1, vfs.getChildFileSystems().length);
}
}
/**
* @return configuration.
*/
public Configuration getConf() {
return this.conf;
}
}