HADOOP-13055. Implement linkMergeSlash and linkFallback for ViewFileSystem

This commit is contained in:
Manoj Govindassamy 2017-10-13 17:43:13 -07:00
parent 3fb4718886
commit 133d7ca76e
9 changed files with 945 additions and 70 deletions

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.fs.viewfs;
import java.net.URI;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
@ -69,6 +70,71 @@ public static void addLink(final Configuration conf, final String src,
src, target);
}
/**
* Add a LinkMergeSlash to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param target
*/
public static void addLinkMergeSlash(Configuration conf,
final String mountTableName, final URI target) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH, target.toString());
}
/**
* Add a LinkMergeSlash to the config for the default mount table.
* @param conf
* @param target
*/
public static void addLinkMergeSlash(Configuration conf, final URI target) {
addLinkMergeSlash(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
target);
}
/**
* Add a LinkFallback to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param target
*/
public static void addLinkFallback(Configuration conf,
final String mountTableName, final URI target) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_FALLBACK, target.toString());
}
/**
* Add a LinkFallback to the config for the default mount table.
* @param conf
* @param target
*/
public static void addLinkFallback(Configuration conf, final URI target) {
addLinkFallback(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE,
target);
}
/**
* Add a LinkMerge to the config for the specified mount table.
* @param conf
* @param mountTableName
* @param targets
*/
public static void addLinkMerge(Configuration conf,
final String mountTableName, final URI[] targets) {
conf.set(getConfigViewFsPrefix(mountTableName) + "." +
Constants.CONFIG_VIEWFS_LINK_MERGE, Arrays.toString(targets));
}
/**
* Add a LinkMerge to the config for the default mount table.
* @param conf
* @param targets
*/
public static void addLinkMerge(Configuration conf, final URI[] targets) {
addLinkMerge(conf, Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, targets);
}
/**
*
* @param conf

View File

@ -51,12 +51,17 @@ public interface Constants {
/**
* Config variable for specifying a simple link
*/
public static final String CONFIG_VIEWFS_LINK = "link";
String CONFIG_VIEWFS_LINK = "link";
/**
* Config variable for specifying a fallback for link mount points.
*/
String CONFIG_VIEWFS_LINK_FALLBACK = "linkFallback";
/**
* Config variable for specifying a merge link
*/
public static final String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
String CONFIG_VIEWFS_LINK_MERGE = "linkMerge";
/**
* Config variable for specifying an nfly link. Nfly writes to multiple
@ -68,10 +73,9 @@ public interface Constants {
* Config variable for specifying a merge of the root of the mount-table
* with the root of another file system.
*/
public static final String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
String CONFIG_VIEWFS_LINK_MERGE_SLASH = "linkMergeSlash";
static public final FsPermission PERMISSION_555 =
new FsPermission((short) 0555);
FsPermission PERMISSION_555 = new FsPermission((short) 0555);
String CONFIG_VIEWFS_RENAME_STRATEGY = "fs.viewfs.rename.strategy";
}

View File

@ -17,12 +17,15 @@
*/
package org.apache.hadoop.fs.viewfs;
import com.google.common.base.Preconditions;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@ -61,8 +64,12 @@ enum ResultKind {
}
static final Path SlashPath = new Path("/");
private final INodeDir<T> root; // the root of the mount table
private final String homedirPrefix; // the homedir for this mount table
// the root of the mount table
private final INode<T> root;
// the fallback filesystem
private final INodeLink<T> rootFallbackLink;
// the homedir for this mount table
private final String homedirPrefix;
private List<MountPoint<T>> mountPoints = new ArrayList<MountPoint<T>>();
static class MountPoint<T> {
@ -85,7 +92,7 @@ static String[] breakIntoPathComponents(final String path) {
}
/**
* Internal class for inode tree
* Internal class for INode tree.
* @param <T>
*/
abstract static class INode<T> {
@ -94,21 +101,58 @@ abstract static class INode<T> {
public INode(String pathToNode, UserGroupInformation aUgi) {
fullPath = pathToNode;
}
// INode forming the internal mount table directory tree
// for ViewFileSystem. This internal directory tree is
// constructed based on the mount table config entries
// and is read only.
abstract boolean isInternalDir();
// INode linking to another filesystem. Represented
// via mount table link config entries.
boolean isLink() {
return !isInternalDir();
}
}
/**
* Internal class to represent an internal dir of the mount table
* Internal class to represent an internal dir of the mount table.
* @param <T>
*/
static class INodeDir<T> extends INode<T> {
final Map<String,INode<T>> children = new HashMap<String,INode<T>>();
T InodeDirFs = null; // file system of this internal directory of mountT
boolean isRoot = false;
private final Map<String, INode<T>> children = new HashMap<>();
private T internalDirFs = null; //filesystem of this internal directory
private boolean isRoot = false;
INodeDir(final String pathToNode, final UserGroupInformation aUgi) {
super(pathToNode, aUgi);
}
@Override
boolean isInternalDir() {
return true;
}
T getInternalDirFs() {
return internalDirFs;
}
void setInternalDirFs(T internalDirFs) {
this.internalDirFs = internalDirFs;
}
void setRoot(boolean root) {
isRoot = root;
}
boolean isRoot() {
return isRoot;
}
Map<String, INode<T>> getChildren() {
return Collections.unmodifiableMap(children);
}
INode<T> resolveInternal(final String pathComponent) {
return children.get(pathComponent);
}
@ -119,7 +163,7 @@ INodeDir<T> addDir(final String pathComponent,
throw new FileAlreadyExistsException();
}
final INodeDir<T> newDir = new INodeDir<T>(fullPath +
(isRoot ? "" : "/") + pathComponent, aUgi);
(isRoot() ? "" : "/") + pathComponent, aUgi);
children.put(pathComponent, newDir);
return newDir;
}
@ -133,10 +177,43 @@ void addLink(final String pathComponent, final INodeLink<T> link)
}
}
/**
* Mount table link type.
*/
enum LinkType {
/**
* Link entry pointing to a single filesystem uri.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.link.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK}
*/
SINGLE,
/**
* Fallback filesystem for the paths not mounted by
* any single link entries.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkFallback
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_FALLBACK}
*/
SINGLE_FALLBACK,
/**
* Link entry pointing to an union of two or more filesystem uris.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMerge.<link_name>
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE}
*/
MERGE,
NFLY
/**
* Link entry for merging mount table's root with the
* root of another filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkMergeSlash
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_MERGE_SLASH}
*/
MERGE_SLASH,
/**
* Link entry to write to multiple filesystems and read
* from the closest filesystem.
* Config prefix: fs.viewfs.mounttable.<mnt_tbl_name>.linkNfly
* Refer: {@link Constants#CONFIG_VIEWFS_LINK_NFLY}
*/
NFLY;
}
/**
@ -188,6 +265,15 @@ Path getTargetLink() {
}
return new Path(result.toString());
}
@Override
boolean isInternalDir() {
return false;
}
public T getTargetFileSystem() {
return targetFileSystem;
}
}
private void createLink(final String src, final String target,
@ -203,7 +289,10 @@ private void createLink(final String src, final String target,
}
final String[] srcPaths = breakIntoPathComponents(src);
INodeDir<T> curInode = root;
// Make sure root is of INodeDir type before
// adding any regular links to it.
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
int i;
// Ignore first initial slash, process all except last component
for (i = 1; i < srcPaths.length - 1; i++) {
@ -211,15 +300,15 @@ private void createLink(final String src, final String target,
INode<T> nextInode = curInode.resolveInternal(iPath);
if (nextInode == null) {
INodeDir<T> newDir = curInode.addDir(iPath, aUgi);
newDir.InodeDirFs = getTargetFileSystem(newDir);
newDir.setInternalDirFs(getTargetFileSystem(newDir));
nextInode = newDir;
}
if (nextInode instanceof INodeLink) {
if (nextInode.isLink()) {
// Error - expected a dir but got a link
throw new FileAlreadyExistsException("Path " + nextInode.fullPath +
" already exists as link");
} else {
assert (nextInode instanceof INodeDir);
assert(nextInode.isInternalDir());
curInode = (INodeDir<T>) nextInode;
}
}
@ -245,6 +334,11 @@ private void createLink(final String src, final String target,
newLink = new INodeLink<T>(fullPath, aUgi,
getTargetFileSystem(new URI(target)), new URI(target));
break;
case SINGLE_FALLBACK:
case MERGE_SLASH:
// Link fallback and link merge slash configuration
// are handled specially at InodeTree.
throw new IllegalArgumentException("Unexpected linkType: " + linkType);
case MERGE:
case NFLY:
final URI[] targetUris = StringUtils.stringToURI(
@ -273,6 +367,77 @@ protected abstract T getTargetFileSystem(INodeDir<T> dir)
protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
throws UnsupportedFileSystemException, URISyntaxException, IOException;
private INodeDir<T> getRootDir() {
Preconditions.checkState(root.isInternalDir());
return (INodeDir<T>)root;
}
private INodeLink<T> getRootLink() {
Preconditions.checkState(root.isLink());
return (INodeLink<T>)root;
}
private boolean hasFallbackLink() {
return rootFallbackLink != null;
}
private INodeLink<T> getRootFallbackLink() {
Preconditions.checkState(root.isInternalDir());
return rootFallbackLink;
}
/**
* An internal class representing the ViewFileSystem mount table
* link entries and their attributes.
* @see LinkType
*/
private static class LinkEntry {
private final String src;
private final String target;
private final LinkType linkType;
private final String settings;
private final UserGroupInformation ugi;
private final Configuration config;
LinkEntry(String src, String target, LinkType linkType, String settings,
UserGroupInformation ugi, Configuration config) {
this.src = src;
this.target = target;
this.linkType = linkType;
this.settings = settings;
this.ugi = ugi;
this.config = config;
}
String getSrc() {
return src;
}
String getTarget() {
return target;
}
LinkType getLinkType() {
return linkType;
}
boolean isLinkType(LinkType type) {
return this.linkType == type;
}
String getSettings() {
return settings;
}
UserGroupInformation getUgi() {
return ugi;
}
Configuration getConfig() {
return config;
}
}
/**
* Create Inode Tree from the specified mount-table specified in Config
* @param config - the mount table keys are prefixed with
@ -286,39 +451,59 @@ protected abstract T getTargetFileSystem(String settings, URI[] mergeFsURIs)
protected InodeTree(final Configuration config, final String viewName)
throws UnsupportedFileSystemException, URISyntaxException,
FileAlreadyExistsException, IOException {
String vName = viewName;
if (vName == null) {
vName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
String mountTableName = viewName;
if (mountTableName == null) {
mountTableName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
}
homedirPrefix = ConfigUtil.getHomeDirValue(config, vName);
root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
root.InodeDirFs = getTargetFileSystem(root);
root.isRoot = true;
homedirPrefix = ConfigUtil.getHomeDirValue(config, mountTableName);
final String mtPrefix = Constants.CONFIG_VIEWFS_PREFIX + "." +
vName + ".";
boolean isMergeSlashConfigured = false;
String mergeSlashTarget = null;
List<LinkEntry> linkEntries = new LinkedList<>();
final String mountTablePrefix =
Constants.CONFIG_VIEWFS_PREFIX + "." + mountTableName + ".";
final String linkPrefix = Constants.CONFIG_VIEWFS_LINK + ".";
final String linkFallbackPrefix = Constants.CONFIG_VIEWFS_LINK_FALLBACK;
final String linkMergePrefix = Constants.CONFIG_VIEWFS_LINK_MERGE + ".";
final String linkMergeSlashPrefix =
Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH;
boolean gotMountTableEntry = false;
final UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
for (Entry<String, String> si : config) {
final String key = si.getKey();
if (key.startsWith(mtPrefix)) {
if (key.startsWith(mountTablePrefix)) {
gotMountTableEntry = true;
LinkType linkType = LinkType.SINGLE;
String src = key.substring(mtPrefix.length());
LinkType linkType;
String src = key.substring(mountTablePrefix.length());
String settings = null;
if (src.startsWith(linkPrefix)) {
src = src.substring(linkPrefix.length());
if (src.equals(SlashPath.toString())) {
throw new UnsupportedFileSystemException("Unexpected mount table "
+ "link entry '" + key + "'. "
+ Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + " is not "
+ "supported yet.");
+ "link entry '" + key + "'. Use "
+ Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH + " instead!");
}
linkType = LinkType.SINGLE;
} else if (src.startsWith(linkFallbackPrefix)) {
if (src.length() != linkFallbackPrefix.length()) {
throw new IOException("ViewFs: Mount points initialization error." +
" Invalid " + Constants.CONFIG_VIEWFS_LINK_FALLBACK +
" entry in config: " + src);
}
linkType = LinkType.SINGLE_FALLBACK;
} else if (src.startsWith(linkMergePrefix)) { // A merge link
linkType = LinkType.MERGE;
src = src.substring(linkMergePrefix.length());
linkType = LinkType.MERGE;
} else if (src.startsWith(linkMergeSlashPrefix)) {
// This is a LinkMergeSlash entry. This entry should
// not have any additional source path.
if (src.length() != linkMergeSlashPrefix.length()) {
throw new IOException("ViewFs: Mount points initialization error." +
" Invalid " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
" entry in config: " + src);
}
linkType = LinkType.MERGE_SLASH;
} else if (src.startsWith(Constants.CONFIG_VIEWFS_LINK_NFLY)) {
// prefix.settings.src
src = src.substring(Constants.CONFIG_VIEWFS_LINK_NFLY.length() + 1);
@ -338,14 +523,69 @@ protected InodeTree(final Configuration config, final String viewName)
throw new IOException("ViewFs: Cannot initialize: Invalid entry in " +
"Mount table in config: " + src);
}
final String target = si.getValue(); // link or merge link
createLink(src, target, linkType, settings, ugi, config);
final String target = si.getValue();
if (linkType != LinkType.MERGE_SLASH) {
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "A regular link should not be added.");
}
linkEntries.add(
new LinkEntry(src, target, linkType, settings, ugi, config));
} else {
if (!linkEntries.isEmpty()) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with regular links. "
+ "A merge slash link should not be configured.");
}
if (isMergeSlashConfigured) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a merge slash link. "
+ "Multiple merge slash links for the same mount table is "
+ "not allowed.");
}
isMergeSlashConfigured = true;
mergeSlashTarget = target;
}
}
}
if (isMergeSlashConfigured) {
Preconditions.checkNotNull(mergeSlashTarget);
root = new INodeLink<T>(mountTableName, ugi,
getTargetFileSystem(new URI(mergeSlashTarget)),
new URI(mergeSlashTarget));
mountPoints.add(new MountPoint<T>("/", (INodeLink<T>) root));
rootFallbackLink = null;
} else {
root = new INodeDir<T>("/", UserGroupInformation.getCurrentUser());
getRootDir().setInternalDirFs(getTargetFileSystem(getRootDir()));
getRootDir().setRoot(true);
INodeLink<T> fallbackLink = null;
for (LinkEntry le : linkEntries) {
if (le.isLinkType(LinkType.SINGLE_FALLBACK)) {
if (fallbackLink != null) {
throw new IOException("Mount table " + mountTableName
+ " has already been configured with a link fallback. "
+ "Multiple fallback links for the same mount table is "
+ "not allowed.");
}
fallbackLink = new INodeLink<T>(mountTableName, ugi,
getTargetFileSystem(new URI(le.getTarget())),
new URI(le.getTarget()));
} else {
createLink(le.getSrc(), le.getTarget(), le.getLinkType(),
le.getSettings(), le.getUgi(), le.getConfig());
}
}
rootFallbackLink = fallbackLink;
}
if (!gotMountTableEntry) {
throw new IOException(
"ViewFs: Cannot initialize: Empty Mount table in config for " +
"viewfs://" + vName + "/");
"viewfs://" + mountTableName + "/");
}
}
@ -382,7 +622,7 @@ boolean isInternalDir() {
/**
* Resolve the pathname p relative to root InodeDir
* @param p - inout path
* @param p - input path
* @param resolveLastComponent
* @return ResolveResult which allows further resolution of the remaining path
* @throws FileNotFoundException
@ -391,27 +631,53 @@ ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
throws FileNotFoundException {
String[] path = breakIntoPathComponents(p);
if (path.length <= 1) { // special case for when path is "/"
ResolveResult<T> res =
new ResolveResult<T>(ResultKind.INTERNAL_DIR,
root.InodeDirFs, root.fullPath, SlashPath);
T targetFs = root.isInternalDir() ?
getRootDir().getInternalDirFs() : getRootLink().getTargetFileSystem();
ResolveResult<T> res = new ResolveResult<T>(ResultKind.INTERNAL_DIR,
targetFs, root.fullPath, SlashPath);
return res;
}
INodeDir<T> curInode = root;
/**
* linkMergeSlash has been configured. The root of this mount table has
* been linked to the root directory of a file system.
* The first non-slash path component should be name of the mount table.
*/
if (root.isLink()) {
Path remainingPath;
StringBuilder remainingPathStr = new StringBuilder();
// ignore first slash
for (int i = 1; i < path.length; i++) {
remainingPathStr.append("/").append(path[i]);
}
remainingPath = new Path(remainingPathStr.toString());
ResolveResult<T> res = new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootLink().getTargetFileSystem(), root.fullPath, remainingPath);
return res;
}
Preconditions.checkState(root.isInternalDir());
INodeDir<T> curInode = getRootDir();
int i;
// ignore first slash
for (i = 1; i < path.length - (resolveLastComponent ? 0 : 1); i++) {
INode<T> nextInode = curInode.resolveInternal(path[i]);
if (nextInode == null) {
StringBuilder failedAt = new StringBuilder(path[0]);
for (int j = 1; j <= i; ++j) {
failedAt.append('/').append(path[j]);
if (hasFallbackLink()) {
return new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
getRootFallbackLink().getTargetFileSystem(),
root.fullPath, new Path(p));
} else {
StringBuilder failedAt = new StringBuilder(path[0]);
for (int j = 1; j <= i; ++j) {
failedAt.append('/').append(path[j]);
}
throw (new FileNotFoundException(
"File/Directory does not exist: " + failedAt.toString()));
}
throw (new FileNotFoundException("File/Directory does not exist: "
+ failedAt.toString()));
}
if (nextInode instanceof INodeLink) {
if (nextInode.isLink()) {
final INodeLink<T> link = (INodeLink<T>) nextInode;
final Path remainingPath;
if (i >= path.length - 1) {
@ -425,9 +691,9 @@ ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
}
final ResolveResult<T> res =
new ResolveResult<T>(ResultKind.EXTERNAL_DIR,
link.targetFileSystem, nextInode.fullPath, remainingPath);
link.getTargetFileSystem(), nextInode.fullPath, remainingPath);
return res;
} else if (nextInode instanceof INodeDir) {
} else if (nextInode.isInternalDir()) {
curInode = (INodeDir<T>) nextInode;
}
}
@ -449,7 +715,7 @@ ResolveResult<T> resolve(final String p, final boolean resolveLastComponent)
}
final ResolveResult<T> res =
new ResolveResult<T>(ResultKind.INTERNAL_DIR,
curInode.InodeDirFs, curInode.fullPath, remainingPath);
curInode.getInternalDirFs(), curInode.fullPath, remainingPath);
return res;
}

View File

@ -1037,12 +1037,12 @@ public FileStatus getFileStatus(Path f) throws IOException {
public FileStatus[] listStatus(Path f) throws AccessControlException,
FileNotFoundException, IOException {
checkPathIsSlash(f);
FileStatus[] result = new FileStatus[theInternalDir.children.size()];
FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
int i = 0;
for (Entry<String, INode<FileSystem>> iEntry :
theInternalDir.children.entrySet()) {
theInternalDir.getChildren().entrySet()) {
INode<FileSystem> inode = iEntry.getValue();
if (inode instanceof INodeLink ) {
if (inode.isLink()) {
INodeLink<FileSystem> link = (INodeLink<FileSystem>) inode;
result[i++] = new FileStatus(0, false, 0, 0,
@ -1065,11 +1065,12 @@ public FileStatus[] listStatus(Path f) throws AccessControlException,
@Override
public boolean mkdirs(Path dir, FsPermission permission)
throws AccessControlException, FileAlreadyExistsException {
if (theInternalDir.isRoot && dir == null) {
if (theInternalDir.isRoot() && dir == null) {
throw new FileAlreadyExistsException("/ already exits");
}
// Note dir starts with /
if (theInternalDir.children.containsKey(dir.toString().substring(1))) {
if (theInternalDir.getChildren().containsKey(
dir.toString().substring(1))) {
return true; // this is the stupid semantics of FileSystem
}
throw readOnlyMountTable("mkdirs", dir);

View File

@ -899,13 +899,13 @@ public FileStatus getFileLinkStatus(final Path f)
throws IOException {
// look up i internalDirs children - ignore first Slash
INode<AbstractFileSystem> inode =
theInternalDir.children.get(f.toUri().toString().substring(1));
theInternalDir.getChildren().get(f.toUri().toString().substring(1));
if (inode == null) {
throw new FileNotFoundException(
"viewFs internal mount table - missing entry:" + f);
}
FileStatus result;
if (inode instanceof INodeLink) {
if (inode.isLink()) {
INodeLink<AbstractFileSystem> inodelink =
(INodeLink<AbstractFileSystem>) inode;
result = new FileStatus(0, false, 0, 0, creationTime, creationTime,
@ -947,14 +947,14 @@ public int getUriDefaultPort() {
public FileStatus[] listStatus(final Path f) throws AccessControlException,
IOException {
checkPathIsSlash(f);
FileStatus[] result = new FileStatus[theInternalDir.children.size()];
FileStatus[] result = new FileStatus[theInternalDir.getChildren().size()];
int i = 0;
for (Entry<String, INode<AbstractFileSystem>> iEntry :
theInternalDir.children.entrySet()) {
theInternalDir.getChildren().entrySet()) {
INode<AbstractFileSystem> inode = iEntry.getValue();
if (inode instanceof INodeLink ) {
if (inode.isLink()) {
INodeLink<AbstractFileSystem> link =
(INodeLink<AbstractFileSystem>) inode;
@ -979,7 +979,7 @@ public FileStatus[] listStatus(final Path f) throws AccessControlException,
public void mkdir(final Path dir, final FsPermission permission,
final boolean createParent) throws AccessControlException,
FileAlreadyExistsException {
if (theInternalDir.isRoot && dir == null) {
if (theInternalDir.isRoot() && dir == null) {
throw new FileAlreadyExistsException("/ already exits");
}
throw readOnlyMountTable("mkdir", dir);

View File

@ -1005,8 +1005,8 @@ public void testConfLinkSlash() throws Exception {
+ mtPrefix + Constants.CONFIG_VIEWFS_LINK + "." + "/");
} catch (Exception e) {
if (e instanceof UnsupportedFileSystemException) {
String msg = Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
+ " is not supported yet.";
String msg = " Use " + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH +
" instead";
assertThat(e.getMessage(), containsString(msg));
} else {
fail("Unexpected exception: " + e.getMessage());

View File

@ -91,7 +91,7 @@ In order to provide transparency with the old world, the ViewFs file system (i.e
ViewFs implements the Hadoop file system interface just like HDFS and the local file system. It is a trivial file system in the sense that it only allows linking to other file systems. Because ViewFs implements the Hadoop file system interface, it works transparently Hadoop tools. For example, all the shell commands work with ViewFs as with HDFS and local file system.
The mount points of a mount table are specified in the standard Hadoop configuration files. In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
In the configuration of each cluster, the default file system is set to the mount table for that cluster as shown below (compare it with the configuration in [Single Namenode Clusters](#Single_Namenode_Clusters)).
```xml
<property>
@ -100,7 +100,47 @@ The mount points of a mount table are specified in the standard Hadoop configura
</property>
```
The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommanded that the mount table of a cluster should be named by the cluster name. Then Hadoop system will look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
The authority following the `viewfs://` scheme in the URI is the mount table name. It is recommended that the mount table of a cluster should be named by the cluster name. Then Hadoop system will look for a mount table with the name "clusterX" in the Hadoop configuration files. Operations arrange all gateways and service machines to contain the mount tables for ALL clusters such that, for each cluster, the default file system is set to the ViewFs mount table for that cluster as described above.
The mount points of a mount table are specified in the standard Hadoop configuration files. All the mount table config entries for `viewfs` are prefixed by `fs.viewfs.mounttable.`. The mount points that are linking other filesystems are specified using `link` tags. The recommendation is to have mount points name same as in the linked filesystem target locations. For all namespaces that are not configured in the mount table, we can have them fallback to a default filesystem via `linkFallback`.
In the below mount table configuration, namespace `/data` is linked to the filesystem `hdfs://nn1-clusterx.example.com:9820/data`, `/project` is linked to the filesystem `hdfs://nn2-clusterx.example.com:9820/project`. All namespaces that are not configured in the mount table, like `/logs` are linked to the filesystem `hdfs://nn5-clusterx.example.com:9820/home`.
```xml
<configuration>
<property>
<name>fs.viewfs.mounttable.ClusterX.link./data</name>
<value>hdfs://nn1-clusterx.example.com:9820/data</value>
</property>
<property>
<name>fs.viewfs.mounttable.ClusterX.link./project</name>
<value>hdfs://nn2-clusterx.example.com:9820/project</value>
</property>
<property>
<name>fs.viewfs.mounttable.ClusterX.link./user</name>
<value>hdfs://nn3-clusterx.example.com:9820/user</value>
</property>
<property>
<name>fs.viewfs.mounttable.ClusterX.link./tmp</name>
<value>hdfs://nn4-clusterx.example.com:9820/tmp</value>
</property>
<property>
<name>fs.viewfs.mounttable.ClusterX.linkFallback</name>
<value>hdfs://nn5-clusterx.example.com:9820/home</value>
</property>
</configuration>
```
Alternatively we can have the mount table's root merged with the root of another filesystem via `linkMergeSlash`. In the below mount table configuration, ClusterY's root is merged with the root filesystem at `hdfs://nn1-clustery.example.com:9820`.
```xml
<configuration>
<property>
<name>fs.viewfs.mounttable.ClusterY.linkMergeSlash</name>
<value>hdfs://nn1-clustery.example.com:9820/</value>
</property>
</configuration>
```
### Pathname Usage Patterns

View File

@ -0,0 +1,264 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test for viewfs with LinkFallback mount table entries.
*/
public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
private static FileSystem fsDefault;
private static MiniDFSCluster cluster;
private static final int NAME_SPACES_COUNT = 3;
private static final int DATA_NODES_COUNT = 3;
private static final int FS_INDEX_DEFAULT = 0;
private static final String LINK_FALLBACK_CLUSTER_1_NAME = "Cluster1";
private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
private static final Configuration CONF = new Configuration();
private static final File TEST_DIR = GenericTestUtils.getTestDir(
TestViewFileSystemLinkFallback.class.getSimpleName());
private static final String TEST_BASE_PATH =
"/tmp/TestViewFileSystemLinkFallback";
private final static Logger LOG = LoggerFactory.getLogger(
TestViewFileSystemLinkFallback.class);
@Override
protected FileSystemTestHelper createFileSystemHelper() {
return new FileSystemTestHelper(TEST_BASE_PATH);
}
@BeforeClass
public static void clusterSetupAtBeginning() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
NAME_SPACES_COUNT))
.numDataNodes(DATA_NODES_COUNT)
.build();
cluster.waitClusterUp();
for (int i = 0; i < NAME_SPACES_COUNT; i++) {
FS_HDFS[i] = cluster.getFileSystem(i);
}
fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
}
@AfterClass
public static void clusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Override
@Before
public void setUp() throws Exception {
fsTarget = fsDefault;
super.setUp();
}
/**
* Override this so that we don't set the targetTestRoot to any path under the
* root of the FS, and so that we don't try to delete the test dir, but rather
* only its contents.
*/
@Override
void initializeTargetTestRoot() throws IOException {
targetTestRoot = fsDefault.makeQualified(new Path("/"));
for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
fsDefault.delete(status.getPath(), true);
}
}
@Override
void setupMountPoints() {
super.setupMountPoints();
ConfigUtil.addLinkFallback(conf, LINK_FALLBACK_CLUSTER_1_NAME,
targetTestRoot.toUri());
}
@Override
int getExpectedDelegationTokenCount() {
return 1; // all point to the same fs so 1 unique token
}
@Override
int getExpectedDelegationTokenCountWithCredentials() {
return 1;
}
@Test
public void testConfLinkFallback() throws Exception {
Path testBasePath = new Path(TEST_BASE_PATH);
Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
Path testBaseFileRelative = new Path(testLevel2Dir,
"../../testBaseFile.log");
Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
fsTarget.mkdirs(testLevel2Dir);
fsTarget.createNewFile(testBaseFile);
FSDataOutputStream dataOutputStream = fsTarget.append(testBaseFile);
dataOutputStream.write(1);
dataOutputStream.close();
fsTarget.createNewFile(testLevel2File);
dataOutputStream = fsTarget.append(testLevel2File);
dataOutputStream.write("test link fallback".toString().getBytes());
dataOutputStream.close();
String clusterName = "ClusterFallback";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLinkFallback(conf, clusterName, fsTarget.getUri());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus baseFileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testBaseFile.toUri().toString()));
LOG.info("BaseFileStat: " + baseFileStat);
FileStatus baseFileRelStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testBaseFileRelative.toUri().toString()));
LOG.info("BaseFileRelStat: " + baseFileRelStat);
Assert.assertEquals("Unexpected file length for " + testBaseFile,
1, baseFileStat.getLen());
Assert.assertEquals("Unexpected file length for " + testBaseFileRelative,
baseFileStat.getLen(), baseFileRelStat.getLen());
FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testLevel2File.toUri().toString()));
LOG.info("Level2FileStat: " + level2FileStat);
vfs.close();
}
@Test
public void testConfLinkFallbackWithRegularLinks() throws Exception {
Path testBasePath = new Path(TEST_BASE_PATH);
Path testLevel2Dir = new Path(TEST_BASE_PATH, "dir1/dirA");
Path testBaseFile = new Path(testBasePath, "testBaseFile.log");
Path testLevel2File = new Path(testLevel2Dir, "testLevel2File.log");
fsTarget.mkdirs(testLevel2Dir);
fsTarget.createNewFile(testBaseFile);
fsTarget.createNewFile(testLevel2File);
FSDataOutputStream dataOutputStream = fsTarget.append(testLevel2File);
dataOutputStream.write("test link fallback".toString().getBytes());
dataOutputStream.close();
String clusterName = "ClusterFallback";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLink(conf, clusterName,
"/internalDir/linkToDir2",
new Path(targetTestRoot, "dir2").toUri());
ConfigUtil.addLink(conf, clusterName,
"/internalDir/internalDirB/linkToDir3",
new Path(targetTestRoot, "dir3").toUri());
ConfigUtil.addLink(conf, clusterName,
"/danglingLink",
new Path(targetTestRoot, "missingTarget").toUri());
ConfigUtil.addLink(conf, clusterName,
"/linkToAFile",
new Path(targetTestRoot, "aFile").toUri());
System.out.println("ViewFs link fallback " + fsTarget.getUri());
ConfigUtil.addLinkFallback(conf, clusterName, targetTestRoot.toUri());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus baseFileStat = vfs.getFileStatus(
new Path(viewFsUri.toString() + testBaseFile.toUri().toString()));
LOG.info("BaseFileStat: " + baseFileStat);
Assert.assertEquals("Unexpected file length for " + testBaseFile,
0, baseFileStat.getLen());
FileStatus level2FileStat = vfs.getFileStatus(new Path(viewFsUri.toString()
+ testLevel2File.toUri().toString()));
LOG.info("Level2FileStat: " + level2FileStat);
dataOutputStream = vfs.append(testLevel2File);
dataOutputStream.write("Writing via viewfs fallback path".getBytes());
dataOutputStream.close();
FileStatus level2FileStatAfterWrite = vfs.getFileStatus(
new Path(viewFsUri.toString() + testLevel2File.toUri().toString()));
Assert.assertTrue("Unexpected file length for " + testLevel2File,
level2FileStatAfterWrite.getLen() > level2FileStat.getLen());
vfs.close();
}
@Test
public void testConfLinkFallbackWithMountPoint() throws Exception {
TEST_DIR.mkdirs();
Configuration conf = new Configuration();
String clusterName = "ClusterX";
String mountPoint = "/user";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String expectedErrorMsg = "Invalid linkFallback entry in config: " +
"linkFallback./user";
String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+ clusterName + "." + Constants.CONFIG_VIEWFS_LINK_FALLBACK
+ "." + mountPoint;
conf.set(mountTableEntry, TEST_DIR.toURI().toString());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow linkMergeSlash to take extra mount points!");
} catch (IOException e) {
assertTrue("Unexpected error: " + e.getMessage(),
e.getMessage().contains(expectedErrorMsg));
}
}
}

View File

@ -0,0 +1,234 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import javax.security.auth.login.LoginException;
/**
* Test for viewfs with LinkMergeSlash mount table entries.
*/
public class TestViewFileSystemLinkMergeSlash extends ViewFileSystemBaseTest {
private static FileSystem fsDefault;
private static MiniDFSCluster cluster;
private static final int NAME_SPACES_COUNT = 3;
private static final int DATA_NODES_COUNT = 3;
private static final int FS_INDEX_DEFAULT = 0;
private static final String LINK_MERGE_SLASH_CLUSTER_1_NAME = "ClusterLMS1";
private static final String LINK_MERGE_SLASH_CLUSTER_2_NAME = "ClusterLMS2";
private static final FileSystem[] FS_HDFS = new FileSystem[NAME_SPACES_COUNT];
private static final Configuration CONF = new Configuration();
private static final File TEST_DIR = GenericTestUtils.getTestDir(
TestViewFileSystemLinkMergeSlash.class.getSimpleName());
private static final String TEST_TEMP_PATH =
"/tmp/TestViewFileSystemLinkMergeSlash";
private final static Logger LOG = LoggerFactory.getLogger(
TestViewFileSystemLinkMergeSlash.class);
@Override
protected FileSystemTestHelper createFileSystemHelper() {
return new FileSystemTestHelper(TEST_TEMP_PATH);
}
@BeforeClass
public static void clusterSetupAtBeginning() throws IOException,
LoginException, URISyntaxException {
SupportsBlocks = true;
CONF.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
true);
cluster = new MiniDFSCluster.Builder(CONF)
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
NAME_SPACES_COUNT))
.numDataNodes(DATA_NODES_COUNT)
.build();
cluster.waitClusterUp();
for (int i = 0; i < NAME_SPACES_COUNT; i++) {
FS_HDFS[i] = cluster.getFileSystem(i);
}
fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
}
@AfterClass
public static void clusterShutdownAtEnd() throws Exception {
if (cluster != null) {
cluster.shutdown();
}
}
@Override
@Before
public void setUp() throws Exception {
fsTarget = fsDefault;
super.setUp();
}
/**
* Override this so that we don't set the targetTestRoot to any path under the
* root of the FS, and so that we don't try to delete the test dir, but rather
* only its contents.
*/
@Override
void initializeTargetTestRoot() throws IOException {
targetTestRoot = fsDefault.makeQualified(new Path("/"));
for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
fsDefault.delete(status.getPath(), true);
}
}
@Override
void setupMountPoints() {
super.setupMountPoints();
ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_1_NAME,
targetTestRoot.toUri());
ConfigUtil.addLinkMergeSlash(conf, LINK_MERGE_SLASH_CLUSTER_2_NAME,
targetTestRoot.toUri());
}
@Override
int getExpectedDelegationTokenCount() {
return 1; // all point to the same fs so 1 unique token
}
@Override
int getExpectedDelegationTokenCountWithCredentials() {
return 1;
}
@Test
public void testConfLinkMergeSlash() throws Exception {
TEST_DIR.mkdirs();
String clusterName = "ClusterMerge";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String testFileName = "testLinkMergeSlash";
File infile = new File(TEST_DIR, testFileName);
final byte[] content = "HelloWorld".getBytes();
FileOutputStream fos = null;
try {
fos = new FileOutputStream(infile);
fos.write(content);
} finally {
if (fos != null) {
fos.close();
}
}
assertEquals((long)content.length, infile.length());
Configuration conf = new Configuration();
ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
FileSystem vfs = FileSystem.get(viewFsUri, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus stat = vfs.getFileStatus(new Path(viewFsUri.toString() +
testFileName));
LOG.info("File stat: " + stat);
vfs.close();
}
@Test
public void testConfLinkMergeSlashWithRegularLinks() throws Exception {
TEST_DIR.mkdirs();
String clusterName = "ClusterMerge";
String expectedErrorMsg1 = "Mount table ClusterMerge has already been " +
"configured with a merge slash link";
String expectedErrorMsg2 = "Mount table ClusterMerge has already been " +
"configured with regular links";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
Configuration conf = new Configuration();
ConfigUtil.addLinkMergeSlash(conf, clusterName, TEST_DIR.toURI());
ConfigUtil.addLink(conf, clusterName, "testDir", TEST_DIR.toURI());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow both merge slash link and regular link on same "
+ "mount table.");
} catch (IOException e) {
assertTrue("Unexpected error message: " + e.getMessage(),
e.getMessage().contains(expectedErrorMsg1) || e.getMessage()
.contains(expectedErrorMsg2));
}
}
@Test
public void testConfLinkMergeSlashWithMountPoint() throws Exception {
TEST_DIR.mkdirs();
Configuration conf = new Configuration();
String clusterName = "ClusterX";
String mountPoint = "/user";
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
"/", null, null);
String expectedErrorMsg = "Invalid linkMergeSlash entry in config: " +
"linkMergeSlash./user";
String mountTableEntry = Constants.CONFIG_VIEWFS_PREFIX + "."
+ clusterName + "." + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH
+ "." + mountPoint;
conf.set(mountTableEntry, TEST_DIR.toURI().toString());
try {
FileSystem.get(viewFsUri, conf);
fail("Shouldn't allow linkMergeSlash to take extra mount points!");
} catch (IOException e) {
assertTrue(e.getMessage().contains(expectedErrorMsg));
}
}
@Test
public void testChildFileSystems() throws Exception {
URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
LINK_MERGE_SLASH_CLUSTER_1_NAME, "/", null, null);
FileSystem fs = FileSystem.get(viewFsUri, conf);
FileSystem[] childFs = fs.getChildFileSystems();
Assert.assertEquals("Unexpected number of child filesystems!",
1, childFs.length);
Assert.assertEquals("Unexpected child filesystem!",
DistributedFileSystem.class, childFs[0].getClass());
}
}