HDFS-15322. Make NflyFS to work when ViewFsOverloadScheme's scheme and target uris schemes are same. Contributed by Uma Maheswara Rao G.

(cherry picked from commit 4734c77b4b)
This commit is contained in:
Uma Maheswara Rao G 2020-05-21 21:34:58 -07:00
parent 5b248de42d
commit 8e71e85af7
8 changed files with 230 additions and 38 deletions

View File

@ -135,6 +135,17 @@ public static void addLinkMerge(Configuration conf, final URI[] targets) {
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
}
/**
* Add nfly link to configuration for the given mount table.
*/
public static void addLinkNfly(Configuration conf, String mountTableName,
String src, String settings, final String targets) {
conf.set(
getConfigViewFsPrefix(mountTableName) + "."
+ Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
targets);
}
/**
*
* @param conf
@ -149,9 +160,7 @@ public static void addLinkNfly(Configuration conf, String mountTableName,
settings = settings == null
? "minReplication=2,repairOnRead=true"
: settings;
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_NFLY + "." + settings + "." + src,
addLinkNfly(conf, mountTableName, src, settings,
StringUtils.uriToString(targets));
}

View File

@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
/**
* File system instance getter.
*/
@Private
class FsGetter {
/**
* Gets new file system instance of given uri.
*/
public FileSystem getNewInstance(URI uri, Configuration conf)
throws IOException {
return FileSystem.newInstance(uri, conf);
}
/**
* Gets file system instance of given uri.
*/
public FileSystem get(URI uri, Configuration conf) throws IOException {
return FileSystem.get(uri, conf);
}
}

View File

@ -59,8 +59,7 @@ public void load(String mountTableConfigPath, Configuration conf)
throws IOException {
this.mountTable = new Path(mountTableConfigPath);
String scheme = mountTable.toUri().getScheme();
ViewFileSystem.FsGetter fsGetter =
new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
FsGetter fsGetter = new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
RemoteIterator<LocatedFileStatus> listFiles =
fs.listFiles(mountTable, false);

View File

@ -212,6 +212,21 @@ private static String getRack(String rackString) {
*/
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
EnumSet<NflyKey> nflyFlags) throws IOException {
this(uris, conf, minReplication, nflyFlags, null);
}
/**
* Creates a new Nfly instance.
*
* @param uris the list of uris in the mount point
* @param conf configuration object
* @param minReplication minimum copies to commit a write op
* @param nflyFlags modes such readMostRecent
* @param fsGetter to get the file system instance with the given uri
* @throws IOException
*/
private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
EnumSet<NflyKey> nflyFlags, FsGetter fsGetter) throws IOException {
if (uris.length < minReplication) {
throw new IOException(minReplication + " < " + uris.length
+ ": Minimum replication < #destinations");
@ -238,8 +253,14 @@ private NflyFSystem(URI[] uris, Configuration conf, int minReplication,
nodes = new NflyNode[uris.length];
final Iterator<String> rackIter = rackStrings.iterator();
for (int i = 0; i < nodes.length; i++) {
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(), uris[i],
conf);
if (fsGetter != null) {
nodes[i] = new NflyNode(hostStrings.get(i), rackIter.next(),
new ChRootedFileSystem(fsGetter.getNewInstance(uris[i], conf),
uris[i]));
} else {
nodes[i] =
new NflyNode(hostStrings.get(i), rackIter.next(), uris[i], conf);
}
}
// sort all the uri's by distance from myNode, the local file system will
// automatically be the the first one.
@ -921,7 +942,7 @@ private static void processThrowable(NflyNode nflyNode, String op,
* @throws IOException
*/
static FileSystem createFileSystem(URI[] uris, Configuration conf,
String settings) throws IOException {
String settings, FsGetter fsGetter) throws IOException {
// assert settings != null
int minRepl = DEFAULT_MIN_REPLICATION;
EnumSet<NflyKey> nflyFlags = EnumSet.noneOf(NflyKey.class);
@ -946,6 +967,6 @@ static FileSystem createFileSystem(URI[] uris, Configuration conf,
throw new IllegalArgumentException(nflyKey + ": Infeasible");
}
}
return new NflyFSystem(uris, conf, minRepl, nflyFlags);
return new NflyFSystem(uris, conf, minRepl, nflyFlags, fsGetter);
}
}

View File

@ -96,27 +96,6 @@ static AccessControlException readOnlyMountTable(final String operation,
return readOnlyMountTable(operation, p.toString());
}
/**
* File system instance getter.
*/
static class FsGetter {
/**
* Gets new file system instance of given uri.
*/
public FileSystem getNewInstance(URI uri, Configuration conf)
throws IOException {
return FileSystem.newInstance(uri, conf);
}
/**
* Gets file system instance of given uri.
*/
public FileSystem get(URI uri, Configuration conf) throws IOException {
return FileSystem.get(uri, conf);
}
}
/**
* Gets file system creator instance.
*/
@ -316,7 +295,8 @@ protected FileSystem getTargetFileSystem(final INodeDir<FileSystem> dir)
@Override
protected FileSystem getTargetFileSystem(final String settings,
final URI[] uris) throws URISyntaxException, IOException {
return NflyFSystem.createFileSystem(uris, config, settings);
return NflyFSystem.createFileSystem(uris, config, settings,
fsGetter);
}
};
workingDir = this.getHomeDirectory();

View File

@ -50,7 +50,6 @@
import org.apache.hadoop.fs.permission.AclUtil;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.Credentials;

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
import org.apache.hadoop.util.Shell;
import org.eclipse.jetty.util.log.Log;
import org.junit.Assert;
/**
@ -146,7 +147,8 @@ static void addMountLinksToFile(String mountTable, String[] sources,
throws IOException, URISyntaxException {
ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
mountTableConfPath.toUri().getScheme());
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(),
conf)) {
try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
String prefix =
new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
@ -158,17 +160,23 @@ static void addMountLinksToFile(String mountTable, String[] sources,
for (int i = 0; i < sources.length; i++) {
String src = sources[i];
String target = targets[i];
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
out.writeBytes("<property><name>");
if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
if (isNfly) {
String[] srcParts = src.split("[.]");
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
String actualSrc = srcParts[srcParts.length - 1];
String params = srcParts[srcParts.length - 2];
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_NFLY + "."
+ params + "." + actualSrc);
} else if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
out.writeBytes("</name>");
} else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
out.writeBytes("</name>");
} else {
out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
out.writeBytes("</name>");
}
out.writeBytes("</name>");
out.writeBytes("<value>");
out.writeBytes(target);
out.writeBytes("</value></property>");
@ -191,7 +199,15 @@ static void addMountLinksToConf(String mountTable, String[] sources,
String target = targets[i];
String mountTableName = mountTable == null ?
Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
boolean isNfly = src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY);
if (isNfly) {
String[] srcParts = src.split("[.]");
Assert.assertEquals("Invalid NFlyLink format", 3, srcParts.length);
String actualSrc = srcParts[srcParts.length - 1];
String params = srcParts[srcParts.length - 2];
ConfigUtil.addLinkNfly(config, mountTableName, actualSrc, params,
target);
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
} else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));

View File

@ -17,6 +17,10 @@
*/
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
@ -24,6 +28,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
@ -44,6 +51,7 @@
* Tests ViewFileSystemOverloadScheme with configured mount links.
*/
public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
private static final String TEST_STRING = "Hello ViewFSOverloadedScheme!";
private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
private static final String HDFS_SCHEME = "hdfs";
private Configuration conf = null;
@ -63,6 +71,8 @@ public void startCluster() throws IOException {
conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
conf.setInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
ViewFileSystemOverloadScheme.class.getName());
conf.set(String.format(
@ -438,6 +448,117 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
}
}
/**
* Tests the rename with nfly mount link.
*/
@Test(timeout = 3000)
public void testNflyRename() throws Exception {
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
final URI uri1 = hdfsTargetPath1.toUri();
final URI uri2 = hdfsTargetPath2.toUri();
final Path nflyRoot = new Path("/nflyroot");
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
+ ".minReplication=2." + nflyRoot.toString();
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
final Path testDir = new Path("/nflyroot/testdir1/sub1/sub3");
final Path testDirTmp = new Path("/nflyroot/testdir1/sub1/sub3_temp");
assertTrue(testDir + ": Failed to create!", nfly.mkdirs(testDir));
// Test renames
assertTrue(nfly.rename(testDir, testDirTmp));
assertTrue(nfly.rename(testDirTmp, testDir));
final URI[] testUris = new URI[] {uri1, uri2 };
for (final URI testUri : testUris) {
final FileSystem fs = FileSystem.get(testUri, conf);
assertTrue(testDir + " should exist!", fs.exists(testDir));
}
}
/**
* Tests the write and read contents with nfly mount link.
*/
@Test(timeout = 3000)
public void testNflyWriteRead() throws Exception {
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
final URI uri1 = hdfsTargetPath1.toUri();
final URI uri2 = hdfsTargetPath2.toUri();
final Path nflyRoot = new Path("/nflyroot");
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
+ ".minReplication=2." + nflyRoot.toString();
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
final FileSystem nfly = FileSystem.get(defaultFSURI, conf);
final Path testFile = new Path("/nflyroot/test.txt");
writeString(nfly, TEST_STRING, testFile);
final URI[] testUris = new URI[] {uri1, uri2 };
for (final URI testUri : testUris) {
try (FileSystem fs = FileSystem.get(testUri, conf)) {
readString(fs, testFile, TEST_STRING, testUri);
}
}
}
/**
* 1. Writes contents with nfly link having two target uris. 2. Deletes one
* target file. 3. Tests the read works with repairOnRead flag. 4. Tests that
* previously deleted file fully recovered and exists.
*/
@Test(timeout = 3000)
public void testNflyRepair() throws Exception {
final NflyFSystem.NflyKey repairKey = NflyFSystem.NflyKey.repairOnRead;
final Path hdfsTargetPath1 = new Path(defaultFSURI + HDFS_USER_FOLDER);
final Path hdfsTargetPath2 = new Path(defaultFSURI + HDFS_USER_FOLDER + 1);
final URI uri1 = hdfsTargetPath1.toUri();
final URI uri2 = hdfsTargetPath2.toUri();
final Path nflyRoot = new Path("/nflyroot");
final String nflyLinkKey = Constants.CONFIG_VIEWFS_LINK_NFLY
+ ".minReplication=2," + repairKey + "=true." + nflyRoot.toString();
addMountLinks(defaultFSURI.getAuthority(), new String[] {nflyLinkKey },
new String[] {uri1.toString() + "," + uri2.toString() }, conf);
try (FileSystem nfly = FileSystem.get(defaultFSURI, conf)) {
// write contents to nfly
final Path testFilePath = new Path("/nflyroot/test.txt");
writeString(nfly, TEST_STRING, testFilePath);
final URI[] testUris = new URI[] {uri1, uri2 };
// both nodes are up again, test repair
FsGetter getter = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs");
try (FileSystem fs1 = getter.getNewInstance(testUris[0], conf)) {
// Delete a file from one target URI
String testFile = "/test.txt";
assertTrue(
fs1.delete(new Path(testUris[0].toString() + testFile), false));
assertFalse(fs1.exists(new Path(testUris[0].toString() + testFile)));
// Verify read success.
readString(nfly, testFilePath, TEST_STRING, testUris[0]);
// Verify file recovered.
assertTrue(fs1.exists(new Path(testUris[0].toString() + testFile)));
}
}
}
private void writeString(final FileSystem nfly, final String testString,
final Path testFile) throws IOException {
try (FSDataOutputStream fsDos = nfly.create(testFile)) {
fsDos.writeUTF(testString);
}
}
private void readString(final FileSystem nfly, final Path testFile,
final String testString, final URI testUri) throws IOException {
try (FSDataInputStream fsDis = nfly.open(testFile)) {
assertEquals("Wrong file content", testString, fsDis.readUTF());
}
}
/**
* @return configuration.
*/