HDFS-15533: Provide DFS API compatible class, but use ViewFileSystemOverloadScheme inside. (#2229). Contributed by Uma Maheswara Rao G.

This commit is contained in:
Uma Maheswara Rao G 2020-08-19 09:30:41 -07:00 committed by GitHub
parent 82ec28f442
commit dd013f2fdf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 2795 additions and 90 deletions

View File

@ -45,5 +45,4 @@ public interface FsConstants {
String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN = String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN =
"fs.viewfs.overload.scheme.target.%s.impl"; "fs.viewfs.overload.scheme.target.%s.impl";
String VIEWFS_TYPE = "viewfs"; String VIEWFS_TYPE = "viewfs";
String VIEWFSOS_TYPE = "viewfsOverloadScheme";
} }

View File

@ -599,9 +599,10 @@ protected InodeTree(final Configuration config, final String viewName,
if (!gotMountTableEntry) { if (!gotMountTableEntry) {
if (!initingUriAsFallbackOnNoMounts) { if (!initingUriAsFallbackOnNoMounts) {
throw new IOException( throw new IOException(new StringBuilder(
"ViewFs: Cannot initialize: Empty Mount table in config for " "ViewFs: Cannot initialize: Empty Mount table in config for ")
+ "viewfs://" + mountTableName + "/"); .append(theUri.getScheme()).append("://").append(mountTableName)
.append("/").toString());
} }
StringBuilder msg = StringBuilder msg =
new StringBuilder("Empty mount table detected for ").append(theUri) new StringBuilder("Empty mount table detected for ").append(theUri)

View File

@ -259,13 +259,14 @@ public String getScheme() {
} }
/** /**
* Returns the ViewFileSystem type. * Returns false as it does not support to add fallback link automatically on
* @return <code>viewfs</code> * no mounts.
*/ */
String getType() { boolean supportAutoAddingFallbackOnNoMounts() {
return FsConstants.VIEWFS_TYPE; return false;
} }
/** /**
* Called after a new FileSystem instance is constructed. * Called after a new FileSystem instance is constructed.
* @param theUri a uri whose authority section names the host, port, etc. for * @param theUri a uri whose authority section names the host, port, etc. for
@ -293,7 +294,7 @@ public void initialize(final URI theUri, final Configuration conf)
try { try {
myUri = new URI(getScheme(), authority, "/", null, null); myUri = new URI(getScheme(), authority, "/", null, null);
boolean initingUriAsFallbackOnNoMounts = boolean initingUriAsFallbackOnNoMounts =
!FsConstants.VIEWFS_TYPE.equals(getType()); supportAutoAddingFallbackOnNoMounts();
fsState = new InodeTree<FileSystem>(conf, tableName, myUri, fsState = new InodeTree<FileSystem>(conf, tableName, myUri,
initingUriAsFallbackOnNoMounts) { initingUriAsFallbackOnNoMounts) {
@Override @Override

View File

@ -104,6 +104,7 @@
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class ViewFileSystemOverloadScheme extends ViewFileSystem { public class ViewFileSystemOverloadScheme extends ViewFileSystem {
private URI myUri; private URI myUri;
private boolean supportAutoAddingFallbackOnNoMounts = true;
public ViewFileSystemOverloadScheme() throws IOException { public ViewFileSystemOverloadScheme() throws IOException {
super(); super();
} }
@ -114,11 +115,19 @@ public String getScheme() {
} }
/** /**
* Returns the ViewFileSystem type. * By default returns false as ViewFileSystemOverloadScheme supports auto
* @return <code>viewfs</code> * adding fallback on no mounts.
*/ */
String getType() { public boolean supportAutoAddingFallbackOnNoMounts() {
return FsConstants.VIEWFSOS_TYPE; return this.supportAutoAddingFallbackOnNoMounts;
}
/**
* Sets whether to add fallback automatically when no mount points found.
*/
public void setSupportAutoAddingFallbackOnNoMounts(
boolean addAutoFallbackOnNoMounts) {
this.supportAutoAddingFallbackOnNoMounts = addAutoFallbackOnNoMounts;
} }
@Override @Override
@ -287,4 +296,62 @@ public FileSystem getRawFileSystem(Path path, Configuration conf)
} }
} }
/**
* Gets the mount path info, which contains the target file system and
* remaining path to pass to the target file system.
*/
public MountPathInfo<FileSystem> getMountPathInfo(Path path,
Configuration conf) throws IOException {
InodeTree.ResolveResult<FileSystem> res;
try {
res = fsState.resolve(getUriPath(path), true);
FileSystem fs = res.isInternalDir() ?
(fsState.getRootFallbackLink() != null ?
((ChRootedFileSystem) fsState
.getRootFallbackLink().targetFileSystem).getMyFs() :
fsGetter().get(path.toUri(), conf)) :
((ChRootedFileSystem) res.targetFileSystem).getMyFs();
return new MountPathInfo<FileSystem>(res.remainingPath, res.resolvedPath,
fs);
} catch (FileNotFoundException e) {
// No link configured with passed path.
throw new NotInMountpointException(path,
"No link found for the given path.");
}
}
/**
* A class to maintain the target file system and a path to pass to the target
* file system.
*/
public static class MountPathInfo<T> {
private Path pathOnTarget;
private T targetFs;
public MountPathInfo(Path pathOnTarget, String resolvedPath, T targetFs) {
this.pathOnTarget = pathOnTarget;
this.targetFs = targetFs;
}
public Path getPathOnTarget() {
return this.pathOnTarget;
}
public T getTargetFs() {
return this.targetFs;
}
}
/**
* @return Gets the fallback file system configured. Usually, this will be the
* default cluster.
*/
public FileSystem getFallbackFileSystem() {
if (fsState.getRootFallbackLink() == null) {
return null;
}
return ((ChRootedFileSystem) fsState.getRootFallbackLink().targetFileSystem)
.getMyFs();
}
} }

View File

@ -186,7 +186,7 @@ public void initialize(URI uri, Configuration conf) throws IOException {
throw new IOException("Incomplete HDFS URI, no host: "+ uri); throw new IOException("Incomplete HDFS URI, no host: "+ uri);
} }
this.dfs = new DFSClient(uri, conf, statistics); initDFSClient(uri, conf);
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory(); this.workingDir = getHomeDirectory();
@ -200,6 +200,10 @@ public StorageStatistics provide() {
}); });
} }
void initDFSClient(URI theUri, Configuration conf) throws IOException {
this.dfs = new DFSClient(theUri, conf, statistics);
}
@Override @Override
public Path getWorkingDirectory() { public Path getWorkingDirectory() {
return workingDir; return workingDir;
@ -1510,10 +1514,14 @@ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
@Override @Override
public void close() throws IOException { public void close() throws IOException {
try { try {
dfs.closeOutputStreams(false); if (dfs != null) {
dfs.closeOutputStreams(false);
}
super.close(); super.close();
} finally { } finally {
dfs.close(); if (dfs != null) {
dfs.close();
}
} }
} }

View File

@ -41,7 +41,6 @@
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -727,11 +726,6 @@ protected void initialize(Configuration conf) throws IOException {
intervals); intervals);
} }
} }
// Currently NN uses FileSystem.get to initialize DFS in startTrashEmptier.
// If fs.hdfs.impl was overridden by core-site.xml, we may get other
// filesystem. To make sure we get DFS, we are setting fs.hdfs.impl to DFS.
// HDFS-15450
conf.set(FS_HDFS_IMPL_KEY, DistributedFileSystem.class.getName());
UserGroupInformation.setConfiguration(conf); UserGroupInformation.setConfiguration(conf);
loginAsNameNodeUser(conf); loginAsNameNodeUser(conf);

View File

@ -37,8 +37,8 @@ public class TestViewFSOverloadSchemeWithMountTableConfigInHDFS
@Before @Before
@Override @Override
public void startCluster() throws IOException { public void setUp() throws IOException {
super.startCluster(); super.setUp();
String mountTableDir = String mountTableDir =
URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)) URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY))
.toString() + "/MountTable/"; .toString() + "/MountTable/";

View File

@ -36,13 +36,15 @@
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
import org.junit.After; import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME; import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
@ -58,7 +60,7 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl"; private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
private static final String HDFS_SCHEME = "hdfs"; private static final String HDFS_SCHEME = "hdfs";
private Configuration conf = null; private Configuration conf = null;
private MiniDFSCluster cluster = null; private static MiniDFSCluster cluster = null;
private URI defaultFSURI; private URI defaultFSURI;
private File localTargetDir; private File localTargetDir;
private static final String TEST_ROOT_DIR = PathUtils private static final String TEST_ROOT_DIR = PathUtils
@ -66,33 +68,52 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
private static final String HDFS_USER_FOLDER = "/HDFSUser"; private static final String HDFS_USER_FOLDER = "/HDFSUser";
private static final String LOCAL_FOLDER = "/local"; private static final String LOCAL_FOLDER = "/local";
@BeforeClass
public static void init() throws IOException {
cluster =
new MiniDFSCluster.Builder(new Configuration()).numDataNodes(2).build();
cluster.waitClusterUp();
}
/** /**
* Sets up the configurations and starts the MiniDFSCluster. * Sets up the configurations and starts the MiniDFSCluster.
*/ */
@Before @Before
public void startCluster() throws IOException { public void setUp() throws IOException {
conf = new Configuration(); Configuration config = getNewConf();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, config.setInt(
true);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME), config.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
ViewFileSystemOverloadScheme.class.getName()); ViewFileSystemOverloadScheme.class.getName());
conf.set(String.format( config.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
HDFS_SCHEME), DistributedFileSystem.class.getName());
conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT); CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT);
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); setConf(config);
cluster.waitClusterUp();
defaultFSURI = defaultFSURI =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); URI.create(config.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
localTargetDir = new File(TEST_ROOT_DIR, "/root/"); localTargetDir = new File(TEST_ROOT_DIR, "/root/");
localTargetDir.mkdirs();
Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme. Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme.
} }
@After @After
public void tearDown() throws IOException { public void cleanUp() throws IOException {
if (cluster != null) {
FileSystem fs = new DistributedFileSystem();
fs.initialize(defaultFSURI, conf);
try {
FileStatus[] statuses = fs.listStatus(new Path("/"));
for (FileStatus st : statuses) {
Assert.assertTrue(fs.delete(st.getPath(), true));
}
} finally {
fs.close();
}
FileSystem.closeAll();
}
}
@AfterClass
public static void tearDown() throws IOException {
if (cluster != null) { if (cluster != null) {
FileSystem.closeAll(); FileSystem.closeAll();
cluster.shutdown(); cluster.shutdown();
@ -132,9 +153,9 @@ public void testMountLinkWithLocalAndHDFS() throws Exception {
// /local/test // /local/test
Path localDir = new Path(LOCAL_FOLDER + "/test"); Path localDir = new Path(LOCAL_FOLDER + "/test");
try (ViewFileSystemOverloadScheme fs try (FileSystem fs
= (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { = FileSystem.get(conf)) {
Assert.assertEquals(2, fs.getMountPoints().length); Assert.assertEquals(2, fs.getChildFileSystems().length);
fs.createNewFile(hdfsFile); // /HDFSUser/testfile fs.createNewFile(hdfsFile); // /HDFSUser/testfile
fs.mkdirs(localDir); // /local/test fs.mkdirs(localDir); // /local/test
} }
@ -166,8 +187,13 @@ public void testMountLinkWithLocalAndHDFS() throws Exception {
* hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/ * hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/
* It should fail to add non existent fs link. * It should fail to add non existent fs link.
*/ */
@Test(expected = IOException.class, timeout = 30000) @Test(timeout = 30000)
public void testMountLinkWithNonExistentLink() throws Exception { public void testMountLinkWithNonExistentLink() throws Exception {
testMountLinkWithNonExistentLink(true);
}
public void testMountLinkWithNonExistentLink(boolean expectFsInitFailure)
throws Exception {
final String userFolder = "/User"; final String userFolder = "/User";
final Path nonExistTargetPath = final Path nonExistTargetPath =
new Path("nonexistent://NonExistent" + userFolder); new Path("nonexistent://NonExistent" + userFolder);
@ -176,10 +202,17 @@ public void testMountLinkWithNonExistentLink() throws Exception {
* Below addLink will create following mount points * Below addLink will create following mount points
* hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/ * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
*/ */
addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder }, addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder},
new String[] {nonExistTargetPath.toUri().toString() }, conf); new String[] {nonExistTargetPath.toUri().toString()}, conf);
FileSystem.get(conf); if (expectFsInitFailure) {
Assert.fail("Expected to fail with non existent link"); LambdaTestUtils.intercept(IOException.class, () -> {
FileSystem.get(conf);
});
} else {
try (FileSystem fs = FileSystem.get(conf)) {
Assert.assertEquals("hdfs", fs.getScheme());
}
}
} }
/** /**
@ -271,14 +304,10 @@ public void testAccessViewFsPathWithoutAuthority() throws Exception {
// check for viewfs path without authority // check for viewfs path without authority
Path viewFsRootPath = new Path("viewfs:/"); Path viewFsRootPath = new Path("viewfs:/");
try { LambdaTestUtils.intercept(IOException.class,
viewFsRootPath.getFileSystem(conf); "Empty Mount table in config for viewfs://default", () -> {
Assert.fail( viewFsRootPath.getFileSystem(conf);
"Mount table with authority default should not be initialized"); });
} catch (IOException e) {
assertTrue(e.getMessage().contains(
"Empty Mount table in config for viewfs://default/"));
}
// set the name of the default mount table here and // set the name of the default mount table here and
// subsequent calls should succeed. // subsequent calls should succeed.
@ -334,18 +363,25 @@ public void testWithLinkFallBack() throws Exception {
* *
* It cannot find any mount link. ViewFS expects a mount point from root. * It cannot find any mount link. ViewFS expects a mount point from root.
*/ */
@Test(expected = NotInMountpointException.class, timeout = 30000) @Test(timeout = 30000)
public void testCreateOnRootShouldFailWhenMountLinkConfigured() public void testCreateOnRoot() throws Exception {
throws Exception { testCreateOnRoot(false);
}
public void testCreateOnRoot(boolean fallbackExist) throws Exception {
final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER); final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
addMountLinks(defaultFSURI.getAuthority(), addMountLinks(defaultFSURI.getAuthority(),
new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER }, new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER},
new String[] {hdfsTargetPath.toUri().toString(), new String[] {hdfsTargetPath.toUri().toString(),
localTargetDir.toURI().toString() }, localTargetDir.toURI().toString()}, conf);
conf);
try (FileSystem fs = FileSystem.get(conf)) { try (FileSystem fs = FileSystem.get(conf)) {
fs.createNewFile(new Path("/newFileOnRoot")); if (fallbackExist) {
Assert.fail("It should fail as root is read only in viewFS."); Assert.assertTrue(fs.createNewFile(new Path("/newFileOnRoot")));
} else {
LambdaTestUtils.intercept(NotInMountpointException.class, () -> {
fs.createNewFile(new Path("/newFileOnRoot"));
});
}
} }
} }
@ -433,15 +469,13 @@ public void testViewFsOverloadSchemeWithInnerCache()
conf); conf);
// 1. Only 1 hdfs child file system should be there with cache. // 1. Only 1 hdfs child file system should be there with cache.
try (ViewFileSystemOverloadScheme vfs = try (FileSystem vfs = FileSystem.get(conf)) {
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
Assert.assertEquals(1, vfs.getChildFileSystems().length); Assert.assertEquals(1, vfs.getChildFileSystems().length);
} }
// 2. Two hdfs file systems should be there if no cache. // 2. Two hdfs file systems should be there if no cache.
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
try (ViewFileSystemOverloadScheme vfs = try (FileSystem vfs = FileSystem.get(conf)) {
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
Assert.assertEquals(2, vfs.getChildFileSystems().length); Assert.assertEquals(2, vfs.getChildFileSystems().length);
} }
} }
@ -466,8 +500,7 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
// Two hdfs file systems should be there if no cache. // Two hdfs file systems should be there if no cache.
try (ViewFileSystemOverloadScheme vfs = try (FileSystem vfs = FileSystem.get(conf)) {
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
Assert.assertEquals(2, vfs.getChildFileSystems().length); Assert.assertEquals(2, vfs.getChildFileSystems().length);
} }
} }
@ -494,8 +527,7 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
// Only one local file system should be there if no InnerCache, but fs // Only one local file system should be there if no InnerCache, but fs
// cache should work. // cache should work.
conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
try (ViewFileSystemOverloadScheme vfs = try (FileSystem vfs = FileSystem.get(conf)) {
(ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
Assert.assertEquals(1, vfs.getChildFileSystems().length); Assert.assertEquals(1, vfs.getChildFileSystems().length);
} }
} }
@ -656,4 +688,18 @@ private void readString(final FileSystem nfly, final Path testFile,
public Configuration getConf() { public Configuration getConf() {
return this.conf; return this.conf;
} }
/**
* @return configuration.
*/
public Configuration getNewConf() {
return new Configuration(cluster.getConfiguration(0));
}
/**
* sets configuration.
*/
public void setConf(Configuration config) {
conf = config;
}
} }

View File

@ -142,7 +142,7 @@ public class TestDistributedFileSystem {
private boolean noXmlDefaults = false; private boolean noXmlDefaults = false;
private HdfsConfiguration getTestConfiguration() { HdfsConfiguration getTestConfiguration() {
HdfsConfiguration conf; HdfsConfiguration conf;
if (noXmlDefaults) { if (noXmlDefaults) {
conf = new HdfsConfiguration(false); conf = new HdfsConfiguration(false);
@ -813,7 +813,7 @@ public void testStatistics() throws IOException {
@Test @Test
public void testStatistics2() throws IOException, NoSuchAlgorithmException { public void testStatistics2() throws IOException, NoSuchAlgorithmException {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = getTestConfiguration();
conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
StoragePolicySatisfierMode.EXTERNAL.toString()); StoragePolicySatisfierMode.EXTERNAL.toString());
File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString()); File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
@ -1475,7 +1475,7 @@ public void testCreateWithCustomChecksum() throws Exception {
@Test(timeout=60000) @Test(timeout=60000)
public void testFileCloseStatus() throws IOException { public void testFileCloseStatus() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
DistributedFileSystem fs = cluster.getFileSystem(); DistributedFileSystem fs = cluster.getFileSystem();
try { try {
@ -1495,7 +1495,7 @@ public void testFileCloseStatus() throws IOException {
@Test @Test
public void testCreateWithStoragePolicy() throws Throwable { public void testCreateWithStoragePolicy() throws Throwable {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.storageTypes( .storageTypes(
new StorageType[] {StorageType.DISK, StorageType.ARCHIVE, new StorageType[] {StorageType.DISK, StorageType.ARCHIVE,
@ -1534,7 +1534,7 @@ public void testCreateWithStoragePolicy() throws Throwable {
@Test(timeout=60000) @Test(timeout=60000)
public void testListFiles() throws IOException { public void testListFiles() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try { try {
@ -1557,7 +1557,7 @@ public void testListFiles() throws IOException {
@Test @Test
public void testListStatusOfSnapshotDirs() throws IOException { public void testListStatusOfSnapshotDirs() throws IOException {
MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()) MiniDFSCluster cluster = new MiniDFSCluster.Builder(getTestConfiguration())
.build(); .build();
try { try {
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
@ -1577,7 +1577,7 @@ public void testListStatusOfSnapshotDirs() throws IOException {
@Test(timeout=10000) @Test(timeout=10000)
public void testDFSClientPeerReadTimeout() throws IOException { public void testDFSClientPeerReadTimeout() throws IOException {
final int timeout = 1000; final int timeout = 1000;
final Configuration conf = new HdfsConfiguration(); final Configuration conf = getTestConfiguration();
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer // only need cluster to create a dfs client to get a peer
@ -1611,7 +1611,7 @@ public void testDFSClientPeerReadTimeout() throws IOException {
@Test(timeout=60000) @Test(timeout=60000)
public void testGetServerDefaults() throws IOException { public void testGetServerDefaults() throws IOException {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
try { try {
cluster.waitActive(); cluster.waitActive();
@ -1626,7 +1626,7 @@ public void testGetServerDefaults() throws IOException {
@Test(timeout=10000) @Test(timeout=10000)
public void testDFSClientPeerWriteTimeout() throws IOException { public void testDFSClientPeerWriteTimeout() throws IOException {
final int timeout = 1000; final int timeout = 1000;
final Configuration conf = new HdfsConfiguration(); final Configuration conf = getTestConfiguration();
conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout);
// only need cluster to create a dfs client to get a peer // only need cluster to create a dfs client to get a peer
@ -1663,7 +1663,7 @@ public void testDFSClientPeerWriteTimeout() throws IOException {
@Test(timeout = 30000) @Test(timeout = 30000)
public void testTotalDfsUsed() throws Exception { public void testTotalDfsUsed() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
MiniDFSCluster cluster = null; MiniDFSCluster cluster = null;
try { try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
@ -1850,7 +1850,7 @@ public void testDFSDataOutputStreamBuilderForAppend() throws IOException {
@Test @Test
public void testSuperUserPrivilege() throws Exception { public void testSuperUserPrivilege() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = getTestConfiguration();
File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString()); File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString());
final Path jksPath = new Path(tmpDir.toString(), "test.jks"); final Path jksPath = new Path(tmpDir.toString(), "test.jks");
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
@ -1903,7 +1903,7 @@ public Void run() throws Exception {
@Test @Test
public void testListingStoragePolicyNonSuperUser() throws Exception { public void testListingStoragePolicyNonSuperUser() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = getTestConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) { try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) {
cluster.waitActive(); cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem(); final DistributedFileSystem dfs = cluster.getFileSystem();
@ -2055,7 +2055,7 @@ public Object run() throws Exception {
@Test @Test
public void testStorageFavouredNodes() public void testStorageFavouredNodes()
throws IOException, InterruptedException, TimeoutException { throws IOException, InterruptedException, TimeoutException {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK}) .storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK})
.numDataNodes(3).storagesPerDatanode(2).build()) { .numDataNodes(3).storagesPerDatanode(2).build()) {
@ -2080,7 +2080,7 @@ public void testStorageFavouredNodes()
@Test @Test
public void testGetECTopologyResultForPolicies() throws Exception { public void testGetECTopologyResultForPolicies() throws Exception {
Configuration conf = new HdfsConfiguration(); Configuration conf = getTestConfiguration();
try (MiniDFSCluster cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0)) { try (MiniDFSCluster cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0)) {
DistributedFileSystem dfs = cluster.getFileSystem(); DistributedFileSystem dfs = cluster.getFileSystem();
dfs.enableErasureCodingPolicy("RS-6-3-1024k"); dfs.enableErasureCodingPolicy("RS-6-3-1024k");
@ -2111,7 +2111,7 @@ public void testGetECTopologyResultForPolicies() throws Exception {
@Test @Test
public void testECCloseCommittedBlock() throws Exception { public void testECCloseCommittedBlock() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(); HdfsConfiguration conf = getTestConfiguration();
conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1); conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1);
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3).build()) { .numDataNodes(3).build()) {

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.test.Whitebox;
import java.io.IOException;
public class TestViewDistributedFileSystem extends TestDistributedFileSystem{
@Override
HdfsConfiguration getTestConfiguration() {
HdfsConfiguration conf = super.getTestConfiguration();
conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
return conf;
}
@Override
public void testStatistics() throws IOException {
FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
ViewDistributedFileSystem.class).reset();
@SuppressWarnings("unchecked")
ThreadLocal<FileSystem.Statistics.StatisticsData> data =
(ThreadLocal<FileSystem.Statistics.StatisticsData>) Whitebox
.getInternalState(FileSystem
.getStatistics(HdfsConstants.HDFS_URI_SCHEME,
ViewDistributedFileSystem.class), "threadData");
data.set(null);
super.testStatistics();
}
}

View File

@ -0,0 +1,94 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemContractBaseTest;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URI;
public class TestViewDistributedFileSystemContract
extends TestHDFSFileSystemContract {
private static MiniDFSCluster cluster;
private static String defaultWorkingDirectory;
private static Configuration conf = new HdfsConfiguration();
@BeforeClass
public static void init() throws IOException {
final File basedir = GenericTestUtils.getRandomizedTestDir();
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
FileSystemContractBaseTest.TEST_UMASK);
cluster = new MiniDFSCluster.Builder(conf, basedir)
.numDataNodes(2)
.build();
defaultWorkingDirectory =
"/user/" + UserGroupInformation.getCurrentUser().getShortUserName();
}
@Before
public void setUp() throws Exception {
conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
URI defaultFSURI =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/user",
defaultFSURI);
ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(),
defaultFSURI);
fs = FileSystem.get(conf);
}
@AfterClass
public static void tearDownAfter() throws Exception {
if (cluster != null) {
cluster.shutdown();
cluster = null;
}
}
@Override
protected String getDefaultWorkingDirectory() {
return defaultWorkingDirectory;
}
@Test
public void testRenameRootDirForbidden() throws Exception {
LambdaTestUtils.intercept(AccessControlException.class,
"InternalDir of ViewFileSystem is readonly", () -> {
super.testRenameRootDirForbidden();
});
}
@Ignore("Ignore this test until HDFS-15532")
@Override
public void testLSRootDir() throws Throwable {
}
}

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.fs.viewfs.TestViewFileSystemOverloadSchemeWithHdfsScheme;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME;
import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT;
public class TestViewDistributedFileSystemWithMountLinks extends
TestViewFileSystemOverloadSchemeWithHdfsScheme {
@Override
public void setUp() throws IOException {
super.setUp();
Configuration conf = getConf();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
conf.set("fs.hdfs.impl",
ViewDistributedFileSystem.class.getName());
conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME,
CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT);
URI defaultFSURI =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
new Path(defaultFSURI.toString()).toUri());
setConf(conf);
}
@Test(timeout = 30000)
public void testCreateOnRoot() throws Exception {
testCreateOnRoot(true);
}
@Test(timeout = 30000)
public void testMountLinkWithNonExistentLink() throws Exception {
testMountLinkWithNonExistentLink(false);
}
}

View File

@ -132,17 +132,23 @@ private static HdfsConfiguration createCachingConf() {
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES,
2); 2);
return conf; return conf;
} }
/**
* @return the configuration.
*/
Configuration getConf() {
return this.conf;
}
@Before @Before
public void setup() throws Exception { public void setup() throws Exception {
conf = createCachingConf(); conf = createCachingConf();
cluster = cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive(); cluster.waitActive();
dfs = cluster.getFileSystem(); dfs = getDFS();
proto = cluster.getNameNodeRpc(); proto = cluster.getNameNodeRpc();
namenode = cluster.getNameNode(); namenode = cluster.getNameNode();
prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
@ -150,6 +156,13 @@ public void setup() throws Exception {
BlockReaderTestUtil.enableHdfsCachingTracing(); BlockReaderTestUtil.enableHdfsCachingTracing();
} }
/**
* @return the dfs instance.
*/
DistributedFileSystem getDFS() throws IOException {
return (DistributedFileSystem) FileSystem.get(conf);
}
@After @After
public void teardown() throws Exception { public void teardown() throws Exception {
// Remove cache directives left behind by tests so that we release mmaps. // Remove cache directives left behind by tests so that we release mmaps.
@ -1613,6 +1626,14 @@ public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
"testAddingCacheDirectiveInfosWhenCachingIsDisabled:2"); "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
} }
/**
* @return the dfs instance for nnIdx.
*/
DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx)
throws IOException {
return cluster.getFileSystem(0);
}
@Test(timeout=120000) @Test(timeout=120000)
public void testExpiryTimeConsistency() throws Exception { public void testExpiryTimeConsistency() throws Exception {
conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1);
@ -1623,7 +1644,7 @@ public void testExpiryTimeConsistency() throws Exception {
.build(); .build();
dfsCluster.transitionToActive(0); dfsCluster.transitionToActive(0);
DistributedFileSystem fs = dfsCluster.getFileSystem(0); DistributedFileSystem fs = getDFS(dfsCluster, 0);
final NameNode ann = dfsCluster.getNameNode(0); final NameNode ann = dfsCluster.getNameNode(0);
final Path filename = new Path("/file"); final Path filename = new Path("/file");

View File

@ -0,0 +1,56 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.ViewDistributedFileSystem;
import java.io.IOException;
import java.net.URI;
public class TestCacheDirectivesWithViewDFS extends TestCacheDirectives {
@Override
public DistributedFileSystem getDFS() throws IOException {
Configuration conf = getConf();
conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
URI defaultFSURI =
URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(),
new Path(defaultFSURI.toString()).toUri());
ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/tmp",
new Path(defaultFSURI.toString()).toUri());
return super.getDFS();
}
@Override
public DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx)
throws IOException {
Configuration conf = cluster.getConfiguration(nnIdx);
conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName());
URI uri = cluster.getURI(0);
ConfigUtil.addLinkFallback(conf, uri.getHost(), uri);
ConfigUtil.addLink(conf, uri.getHost(), "/tmp", uri);
return cluster.getFileSystem(0);
}
}