diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java index 4c3dae9a9f..6dd1f65894 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ConfigUtil.java @@ -135,6 +135,17 @@ public static void addLinkMerge(Configuration conf, final URI[] targets) { addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets); } + /** + * Add nfly link to configuration for the given mount table. + */ + public static void addLinkNfly(Configuration conf, String mountTableName, + String src, String settings, final String targets) { + conf.set( + getConfigViewFsPrefix(mountTableName) + "." + + Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src, + targets); + } + /** * * @param conf @@ -149,9 +160,7 @@ public static void addLinkNfly(Configuration conf, String mountTableName, settings = settings == null ? "minReplication=2,repairOnRead=true" : settings; - - conf.set(getConfigViewFsPrefix(mountTableName) + "." + - Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src, + addLinkNfly(conf, mountTableName, src, settings, StringUtils.uriToString(targets)); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/FsGetter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/FsGetter.java new file mode 100644 index 0000000000..071af11e63 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/FsGetter.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs.viewfs; + +import java.io.IOException; +import java.net.URI; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; + +/** + * File system instance getter. + */ +@Private +class FsGetter { + + /** + * Gets new file system instance of given uri. + */ + public FileSystem getNewInstance(URI uri, Configuration conf) + throws IOException { + return FileSystem.newInstance(uri, conf); + } + + /** + * Gets file system instance of given uri. + */ + public FileSystem get(URI uri, Configuration conf) throws IOException { + return FileSystem.get(uri, conf); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java index c7e5aabe6f..3968e3650c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java @@ -59,8 +59,7 @@ public void load(String mountTableConfigPath, Configuration conf) throws IOException { this.mountTable = new Path(mountTableConfigPath); String scheme = mountTable.toUri().getScheme(); - ViewFileSystem.FsGetter fsGetter = - new ViewFileSystemOverloadScheme.ChildFsGetter(scheme); + FsGetter fsGetter = new ViewFileSystemOverloadScheme.ChildFsGetter(scheme); try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) { RemoteIterator listFiles = fs.listFiles(mountTable, false); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java index a406d77f2e..85af68af31 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/NflyFSystem.java @@ -212,6 +212,21 @@ private static String getRack(String rackString) { */ private NflyFSystem(URI[] uris, Configuration conf, int minReplication, EnumSet nflyFlags) throws IOException { + this(uris, conf, minReplication, nflyFlags, null); + } + + /** + * Creates a new Nfly instance. + * + * @param uris the list of uris in the mount point + * @param conf configuration object + * @param minReplication minimum copies to commit a write op + * @param nflyFlags modes such readMostRecent + * @param fsGetter to get the file system instance with the given uri + * @throws IOException + */ + private NflyFSystem(URI[] uris, Configuration conf, int minReplication, + EnumSet nflyFlags, FsGetter fsGetter) throws IOException { if (uris.length < minReplication) { throw new IOException(minReplication + " < " + uris.length + ": Minimum replication < #destinations"); @@ -238,8 +253,14 @@ private NflyFSystem(URI[] uris, Configuration conf, int minReplication, nodes = new NflyNode[uris.length]; final Iterator rackIter = rackStrings.iterator(); for (int i = 0; i < nodes.length; i++) { - nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], - conf); + if (fsGetter != null) { + nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), + new ChRootedFileSystem(fsGetter.getNewInstance(uris[i], conf), + uris[i])); + } else { + nodes[i] = + new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], conf); + } } // sort all the uri's by distance from myNode, the local file system will // automatically be the the first one. @@ -921,7 +942,7 @@ private static void processThrowable(NflyNode nflyNode, String op, * @throws IOException */ static FileSystem createFileSystem(URI[] uris, Configuration conf, - String settings) throws IOException { + String settings, FsGetter fsGetter) throws IOException { // assert settings != null int minRepl = DEFAULT_MIN_REPLICATION; EnumSet nflyFlags = EnumSet.noneOf(NflyKey.class); @@ -946,6 +967,6 @@ static FileSystem createFileSystem(URI[] uris, Configuration conf, throw new IllegalArgumentException(nflyKey + ": Infeasible"); } } - return new NflyFSystem(uris, conf, minRepl, nflyFlags); + return new NflyFSystem(uris, conf, minRepl, nflyFlags, fsGetter); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java index 0cbcafce0e..4f02feeebe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java @@ -96,27 +96,6 @@ static AccessControlException readOnlyMountTable(final String operation, return readOnlyMountTable(operation, p.toString()); } - /** - * File system instance getter. - */ - static class FsGetter { - - /** - * Gets new file system instance of given uri. - */ - public FileSystem getNewInstance(URI uri, Configuration conf) - throws IOException { - return FileSystem.newInstance(uri, conf); - } - - /** - * Gets file system instance of given uri. - */ - public FileSystem get(URI uri, Configuration conf) throws IOException { - return FileSystem.get(uri, conf); - } - } - /** * Gets file system creator instance. */ @@ -316,7 +295,8 @@ protected FileSystem getTargetFileSystem(final INodeDir dir) @Override protected FileSystem getTargetFileSystem(final String settings, final URI[] uris) throws URISyntaxException, IOException { - return NflyFSystem.createFileSystem(uris, config, settings); + return NflyFSystem.createFileSystem(uris, config, settings, + fsGetter); } }; workingDir = this.getHomeDirectory(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java index 74b2bc0202..59588a527f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java @@ -50,7 +50,6 @@ import org.apache.hadoop.fs.permission.AclUtil; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter; import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java index 3cc2805672..f051c9c001 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter; import org.apache.hadoop.util.Shell; import org.eclipse.jetty.util.log.Log; +import org.junit.Assert; /** @@ -146,7 +147,8 @@ static void addMountLinksToFile(String mountTable, String[] sources, throws IOException, URISyntaxException { ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter( mountTableConfPath.toUri().getScheme()); - try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) { + try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), + conf)) { try (FSDataOutputStream out = fs.create(mountTableConfPath)) { String prefix = new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".") @@ -158,17 +160,23 @@ static void addMountLinksToFile(String mountTable, String[] sources, for (int i = 0; i < sources.length; i++) { String src = sources[i]; String target = targets[i]; + boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY); out.writeBytes(""); - if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) { + if (isNfly) { + String[] srcParts = src.split("[.]"); + Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length); + String actualSrc = srcParts[srcParts.length - 1]; + String params = srcParts[srcParts.length - 2]; + out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_NFLY + "." + + params + "." + actualSrc); + } else if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) { out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK); - out.writeBytes(""); } else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) { out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH); - out.writeBytes(""); } else { out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src); - out.writeBytes(""); } + out.writeBytes(""); out.writeBytes(""); out.writeBytes(target); out.writeBytes(""); @@ -191,7 +199,15 @@ static void addMountLinksToConf(String mountTable, String[] sources, String target = targets[i]; String mountTableName = mountTable == null ? Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable; - if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) { + boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY); + if (isNfly) { + String[] srcParts = src.split("[.]"); + Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length); + String actualSrc = srcParts[srcParts.length - 1]; + String params = srcParts[srcParts.length - 2]; + ConfigUtil.addLinkNfly(config, mountTableName, actualSrc, params, + target); + } else if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) { ConfigUtil.addLinkFallback(config, mountTableName, new URI(target)); } else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) { ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java index 0681c223e9..3860fa423e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.fs.viewfs; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + import java.io.File; import java.io.IOException; import java.net.URI; @@ -24,6 +28,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsConstants; @@ -44,6 +51,7 @@ * Tests ViewFileSystemOverloadScheme with configured mount links. */ public class TestViewFileSystemOverloadSchemeWithHdfsScheme { + private static final String TEST_STRING = "Hello ViewFSOverloadedScheme!"; private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl"; private static final String HDFS_SCHEME = "hdfs"; private Configuration conf = null; @@ -63,6 +71,8 @@ public void startCluster() throws IOException { conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME), ViewFileSystemOverloadScheme.class.getName()); conf.set(String.format( @@ -438,6 +448,117 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets() } } + /** + * Tests the rename with nfly mount link. + */ + @Test(timeout = 3000) + public void testNflyRename() throws Exception { + final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER); + final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1); + final URI uri1 = hdfsTargetPath1.toUri(); + final URI uri2 = hdfsTargetPath2.toUri(); + final Path nflyRoot = new Path("/nflyroot"); + + final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY + + ".minReplication=2." + nflyRoot.toString(); + addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey }, + new String[] {uri1.toString() + "," + uri2.toString() }, conf); + final FileSystem nfly = FileSystem.get(defaultFSURI, conf); + + final Path testDir = new Path("/nflyroot/testdir1/sub1/sub3"); + final Path testDirTmp = new Path("/nflyroot/testdir1/sub1/sub3_temp"); + assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir)); + + // Test renames + assertTrue(nfly.rename(testDir, testDirTmp)); + assertTrue(nfly.rename(testDirTmp, testDir)); + + final URI[] testUris = new URI[] {uri1, uri2 }; + for (final URI testUri : testUris) { + final FileSystem fs = FileSystem.get(testUri, conf); + assertTrue(testDir + " should exist!", fs.exists(testDir)); + } + } + + /** + * Tests the write and read contents with nfly mount link. + */ + @Test(timeout = 3000) + public void testNflyWriteRead() throws Exception { + final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER); + final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1); + final URI uri1 = hdfsTargetPath1.toUri(); + final URI uri2 = hdfsTargetPath2.toUri(); + final Path nflyRoot = new Path("/nflyroot"); + final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY + + ".minReplication=2." + nflyRoot.toString(); + addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey }, + new String[] {uri1.toString() + "," + uri2.toString() }, conf); + final FileSystem nfly = FileSystem.get(defaultFSURI, conf); + final Path testFile = new Path("/nflyroot/test.txt"); + writeString(nfly, TEST_STRING, testFile); + final URI[] testUris = new URI[] {uri1, uri2 }; + for (final URI testUri : testUris) { + try (FileSystem fs = FileSystem.get(testUri, conf)) { + readString(fs, testFile, TEST_STRING, testUri); + } + } + } + + /** + * 1. Writes contents with nfly link having two target uris. 2. Deletes one + * target file. 3. Tests the read works with repairOnRead flag. 4. Tests that + * previously deleted file fully recovered and exists. + */ + @Test(timeout = 3000) + public void testNflyRepair() throws Exception { + final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead; + final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER); + final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1); + final URI uri1 = hdfsTargetPath1.toUri(); + final URI uri2 = hdfsTargetPath2.toUri(); + final Path nflyRoot = new Path("/nflyroot"); + final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY + + ".minReplication=2," + repairKey + "=true." + nflyRoot.toString(); + addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey }, + new String[] {uri1.toString() + "," + uri2.toString() }, conf); + try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) { + // write contents to nfly + final Path testFilePath = new Path("/nflyroot/test.txt"); + writeString(nfly, TEST_STRING, testFilePath); + + final URI[] testUris = new URI[] {uri1, uri2 }; + // both nodes are up again, test repair + FsGetter getter = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs"); + try (FileSystem fs1 = getter.getNewInstance(testUris[0], conf)) { + // Delete a file from one target URI + String testFile = "/test.txt"; + assertTrue( + fs1.delete(new Path(testUris[0].toString() + testFile), false)); + assertFalse(fs1.exists(new Path(testUris[0].toString() + testFile))); + + // Verify read success. + readString(nfly, testFilePath, TEST_STRING, testUris[0]); + // Verify file recovered. + assertTrue(fs1.exists(new Path(testUris[0].toString() + testFile))); + } + } + } + + private void writeString(final FileSystem nfly, final String testString, + final Path testFile) throws IOException { + try (FSDataOutputStream fsDos = nfly.create(testFile)) { + fsDos.writeUTF(testString); + } + } + + private void readString(final FileSystem nfly, final Path testFile, + final String testString, final URI testUri) throws IOException { + try (FSDataInputStream fsDis = nfly.open(testFile)) { + assertEquals("Wrong file content", testString, fsDis.readUTF()); + } + } + /** * @return configuration. */