diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java index 344048f0ce..6034542106 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java @@ -45,5 +45,4 @@ public interface FsConstants { String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN = "fs.viewfs.overload.scheme.target.%s.impl"; String VIEWFS_TYPE = "viewfs"; - String VIEWFSOS_TYPE = "viewfsOverloadScheme"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java index 422e7337b5..003694f2e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/InodeTree.java @@ -599,9 +599,10 @@ protected InodeTree(final Configuration config, final String viewName, if (!gotMountTableEntry) { if (!initingUriAsFallbackOnNoMounts) { - throw new IOException( - "ViewFs: Cannot initialize: Empty Mount table in config for " - + "viewfs://" + mountTableName + "/"); + throw new IOException(new StringBuilder( + "ViewFs: Cannot initialize: Empty Mount table in config for ") + .append(theUri.getScheme()).append("://").append(mountTableName) + .append("/").toString()); } StringBuilder msg = new StringBuilder("Empty mount table detected for ").append(theUri) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java index ad62f94ec6..8c659d1b15 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java @@ -259,13 +259,14 @@ public String getScheme() { } /** - * Returns the ViewFileSystem type. - * @return viewfs + * Returns false as it does not support to add fallback link automatically on + * no mounts. */ - String getType() { - return FsConstants.VIEWFS_TYPE; + boolean supportAutoAddingFallbackOnNoMounts() { + return false; } + /** * Called after a new FileSystem instance is constructed. * @param theUri a uri whose authority section names the host, port, etc. for @@ -293,7 +294,7 @@ public void initialize(final URI theUri, final Configuration conf) try { myUri = new URI(getScheme(), authority, "/", null, null); boolean initingUriAsFallbackOnNoMounts = - !FsConstants.VIEWFS_TYPE.equals(getType()); + supportAutoAddingFallbackOnNoMounts(); fsState = new InodeTree(conf, tableName, myUri, initingUriAsFallbackOnNoMounts) { @Override diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java index 2165a3f9ee..5353e93b6f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java @@ -104,6 +104,7 @@ @InterfaceStability.Evolving public class ViewFileSystemOverloadScheme extends ViewFileSystem { private URI myUri; + private boolean supportAutoAddingFallbackOnNoMounts = true; public ViewFileSystemOverloadScheme() throws IOException { super(); } @@ -114,11 +115,19 @@ public String getScheme() { } /** - * Returns the ViewFileSystem type. - * @return viewfs + * By default returns false as ViewFileSystemOverloadScheme supports auto + * adding fallback on no mounts. */ - String getType() { - return FsConstants.VIEWFSOS_TYPE; + public boolean supportAutoAddingFallbackOnNoMounts() { + return this.supportAutoAddingFallbackOnNoMounts; + } + + /** + * Sets whether to add fallback automatically when no mount points found. + */ + public void setSupportAutoAddingFallbackOnNoMounts( + boolean addAutoFallbackOnNoMounts) { + this.supportAutoAddingFallbackOnNoMounts = addAutoFallbackOnNoMounts; } @Override @@ -287,4 +296,62 @@ public FileSystem getRawFileSystem(Path path, Configuration conf) } } + /** + * Gets the mount path info, which contains the target file system and + * remaining path to pass to the target file system. + */ + public MountPathInfo getMountPathInfo(Path path, + Configuration conf) throws IOException { + InodeTree.ResolveResult res; + try { + res = fsState.resolve(getUriPath(path), true); + FileSystem fs = res.isInternalDir() ? + (fsState.getRootFallbackLink() != null ? + ((ChRootedFileSystem) fsState + .getRootFallbackLink().targetFileSystem).getMyFs() : + fsGetter().get(path.toUri(), conf)) : + ((ChRootedFileSystem) res.targetFileSystem).getMyFs(); + return new MountPathInfo(res.remainingPath, res.resolvedPath, + fs); + } catch (FileNotFoundException e) { + // No link configured with passed path. + throw new NotInMountpointException(path, + "No link found for the given path."); + } + } + + /** + * A class to maintain the target file system and a path to pass to the target + * file system. + */ + public static class MountPathInfo { + private Path pathOnTarget; + private T targetFs; + + public MountPathInfo(Path pathOnTarget, String resolvedPath, T targetFs) { + this.pathOnTarget = pathOnTarget; + this.targetFs = targetFs; + } + + public Path getPathOnTarget() { + return this.pathOnTarget; + } + + public T getTargetFs() { + return this.targetFs; + } + } + + /** + * @return Gets the fallback file system configured. Usually, this will be the + * default cluster. + */ + public FileSystem getFallbackFileSystem() { + if (fsState.getRootFallbackLink() == null) { + return null; + } + return ((ChRootedFileSystem) fsState.getRootFallbackLink().targetFileSystem) + .getMyFs(); + } + } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 7694f789e1..1d5c5af8c3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -186,7 +186,7 @@ public void initialize(URI uri, Configuration conf) throws IOException { throw new IOException("Incomplete HDFS URI, no host: "+ uri); } - this.dfs = new DFSClient(uri, conf, statistics); + initDFSClient(uri, conf); this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority()); this.workingDir = getHomeDirectory(); @@ -200,6 +200,10 @@ public StorageStatistics provide() { }); } + void initDFSClient(URI theUri, Configuration conf) throws IOException { + this.dfs = new DFSClient(theUri, conf, statistics); + } + @Override public Path getWorkingDirectory() { return workingDir; @@ -1510,10 +1514,14 @@ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) @Override public void close() throws IOException { try { - dfs.closeOutputStreams(false); + if (dfs != null) { + dfs.closeOutputStreams(false); + } super.close(); } finally { - dfs.close(); + if (dfs != null) { + dfs.close(); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java new file mode 100644 index 0000000000..0a681693a4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ViewDistributedFileSystem.java @@ -0,0 +1,2307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.crypto.key.KeyProvider; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.BlockStoragePolicySpi; +import org.apache.hadoop.fs.CacheFlag; +import org.apache.hadoop.fs.ContentSummary; +import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileChecksum; +import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; +import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Options; +import org.apache.hadoop.fs.PartialListing; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.PathHandle; +import org.apache.hadoop.fs.QuotaUsage; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttrSetFlag; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.AclStatus; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.viewfs.ViewFileSystem; +import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse; +import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolEntry; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ECTopologyVerifierResult; +import org.apache.hadoop.hdfs.protocol.EncryptionZone; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.HdfsPathHandle; +import org.apache.hadoop.hdfs.protocol.OpenFileEntry; +import org.apache.hadoop.hdfs.protocol.OpenFilesIterator; +import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; +import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus; +import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.token.DelegationTokenIssuer; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Progressable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +/** + * The ViewDistributedFileSystem is an extended class to DistributedFileSystem + * with additional mounting functionality. The goal is to have better API + * compatibility for HDFS users when using mounting + * filesystem(ViewFileSystemOverloadScheme). + * The ViewFileSystemOverloadScheme{@link ViewFileSystemOverloadScheme} is a new + * filesystem with inherited mounting functionality from ViewFileSystem. + * For the user who is using ViewFileSystemOverloadScheme by setting + * fs.hdfs.impl=org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme, now + * they can set fs.hdfs.impl=org.apache.hadoop.hdfs.ViewDistributedFileSystem. + * So, that the hdfs users will get closely compatible API with mount + * functionality. For the rest of all other schemes can continue to use + * ViewFileSystemOverloadScheme class directly for mount functionality. Please + * note that ViewFileSystemOverloadScheme provides only + * ViewFileSystem{@link ViewFileSystem} APIs. + * If user configured this class but no mount point configured? Then it will + * simply work as existing DistributedFileSystem class. If user configured both + * fs.hdfs.impl to this class and mount configurations, then users will be able + * to make calls the APIs available in this class, they are nothing but DFS + * APIs, but they will be delegated to viewfs functionality. Please note, APIs + * without any path in arguments( ex: isInSafeMode), will be delegated to + * default filesystem only, that is the configured fallback link. If you want to + * make these API calls on specific child filesystem, you may want to initialize + * them separately and call. In ViewDistributedFileSystem, we strongly recommend + * to configure linkFallBack when you add mount links and it's recommended to + * point be to your base cluster, usually your current fs.defaultFS if that's + * pointing to hdfs. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class ViewDistributedFileSystem extends DistributedFileSystem { + private static final Logger LOGGER = + LoggerFactory.getLogger(ViewDistributedFileSystem.class); + + // A mounting file system. + private ViewFileSystemOverloadScheme vfs; + // A default DFS, which should have set via linkFallback + private DistributedFileSystem defaultDFS; + + @Override + public void initialize(URI uri, Configuration conf) throws IOException { + super.initialize(uri, conf); + try { + this.vfs = tryInitializeMountingViewFs(uri, conf); + } catch (IOException ioe) { + LOGGER.debug(new StringBuilder("Mount tree initialization failed with ") + .append("the reason => {}. Falling back to regular DFS") + .append(" initialization. Please re-initialize the fs after updating") + .append(" mount point.").toString(), ioe.getMessage()); + // Previous super.initialize would have skipped the dfsclient init and + // setWorkingDirectory as we planned to initialize vfs. Since vfs init + // failed, let's init dfsClient now. + super.initDFSClient(uri, conf); + super.setWorkingDirectory(super.getHomeDirectory()); + return; + } + + setConf(conf); + // A child DFS with the current initialized URI. This must be same as + // fallback fs. The fallback must point to root of your filesystems. + // Some APIs(without path in argument, for example isInSafeMode) will + // support only for base cluster filesystem. Only that APIs will use this + // fs. + defaultDFS = (DistributedFileSystem) this.vfs.getFallbackFileSystem(); + // Please don't access internal dfs client directly except in tests. + dfs = (defaultDFS != null) ? defaultDFS.dfs : null; + super.setWorkingDirectory(this.vfs.getHomeDirectory()); + } + + @Override + void initDFSClient(URI uri, Configuration conf) throws IOException { + // Since we plan to initialize vfs in this class, we will not need to + // initialize DFS client. + } + + public ViewDistributedFileSystem() { + } + + private ViewFileSystemOverloadScheme tryInitializeMountingViewFs(URI theUri, + Configuration conf) throws IOException { + ViewFileSystemOverloadScheme viewFs = new ViewFileSystemOverloadScheme(); + viewFs.setSupportAutoAddingFallbackOnNoMounts(false); + viewFs.initialize(theUri, conf); + return viewFs; + } + + @Override + public URI getUri() { + if (this.vfs == null) { + return super.getUri(); + } + return this.vfs.getUri(); + } + + @Override + public String getScheme() { + if (this.vfs == null) { + return super.getScheme(); + } + return this.vfs.getScheme(); + } + + @Override + public Path getWorkingDirectory() { + if (this.vfs == null) { + return super.getWorkingDirectory(); + } + return this.vfs.getWorkingDirectory(); + } + + @Override + public void setWorkingDirectory(Path dir) { + if (this.vfs == null) { + super.setWorkingDirectory(dir); + return; + } + this.vfs.setWorkingDirectory(dir); + } + + @Override + public Path getHomeDirectory() { + if (super.dfs == null) { + return null; + } + if (this.vfs == null) { + return super.getHomeDirectory(); + } + return this.vfs.getHomeDirectory(); + } + + /** + * Returns only default cluster getHedgedReadMetrics. + */ + @Override + public DFSHedgedReadMetrics getHedgedReadMetrics() { + if (this.vfs == null) { + return super.getHedgedReadMetrics(); + } + checkDefaultDFS(defaultDFS, "getHedgedReadMetrics"); + return defaultDFS.getHedgedReadMetrics(); + } + + @Override + public BlockLocation[] getFileBlockLocations(FileStatus fs, long start, + long len) throws IOException { + if (this.vfs == null) { + return super.getFileBlockLocations(fs, start, len); + } + return this.vfs.getFileBlockLocations(fs, start, len); + } + + @Override + public BlockLocation[] getFileBlockLocations(Path p, final long start, + final long len) throws IOException { + if (this.vfs == null) { + return super.getFileBlockLocations(p, start, len); + } + return this.vfs.getFileBlockLocations(p, start, len); + } + + @Override + public void setVerifyChecksum(final boolean verifyChecksum) { + if (this.vfs == null) { + super.setVerifyChecksum(verifyChecksum); + return; + } + this.vfs.setVerifyChecksum(verifyChecksum); + } + + @Override + public boolean recoverLease(final Path f) throws IOException { + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "recoverLease"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .recoverLease(mountPathInfo.getPathOnTarget()); + } + + @Override + public FSDataInputStream open(final Path f, final int bufferSize) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.open(f, bufferSize); + } + + return this.vfs.open(f, bufferSize); + } + + @Override + public FSDataInputStream open(PathHandle fd, int bufferSize) + throws IOException { + return this.vfs.open(fd, bufferSize); + } + + @Override + protected HdfsPathHandle createPathHandle(FileStatus st, + Options.HandleOpt... opts) { + if (this.vfs == null) { + return super.createPathHandle(st, opts); + } + throw new UnsupportedOperationException(); + } + + @Override + public FSDataOutputStream append(final Path f, final int bufferSize, + final Progressable progress) throws IOException { + if (this.vfs == null) { + return super.append(f, bufferSize, progress); + } + return this.vfs.append(f, bufferSize, progress); + } + + @Override + public FSDataOutputStream append(Path f, final EnumSet flag, + final int bufferSize, final Progressable progress) throws IOException { + if (this.vfs == null) { + return super.append(f, flag, bufferSize, progress); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "append"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .append(mountPathInfo.getPathOnTarget(), flag, bufferSize, progress); + } + + @Override + public FSDataOutputStream append(Path f, final EnumSet flag, + final int bufferSize, final Progressable progress, + final InetSocketAddress[] favoredNodes) throws IOException { + if (this.vfs == null) { + return super.append(f, flag, bufferSize, progress, favoredNodes); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "append"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .append(mountPathInfo.getPathOnTarget(), flag, bufferSize, progress, + favoredNodes); + } + + @Override + public FSDataOutputStream create(Path f, FsPermission permission, + boolean overwrite, int bufferSize, short replication, long blockSize, + Progressable progress) throws IOException { + if (this.vfs == null) { + return super + .create(f, permission, overwrite, bufferSize, replication, blockSize, + progress); + } + return this.vfs + .create(f, permission, overwrite, bufferSize, replication, blockSize, + progress); + } + + @Override + public HdfsDataOutputStream create(final Path f, + final FsPermission permission, final boolean overwrite, + final int bufferSize, final short replication, final long blockSize, + final Progressable progress, final InetSocketAddress[] favoredNodes) + throws IOException { + if (this.vfs == null) { + return super + .create(f, permission, overwrite, bufferSize, replication, blockSize, + progress, favoredNodes); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "create"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .create(mountPathInfo.getPathOnTarget(), permission, overwrite, + bufferSize, replication, blockSize, progress, favoredNodes); + } + + @Override + //DFS specific API + public FSDataOutputStream create(final Path f, final FsPermission permission, + final EnumSet cflags, final int bufferSize, + final short replication, final long blockSize, + final Progressable progress, final Options.ChecksumOpt checksumOpt) + throws IOException { + if (this.vfs == null) { + return super + .create(f, permission, cflags, bufferSize, replication, blockSize, + progress, checksumOpt); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "create"); + return mountPathInfo.getTargetFs() + .create(mountPathInfo.getPathOnTarget(), permission, cflags, bufferSize, + replication, blockSize, progress, checksumOpt); + } + + void checkDFS(FileSystem fs, String methodName) { + if (!(fs instanceof DistributedFileSystem)) { + String msg = new StringBuilder("This API:").append(methodName) + .append(" is specific to DFS. Can't run on other fs:") + .append(fs.getUri()).toString(); + throw new UnsupportedOperationException(msg); + } + } + + void checkDefaultDFS(FileSystem fs, String methodName) { + if (fs == null) { + String msg = new StringBuilder("This API:").append(methodName).append( + " cannot be supported without default cluster(that is linkFallBack).") + .toString(); + throw new UnsupportedOperationException(msg); + } + } + + @Override + // DFS specific API + protected HdfsDataOutputStream primitiveCreate(Path f, + FsPermission absolutePermission, EnumSet flag, int bufferSize, + short replication, long blockSize, Progressable progress, + Options.ChecksumOpt checksumOpt) throws IOException { + if (this.vfs == null) { + return super + .primitiveCreate(f, absolutePermission, flag, bufferSize, replication, + blockSize, progress, checksumOpt); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "primitiveCreate"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .primitiveCreate(f, absolutePermission, flag, bufferSize, replication, + blockSize, progress, checksumOpt); + } + + @Override + public FSDataOutputStream createNonRecursive(Path f, FsPermission permission, + EnumSet flags, int bufferSize, short replication, + long blockSize, Progressable progress) throws IOException { + if (this.vfs == null) { + return super + .createNonRecursive(f, permission, flags, bufferSize, replication, + bufferSize, progress); + } + return this.vfs + .createNonRecursive(f, permission, flags, bufferSize, replication, + bufferSize, progress); + } + + @Override + public boolean setReplication(final Path f, final short replication) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.setReplication(f, replication); + } + return this.vfs.setReplication(f, replication); + } + + @Override + public void setStoragePolicy(Path src, String policyName) throws IOException { + if (this.vfs == null) { + super.setStoragePolicy(src, policyName); + return; + } + this.vfs.setStoragePolicy(src, policyName); + } + + @Override + public void unsetStoragePolicy(Path src) throws IOException { + if (this.vfs == null) { + super.unsetStoragePolicy(src); + return; + } + this.vfs.unsetStoragePolicy(src); + } + + @Override + public BlockStoragePolicySpi getStoragePolicy(Path src) throws IOException { + if (this.vfs == null) { + return super.getStoragePolicy(src); + } + return this.vfs.getStoragePolicy(src); + } + + @Override + public Collection getAllStoragePolicies() + throws IOException { + if (this.vfs == null) { + return super.getAllStoragePolicies(); + } + Collection allStoragePolicies = + this.vfs.getAllStoragePolicies(); + return (Collection) allStoragePolicies; + } + + @Override + public long getBytesWithFutureGenerationStamps() throws IOException { + if (this.vfs == null) { + return super.getBytesWithFutureGenerationStamps(); + } + checkDefaultDFS(defaultDFS, "getBytesWithFutureGenerationStamps"); + return defaultDFS.getBytesWithFutureGenerationStamps(); + } + + @Deprecated + @Override + public BlockStoragePolicy[] getStoragePolicies() throws IOException { + if (this.vfs == null) { + return super.getStoragePolicies(); + } + checkDefaultDFS(defaultDFS, "getStoragePolicies"); + return defaultDFS.getStoragePolicies(); + } + + @Override + //Make sure your target fs supports this API, otherwise you will get + // Unsupported operation exception. + public void concat(Path trg, Path[] psrcs) throws IOException { + if (this.vfs == null) { + super.concat(trg, psrcs); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(trg, getConf()); + mountPathInfo.getTargetFs().concat(mountPathInfo.getPathOnTarget(), psrcs); + } + + @SuppressWarnings("deprecation") + @Override + public boolean rename(final Path src, final Path dst) throws IOException { + if (this.vfs == null) { + return super.rename(src, dst); + } + return this.vfs.rename(src, dst); + } + + @SuppressWarnings("deprecation") + @Override + public void rename(Path src, Path dst, final Options.Rename... options) + throws IOException { + if (this.vfs == null) { + super.rename(src, dst, options); + return; + } + + // TODO: revisit + ViewFileSystemOverloadScheme.MountPathInfo mountSrcPathInfo = + this.vfs.getMountPathInfo(src, getConf()); + checkDFS(mountSrcPathInfo.getTargetFs(), "rename"); + + ViewFileSystemOverloadScheme.MountPathInfo mountDstPathInfo = + this.vfs.getMountPathInfo(src, getConf()); + checkDFS(mountDstPathInfo.getTargetFs(), "rename"); + + //Check both in same cluster. + if (!mountSrcPathInfo.getTargetFs().getUri() + .equals(mountDstPathInfo.getTargetFs().getUri())) { + throw new HadoopIllegalArgumentException( + "Can't rename across file systems."); + } + + ((DistributedFileSystem) mountSrcPathInfo.getTargetFs()) + .rename(mountSrcPathInfo.getPathOnTarget(), + mountDstPathInfo.getPathOnTarget(), options); + } + + @Override + public boolean truncate(final Path f, final long newLength) + throws IOException { + if (this.vfs == null) { + return super.truncate(f, newLength); + } + return this.vfs.truncate(f, newLength); + } + + public boolean delete(final Path f, final boolean recursive) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.delete(f, recursive); + } + return this.vfs.delete(f, recursive); + } + + @Override + public ContentSummary getContentSummary(Path f) throws IOException { + if (this.vfs == null) { + return super.getContentSummary(f); + } + return this.vfs.getContentSummary(f); + } + + @Override + public QuotaUsage getQuotaUsage(Path f) throws IOException { + if (this.vfs == null) { + return super.getQuotaUsage(f); + } + return this.vfs.getQuotaUsage(f); + } + + @Override + public void setQuota(Path src, final long namespaceQuota, + final long storagespaceQuota) throws IOException { + if (this.vfs == null) { + super.setQuota(src, namespaceQuota, storagespaceQuota); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(src, getConf()); + mountPathInfo.getTargetFs() + .setQuota(mountPathInfo.getPathOnTarget(), namespaceQuota, + storagespaceQuota); + } + + @Override + public void setQuotaByStorageType(Path src, final StorageType type, + final long quota) throws IOException { + if (this.vfs == null) { + super.setQuotaByStorageType(src, type, quota); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(src, getConf()); + mountPathInfo.getTargetFs() + .setQuotaByStorageType(mountPathInfo.getPathOnTarget(), type, quota); + } + + @Override + public FileStatus[] listStatus(Path p) throws IOException { + if (this.vfs == null) { + return super.listStatus(p); + } + return this.vfs.listStatus(p); + } + + @Override + public RemoteIterator listLocatedStatus(final Path f, + final PathFilter filter) throws FileNotFoundException, IOException { + if (this.vfs == null) { + return super.listLocatedStatus(f, filter); + } + return this.vfs.listLocatedStatus(f, filter); + } + + @Override + public RemoteIterator listStatusIterator(final Path p) + throws IOException { + if (this.vfs == null) { + return super.listStatusIterator(p); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(p, getConf()); + return mountPathInfo.getTargetFs() + .listStatusIterator(mountPathInfo.getPathOnTarget()); + } + + @Override + public RemoteIterator> batchedListStatusIterator( + final List paths) throws IOException { + if (this.vfs == null) { + return super.batchedListStatusIterator(paths); + } + // TODO: revisit for correct implementation. + return this.defaultDFS.batchedListStatusIterator(paths); + } + + @Override + public RemoteIterator> batchedListLocatedStatusIterator( + final List paths) throws IOException { + if (this.vfs == null) { + return super.batchedListLocatedStatusIterator(paths); + } + // TODO: revisit for correct implementation. + return this.defaultDFS.batchedListLocatedStatusIterator(paths); + } + + public boolean mkdir(Path f, FsPermission permission) throws IOException { + if (this.vfs == null) { + return super.mkdir(f, permission); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "mkdir"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .mkdir(mountPathInfo.getPathOnTarget(), permission); + } + + @Override + public boolean mkdirs(Path f, FsPermission permission) throws IOException { + if (this.vfs == null) { + return super.mkdirs(f, permission); + } + return this.vfs.mkdirs(f, permission); + } + + @SuppressWarnings("deprecation") + @Override + protected boolean primitiveMkdir(Path f, FsPermission absolutePermission) + throws IOException { + if (this.vfs == null) { + return super.primitiveMkdir(f, absolutePermission); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "primitiveMkdir"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .primitiveMkdir(mountPathInfo.getPathOnTarget(), absolutePermission); + } + + @Override + public void close() throws IOException { + if (this.vfs != null) { + this.vfs.close(); + } + super.close(); + } + + @InterfaceAudience.Private + @Override + public DFSClient getClient() { + if (this.vfs == null) { + return super.getClient(); + } + checkDefaultDFS(defaultDFS, "getClient"); + return defaultDFS.getClient(); + } + + @Override + public FsStatus getStatus(Path p) throws IOException { + if (this.vfs == null) { + return super.getStatus(p); + } + return this.vfs.getStatus(p); + } + + @Override + public long getMissingBlocksCount() throws IOException { + if (this.vfs == null) { + return super.getMissingBlocksCount(); + } + checkDefaultDFS(defaultDFS, "getMissingBlocksCount"); + return defaultDFS.getMissingBlocksCount(); + } + + @Override + public long getPendingDeletionBlocksCount() throws IOException { + if (this.vfs == null) { + return super.getPendingDeletionBlocksCount(); + } + checkDefaultDFS(defaultDFS, "getPendingDeletionBlocksCount"); + return defaultDFS.getPendingDeletionBlocksCount(); + } + + @Override + public long getMissingReplOneBlocksCount() throws IOException { + if (this.vfs == null) { + return super.getMissingReplOneBlocksCount(); + } + checkDefaultDFS(defaultDFS, "getMissingReplOneBlocksCount"); + return defaultDFS.getMissingReplOneBlocksCount(); + } + + @Override + public long getLowRedundancyBlocksCount() throws IOException { + if (this.vfs == null) { + return super.getLowRedundancyBlocksCount(); + } + checkDefaultDFS(defaultDFS, "getLowRedundancyBlocksCount"); + return defaultDFS.getLowRedundancyBlocksCount(); + } + + @Override + public long getCorruptBlocksCount() throws IOException { + if (this.vfs == null) { + return super.getCorruptBlocksCount(); + } + checkDefaultDFS(defaultDFS, "getCorruptBlocksCount"); + return defaultDFS.getLowRedundancyBlocksCount(); + } + + @Override + public RemoteIterator listCorruptFileBlocks(final Path path) + throws IOException { + if (this.vfs == null) { + return super.listCorruptFileBlocks(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + return mountPathInfo.getTargetFs() + .listCorruptFileBlocks(mountPathInfo.getPathOnTarget()); + } + + @Override + public DatanodeInfo[] getDataNodeStats() throws IOException { + if (this.vfs == null) { + return super.getDataNodeStats(); + } + checkDefaultDFS(defaultDFS, "getDataNodeStats"); + return defaultDFS.getDataNodeStats(); + } + + @Override + public DatanodeInfo[] getDataNodeStats( + final HdfsConstants.DatanodeReportType type) throws IOException { + if (this.vfs == null) { + return super.getDataNodeStats(type); + } + checkDefaultDFS(defaultDFS, "getDataNodeStats"); + return defaultDFS.getDataNodeStats(type); + } + + @Override + public boolean setSafeMode(HdfsConstants.SafeModeAction action) + throws IOException { + if (this.vfs == null) { + return super.setSafeMode(action); + } + checkDefaultDFS(defaultDFS, "setSafeMode"); + return defaultDFS.setSafeMode(action); + } + + @Override + public boolean setSafeMode(HdfsConstants.SafeModeAction action, + boolean isChecked) throws IOException { + if (this.vfs == null) { + return super.setSafeMode(action, isChecked); + } + checkDefaultDFS(defaultDFS, "setSafeMode"); + return defaultDFS.setSafeMode(action, isChecked); + } + + @Override + public boolean saveNamespace(long timeWindow, long txGap) throws IOException { + if (this.vfs == null) { + return super.saveNamespace(timeWindow, txGap); + } + checkDefaultDFS(defaultDFS, "saveNamespace"); + return defaultDFS.saveNamespace(timeWindow, txGap); + } + + @Override + public void saveNamespace() throws IOException { + if (this.vfs == null) { + super.saveNamespace(); + return; + } + checkDefaultDFS(defaultDFS, "saveNamespace"); + defaultDFS.saveNamespace(); + } + + @Override + public long rollEdits() throws IOException { + if (this.vfs == null) { + return super.rollEdits(); + } + checkDefaultDFS(defaultDFS, "rollEdits"); + return defaultDFS.rollEdits(); + } + + @Override + public boolean restoreFailedStorage(String arg) throws IOException { + if (this.vfs == null) { + return super.restoreFailedStorage(arg); + } + checkDefaultDFS(defaultDFS, "restoreFailedStorage"); + return defaultDFS.restoreFailedStorage(arg); + } + + @Override + public void refreshNodes() throws IOException { + if (this.vfs == null) { + super.refreshNodes(); + return; + } + checkDefaultDFS(defaultDFS, "refreshNodes"); + defaultDFS.refreshNodes(); + } + + @Override + public void finalizeUpgrade() throws IOException { + if (this.vfs == null) { + super.finalizeUpgrade(); + return; + } + checkDefaultDFS(defaultDFS, "finalizeUpgrade"); + defaultDFS.finalizeUpgrade(); + } + + @Override + public boolean upgradeStatus() throws IOException { + if (this.vfs == null) { + return super.upgradeStatus(); + } + checkDefaultDFS(defaultDFS, "upgradeStatus"); + return defaultDFS.upgradeStatus(); + } + + @Override + public RollingUpgradeInfo rollingUpgrade( + HdfsConstants.RollingUpgradeAction action) throws IOException { + if (this.vfs == null) { + return super.rollingUpgrade(action); + } + checkDefaultDFS(defaultDFS, "rollingUpgrade"); + return defaultDFS.rollingUpgrade(action); + } + + @Override + public void metaSave(String pathname) throws IOException { + if (this.vfs == null) { + super.metaSave(pathname); + return; + } + checkDefaultDFS(defaultDFS, "metaSave"); + defaultDFS.metaSave(pathname); + } + + @Override + public FsServerDefaults getServerDefaults() throws IOException { + if (this.vfs == null) { + return super.getServerDefaults(); + } + checkDefaultDFS(defaultDFS, "getServerDefaults"); + //TODO: Need to revisit. + return defaultDFS.getServerDefaults(); + } + + @Override + public FileStatus getFileStatus(final Path f) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.getFileStatus(f); + } + return this.vfs.getFileStatus(f); + } + + @SuppressWarnings("deprecation") + @Override + public void createSymlink(final Path target, final Path link, + final boolean createParent) throws IOException { + // Regular DFS behavior + if (this.vfs == null) { + super.createSymlink(target, link, createParent); + return; + } + + throw new UnsupportedOperationException( + "createSymlink is not supported in ViewHDFS"); + } + + @Override + public boolean supportsSymlinks() { + if (this.vfs == null) { + return super.supportsSymlinks(); + } + // we can enabled later if we want to support symlinks. + return false; + } + + @Override + public FileStatus getFileLinkStatus(final Path f) throws IOException { + if (this.vfs == null) { + return super.getFileLinkStatus(f); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + return mountPathInfo.getTargetFs() + .getFileLinkStatus(mountPathInfo.getPathOnTarget()); + } + + @Override + public Path getLinkTarget(Path path) throws IOException { + if(this.vfs==null){ + return super.getLinkTarget(path); + } + return this.vfs.getLinkTarget(path); + } + + @Override + protected Path resolveLink(Path f) throws IOException { + if(this.vfs==null){ + return super.resolveLink(f); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(f, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "resolveLink"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .resolveLink(mountPathInfo.getPathOnTarget()); + } + + @Override + public FileChecksum getFileChecksum(final Path f) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.getFileChecksum(f); + } + return this.vfs.getFileChecksum(f); + } + + @Override + public void setPermission(final Path f, final FsPermission permission) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + super.setPermission(f, permission); + return; + } + this.vfs.setPermission(f, permission); + } + + @Override + public void setOwner(final Path f, final String username, + final String groupname) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + super.setOwner(f, username, groupname); + return; + } + this.vfs.setOwner(f, username, groupname); + } + + @Override + public void setTimes(final Path f, final long mtime, final long atime) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + super.setTimes(f, mtime, atime); + return; + } + this.vfs.setTimes(f, mtime, atime); + } + + @Override + // DFS specific API + protected int getDefaultPort() { + return super.getDefaultPort(); + } + + @Override + public Token getDelegationToken(String renewer) + throws IOException { + if (this.vfs == null) { + return super.getDelegationToken(renewer); + } + //Let applications call getDelegationTokenIssuers and get respective + // delegation tokens from child fs. + throw new UnsupportedOperationException(); + } + + @Override + public void setBalancerBandwidth(long bandwidth) throws IOException { + if (this.vfs == null) { + super.setBalancerBandwidth(bandwidth); + return; + } + checkDefaultDFS(defaultDFS, "setBalancerBandwidth"); + defaultDFS.setBalancerBandwidth(bandwidth); + } + + @Override + public String getCanonicalServiceName() { + if (this.vfs == null) { + return super.getCanonicalServiceName(); + } + checkDefaultDFS(defaultDFS, "getCanonicalServiceName"); + return defaultDFS.getCanonicalServiceName(); + } + + @Override + protected URI canonicalizeUri(URI uri) { + if (this.vfs == null) { + return super.canonicalizeUri(uri); + } + + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = null; + try { + mountPathInfo = this.vfs.getMountPathInfo(new Path(uri), getConf()); + } catch (IOException e) { + LOGGER.warn("Failed to resolve the uri as mount path", e); + return null; + } + checkDFS(mountPathInfo.getTargetFs(), "canonicalizeUri"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .canonicalizeUri(uri); + } + + @Override + public boolean isInSafeMode() throws IOException { + if (this.vfs == null) { + return super.isInSafeMode(); + } + checkDefaultDFS(defaultDFS, "isInSafeMode"); + return defaultDFS.isInSafeMode(); + } + + @Override + // DFS specific API + public void allowSnapshot(Path path) throws IOException { + if (this.vfs == null) { + super.allowSnapshot(path); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "allowSnapshot"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .allowSnapshot(mountPathInfo.getPathOnTarget()); + } + + @Override + public void disallowSnapshot(final Path path) throws IOException { + if (this.vfs == null) { + super.disallowSnapshot(path); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "disallowSnapshot"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .disallowSnapshot(mountPathInfo.getPathOnTarget()); + } + + @Override + public Path createSnapshot(Path path, String snapshotName) + throws IOException { + if (this.vfs == null) { + return super.createSnapshot(path, snapshotName); + } + return this.vfs.createSnapshot(path, snapshotName); + } + + @Override + public void renameSnapshot(Path path, String snapshotOldName, + String snapshotNewName) throws IOException { + if (this.vfs == null) { + super.renameSnapshot(path, snapshotOldName, snapshotNewName); + return; + } + this.vfs.renameSnapshot(path, snapshotOldName, snapshotNewName); + } + + @Override + //Ony for HDFS users + public SnapshottableDirectoryStatus[] getSnapshottableDirListing() + throws IOException { + if (this.vfs == null) { + return super.getSnapshottableDirListing(); + } + checkDefaultDFS(defaultDFS, "getSnapshottableDirListing"); + return defaultDFS.getSnapshottableDirListing(); + } + + @Override + public void deleteSnapshot(Path path, String snapshotName) + throws IOException { + if (this.vfs == null) { + super.deleteSnapshot(path, snapshotName); + return; + } + this.vfs.deleteSnapshot(path, snapshotName); + } + + @Override + public RemoteIterator snapshotDiffReportListingRemoteIterator( + final Path snapshotDir, final String fromSnapshot, + final String toSnapshot) throws IOException { + if (this.vfs == null) { + return super + .snapshotDiffReportListingRemoteIterator(snapshotDir, fromSnapshot, + toSnapshot); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(snapshotDir, getConf()); + checkDFS(mountPathInfo.getTargetFs(), + "snapshotDiffReportListingRemoteIterator"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .snapshotDiffReportListingRemoteIterator( + mountPathInfo.getPathOnTarget(), fromSnapshot, toSnapshot); + } + + @Override + public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir, + final String fromSnapshot, final String toSnapshot) throws IOException { + if (this.vfs == null) { + return super.getSnapshotDiffReport(snapshotDir, fromSnapshot, toSnapshot); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(snapshotDir, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "getSnapshotDiffReport"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .getSnapshotDiffReport(mountPathInfo.getPathOnTarget(), fromSnapshot, + toSnapshot); + } + + @Override + public boolean isFileClosed(final Path src) throws IOException { + if (this.vfs == null) { + return super.isFileClosed(src); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(src, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "isFileClosed"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .isFileClosed(mountPathInfo.getPathOnTarget()); + } + + @Override + public long addCacheDirective(CacheDirectiveInfo info) throws IOException { + if (this.vfs == null) { + return super.addCacheDirective(info); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(info.getPath(), getConf()); + checkDFS(mountPathInfo.getTargetFs(), "addCacheDirective"); + + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .addCacheDirective(new CacheDirectiveInfo.Builder(info) + .setPath(mountPathInfo.getPathOnTarget()).build()); + } + + @Override + public long addCacheDirective(CacheDirectiveInfo info, + EnumSet flags) throws IOException { + if (this.vfs == null) { + return super.addCacheDirective(info, flags); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(info.getPath(), getConf()); + checkDFS(mountPathInfo.getTargetFs(), "addCacheDirective"); + + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .addCacheDirective(new CacheDirectiveInfo.Builder(info) + .setPath(mountPathInfo.getPathOnTarget()).build(), flags); + } + + @Override + public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException { + if (this.vfs == null) { + super.modifyCacheDirective(info); + return; + } + if (info.getPath() != null) { + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(info.getPath(), getConf()); + checkDFS(mountPathInfo.getTargetFs(), "modifyCacheDirective"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .modifyCacheDirective(new CacheDirectiveInfo.Builder(info) + .setPath(mountPathInfo.getPathOnTarget()).build()); + return; + } + + // No path available in CacheDirectiveInfo, Let's shoot to all child fs. + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.modifyCacheDirective(info); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void modifyCacheDirective(CacheDirectiveInfo info, + EnumSet flags) throws IOException { + if (this.vfs == null) { + super.modifyCacheDirective(info, flags); + return; + } + if (info.getPath() != null) { + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(info.getPath(), getConf()); + checkDFS(mountPathInfo.getTargetFs(), "modifyCacheDirective"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .modifyCacheDirective(new CacheDirectiveInfo.Builder(info) + .setPath(mountPathInfo.getPathOnTarget()).build(), flags); + return; + } + // No path available in CacheDirectiveInfo, Let's shoot to all child fs. + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.modifyCacheDirective(info, flags); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void removeCacheDirective(long id) throws IOException { + if (this.vfs == null) { + super.removeCacheDirective(id); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.removeCacheDirective(id); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public RemoteIterator listCacheDirectives( + CacheDirectiveInfo filter) throws IOException { + if (this.vfs == null) { + return super.listCacheDirectives(filter); + } + + if (filter != null && filter.getPath() != null) { + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(filter.getPath(), getConf()); + checkDFS(mountPathInfo.getTargetFs(), "listCacheDirectives"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .listCacheDirectives(new CacheDirectiveInfo.Builder(filter) + .setPath(mountPathInfo.getPathOnTarget()).build()); + } + + // No path available in filter. Let's try to shoot to all child fs. + final List> iters = new ArrayList<>(); + for (FileSystem fs : getChildFileSystems()) { + if (fs instanceof DistributedFileSystem) { + iters.add(((DistributedFileSystem) fs).listCacheDirectives(filter)); + } + } + if (iters.size() == 0) { + throw new UnsupportedOperationException( + "No DFS found in child fs. This API can't be supported in non DFS"); + } + + return new RemoteIterator() { + int currIdx = 0; + RemoteIterator currIter = iters.get(currIdx++); + + @Override + public boolean hasNext() throws IOException { + if (currIter.hasNext()) { + return true; + } + while (currIdx < iters.size()) { + currIter = iters.get(currIdx++); + if (currIter.hasNext()) { + return true; + } + } + return false; + } + + @Override + public CacheDirectiveEntry next() throws IOException { + if (hasNext()) { + return currIter.next(); + } + throw new NoSuchElementException("No more elements"); + } + }; + } + + //Currently Cache pool APIs supported only in default cluster. + @Override + public void addCachePool(CachePoolInfo info) throws IOException { + if (this.vfs == null) { + super.addCachePool(info); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.addCachePool(info); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void modifyCachePool(CachePoolInfo info) throws IOException { + if (this.vfs == null) { + super.modifyCachePool(info); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.modifyCachePool(info); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void removeCachePool(String poolName) throws IOException { + if (this.vfs == null) { + super.removeCachePool(poolName); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.removeCachePool(poolName); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public RemoteIterator listCachePools() throws IOException { + if (this.vfs == null) { + return super.listCachePools(); + } + + List childDFSs = new ArrayList<>(); + for (FileSystem fs : getChildFileSystems()) { + if (fs instanceof DistributedFileSystem) { + childDFSs.add((DistributedFileSystem) fs); + } + } + if (childDFSs.size() == 0) { + throw new UnsupportedOperationException( + "No DFS found in child fs. This API can't be supported in non DFS"); + } + return new RemoteIterator() { + int curDfsIdx = 0; + RemoteIterator currIter = + childDFSs.get(curDfsIdx++).listCachePools(); + + @Override + public boolean hasNext() throws IOException { + if (currIter.hasNext()) { + return true; + } + while (curDfsIdx < childDFSs.size()) { + currIter = childDFSs.get(curDfsIdx++).listCachePools(); + if (currIter.hasNext()) { + return true; + } + } + return false; + } + + @Override + public CachePoolEntry next() throws IOException { + if (hasNext()) { + return currIter.next(); + } + throw new java.util.NoSuchElementException("No more entries"); + } + }; + } + + @Override + public void modifyAclEntries(Path path, List aclSpec) + throws IOException { + if (this.vfs == null) { + super.modifyAclEntries(path, aclSpec); + return; + } + this.vfs.modifyAclEntries(path, aclSpec); + } + + @Override + public void removeAclEntries(Path path, List aclSpec) + throws IOException { + if (this.vfs == null) { + super.removeAclEntries(path, aclSpec); + return; + } + this.vfs.removeAclEntries(path, aclSpec); + } + + @Override + public void removeDefaultAcl(Path path) throws IOException { + if (this.vfs == null) { + super.removeDefaultAcl(path); + return; + } + this.vfs.removeDefaultAcl(path); + } + + @Override + public void removeAcl(Path path) throws IOException { + if (this.vfs == null) { + super.removeAcl(path); + return; + } + this.vfs.removeAcl(path); + } + + @Override + public void setAcl(Path path, List aclSpec) throws IOException { + if (this.vfs == null) { + super.setAcl(path, aclSpec); + return; + } + this.vfs.setAcl(path, aclSpec); + } + + @Override + public AclStatus getAclStatus(Path path) throws IOException { + if (this.vfs == null) { + return super.getAclStatus(path); + } + return this.vfs.getAclStatus(path); + } + + @Override + public void createEncryptionZone(final Path path, final String keyName) + throws IOException { + if (this.vfs == null) { + super.createEncryptionZone(path, keyName); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "createEncryptionZone"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .createEncryptionZone(mountPathInfo.getPathOnTarget(), keyName); + } + + @Override + public EncryptionZone getEZForPath(final Path path) throws IOException { + if (this.vfs == null) { + return super.getEZForPath(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "getEZForPath"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .getEZForPath(mountPathInfo.getPathOnTarget()); + } + + /** + * Returns the results from default DFS (fallback). If you want the results + * from specific clusters, please invoke them on child fs instance directly. + */ + @Override + public RemoteIterator listEncryptionZones() + throws IOException { + if (this.vfs == null) { + return super.listEncryptionZones(); + } + checkDefaultDFS(defaultDFS, "listEncryptionZones"); + return defaultDFS.listEncryptionZones(); + } + + @Override + public void reencryptEncryptionZone(final Path zone, + final HdfsConstants.ReencryptAction action) throws IOException { + if (this.vfs == null) { + super.reencryptEncryptionZone(zone, action); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(zone, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "reencryptEncryptionZone"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .reencryptEncryptionZone(mountPathInfo.getPathOnTarget(), action); + } + + /** + * Returns the results from default DFS (fallback). If you want the results + * from specific clusters, please invoke them on child fs instance directly. + */ + @Override + public RemoteIterator listReencryptionStatus() + throws IOException { + if (this.vfs == null) { + return super.listReencryptionStatus(); + } + checkDefaultDFS(defaultDFS, "listReencryptionStatus"); + return defaultDFS.listReencryptionStatus(); + } + + @Override + public FileEncryptionInfo getFileEncryptionInfo(final Path path) + throws IOException { + if (this.vfs == null) { + return super.getFileEncryptionInfo(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "getFileEncryptionInfo"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .getFileEncryptionInfo(mountPathInfo.getPathOnTarget()); + } + + @Override + public void provisionEZTrash(final Path path, + final FsPermission trashPermission) throws IOException { + if (this.vfs == null) { + super.provisionEZTrash(path, trashPermission); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "provisionEZTrash"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .provisionEZTrash(mountPathInfo.getPathOnTarget(), trashPermission); + } + + @Override + public void setXAttr(Path path, String name, byte[] value, + EnumSet flag) throws IOException { + if (this.vfs == null) { + super.setXAttr(path, name, value, flag); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + mountPathInfo.getTargetFs() + .setXAttr(mountPathInfo.getPathOnTarget(), name, value, flag); + } + + @Override + public byte[] getXAttr(Path path, String name) throws IOException { + if (this.vfs == null) { + return super.getXAttr(path, name); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + return mountPathInfo.getTargetFs() + .getXAttr(mountPathInfo.getPathOnTarget(), name); + } + + @Override + public Map getXAttrs(Path path) throws IOException { + if (this.vfs == null) { + return super.getXAttrs(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + return mountPathInfo.getTargetFs() + .getXAttrs(mountPathInfo.getPathOnTarget()); + } + + @Override + public Map getXAttrs(Path path, List names) + throws IOException { + if (this.vfs == null) { + return super.getXAttrs(path, names); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + return mountPathInfo.getTargetFs() + .getXAttrs(mountPathInfo.getPathOnTarget(), names); + } + + @Override + public List listXAttrs(Path path) throws IOException { + if (this.vfs == null) { + return super.listXAttrs(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + return mountPathInfo.getTargetFs() + .listXAttrs(mountPathInfo.getPathOnTarget()); + } + + @Override + public void removeXAttr(Path path, String name) throws IOException { + if (this.vfs == null) { + super.removeXAttr(path, name); + return; + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + mountPathInfo.getTargetFs() + .removeXAttr(mountPathInfo.getPathOnTarget(), name); + } + + @Override + public void access(Path path, FsAction mode) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + super.access(path, mode); + return; + } + this.vfs.access(path, mode); + } + + @Override + public URI getKeyProviderUri() throws IOException { + if (this.vfs == null) { + return super.getKeyProviderUri(); + } + checkDefaultDFS(defaultDFS, "getKeyProviderUri"); + return defaultDFS.getKeyProviderUri(); + } + + @Override + public KeyProvider getKeyProvider() throws IOException { + if (this.vfs == null) { + return super.getKeyProvider(); + } + checkDefaultDFS(defaultDFS, "getKeyProvider"); + return defaultDFS.getKeyProvider(); + } + + @Override + public DelegationTokenIssuer[] getAdditionalTokenIssuers() + throws IOException { + if (this.vfs == null) { + return super.getChildFileSystems(); + } + + return this.vfs.getChildFileSystems(); + } + + @Override + public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { + if (this.vfs == null) { + return super.getInotifyEventStream(); + } + checkDefaultDFS(defaultDFS, "getInotifyEventStream"); + return defaultDFS.getInotifyEventStream(); + } + + @Override + public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) + throws IOException { + if (this.vfs == null) { + return super.getInotifyEventStream(); + } + checkDefaultDFS(defaultDFS, "getInotifyEventStream"); + return defaultDFS.getInotifyEventStream(); + } + + @Override + // DFS only API. + public void setErasureCodingPolicy(final Path path, final String ecPolicyName) + throws IOException { + if (this.vfs == null) { + super.setErasureCodingPolicy(path, ecPolicyName); + return; + } + + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "setErasureCodingPolicy"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .setErasureCodingPolicy(mountPathInfo.getPathOnTarget(), ecPolicyName); + } + + @Override + public void satisfyStoragePolicy(Path src) throws IOException { + if (this.vfs == null) { + super.satisfyStoragePolicy(src); + return; + } + this.vfs.satisfyStoragePolicy(src); + } + + @Override + public ErasureCodingPolicy getErasureCodingPolicy(final Path path) + throws IOException { + if (this.vfs == null) { + return super.getErasureCodingPolicy(path); + } + + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "getErasureCodingPolicy"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .getErasureCodingPolicy(mountPathInfo.getPathOnTarget()); + } + + /** + * Gets all erasure coding policies from all available child file systems. + */ + @Override + public Collection getAllErasureCodingPolicies() + throws IOException { + if (this.vfs == null) { + return super.getAllErasureCodingPolicies(); + } + FileSystem[] childFss = getChildFileSystems(); + List results = new ArrayList<>(); + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + for (FileSystem fs : childFss) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + results.addAll(dfs.getAllErasureCodingPolicies()); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + return results; + } + + @Override + public Map getAllErasureCodingCodecs() throws IOException { + if (this.vfs == null) { + return super.getAllErasureCodingCodecs(); + } + FileSystem[] childFss = getChildFileSystems(); + Map results = new HashMap<>(); + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + for (FileSystem fs : childFss) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + results.putAll(dfs.getAllErasureCodingCodecs()); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + return results; + } + + @Override + public AddErasureCodingPolicyResponse[] addErasureCodingPolicies( + ErasureCodingPolicy[] policies) throws IOException { + if (this.vfs == null) { + return super.addErasureCodingPolicies(policies); + } + List failedExceptions = new ArrayList<>(); + List results = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + results.addAll(Arrays.asList(dfs.addErasureCodingPolicies(policies))); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + return results.toArray(new AddErasureCodingPolicyResponse[results.size()]); + } + + @Override + public void removeErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (this.vfs == null) { + super.removeErasureCodingPolicy(ecPolicyName); + return; + } + + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.removeErasureCodingPolicy(ecPolicyName); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void enableErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (this.vfs == null) { + super.enableErasureCodingPolicy(ecPolicyName); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.enableErasureCodingPolicy(ecPolicyName); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void disableErasureCodingPolicy(String ecPolicyName) + throws IOException { + if (this.vfs == null) { + super.disableErasureCodingPolicy(ecPolicyName); + return; + } + List failedExceptions = new ArrayList<>(); + boolean isDFSExistsInChilds = false; + + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + isDFSExistsInChilds = true; + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + dfs.disableErasureCodingPolicy(ecPolicyName); + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (!isDFSExistsInChilds) { + throw new UnsupportedOperationException( + "No DFS available in child file systems."); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + } + + @Override + public void unsetErasureCodingPolicy(final Path path) throws IOException { + if (this.vfs == null) { + super.unsetErasureCodingPolicy(path); + return; + } + + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(path, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "unsetErasureCodingPolicy"); + ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .unsetErasureCodingPolicy(mountPathInfo.getPathOnTarget()); + } + + @Override + public ECTopologyVerifierResult getECTopologyResultForPolicies( + final String... policyNames) throws IOException { + if (this.vfs == null) { + return super.getECTopologyResultForPolicies(policyNames); + } + + List failedExceptions = new ArrayList<>(); + ECTopologyVerifierResult result = null; + for (FileSystem fs : getChildFileSystems()) { + if (!(fs instanceof DistributedFileSystem)) { + continue; + } + DistributedFileSystem dfs = (DistributedFileSystem) fs; + try { + result = dfs.getECTopologyResultForPolicies(policyNames); + if (!result.isSupported()) { + // whenever we see negative result. + return result; + } + } catch (IOException ioe) { + failedExceptions.add(ioe); + } + } + if (result == null) { + throw new UnsupportedOperationException( + "No DFS available in child filesystems"); + } + if (failedExceptions.size() > 0) { + throw MultipleIOException.createIOException(failedExceptions); + } + // Let's just return the last one. + return result; + } + + @Override + public Path getTrashRoot(Path path) { + if (this.vfs == null) { + return super.getTrashRoot(path); + } + return this.vfs.getTrashRoot(path); + } + + @Override + public Collection getTrashRoots(boolean allUsers) { + if (this.vfs == null) { + return super.getTrashRoots(allUsers); + } + List trashRoots = new ArrayList<>(); + for (FileSystem fs : getChildFileSystems()) { + trashRoots.addAll(fs.getTrashRoots(allUsers)); + } + return trashRoots; + } + + // Just proovided the same implementation as default in dfs as thats just + // delegated to FileSystem parent class. + @Override + protected Path fixRelativePart(Path p) { + return super.fixRelativePart(p); + } + + Statistics getFsStatistics() { + if (this.vfs == null) { + return super.getFsStatistics(); + } + return statistics; + } + + DFSOpsCountStatistics getDFSOpsCountStatistics() { + if (this.vfs == null) { + return super.getDFSOpsCountStatistics(); + } + return defaultDFS.getDFSOpsCountStatistics(); + } + + @Override + // Works only for HDFS + public HdfsDataOutputStreamBuilder createFile(Path path) { + if (this.vfs == null) { + return super.createFile(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = null; + try { + mountPathInfo = this.vfs.getMountPathInfo(path, getConf()); + } catch (IOException e) { + // TODO: can we return null here? + return null; + } + checkDFS(mountPathInfo.getTargetFs(), "createFile"); + return (HdfsDataOutputStreamBuilder) mountPathInfo.getTargetFs() + .createFile(mountPathInfo.getPathOnTarget()); + } + + @Deprecated + @Override + public RemoteIterator listOpenFiles() throws IOException { + if (this.vfs == null) { + return super.listOpenFiles(); + } + checkDefaultDFS(defaultDFS, "listOpenFiles"); + return defaultDFS.listOpenFiles(); + } + + @Deprecated + @Override + public RemoteIterator listOpenFiles( + EnumSet openFilesTypes) + throws IOException { + if (this.vfs == null) { + return super.listOpenFiles(openFilesTypes); + } + checkDefaultDFS(defaultDFS, "listOpenFiles"); + return defaultDFS.listOpenFiles(openFilesTypes); + } + + @Override + public RemoteIterator listOpenFiles( + EnumSet openFilesTypes, String path) + throws IOException { + if (this.vfs == null) { + return super.listOpenFiles(openFilesTypes, path); + } + Path absF = fixRelativePart(new Path(path)); + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = + this.vfs.getMountPathInfo(absF, getConf()); + checkDFS(mountPathInfo.getTargetFs(), "listOpenFiles"); + return ((DistributedFileSystem) mountPathInfo.getTargetFs()) + .listOpenFiles(openFilesTypes, + mountPathInfo.getPathOnTarget().toString()); + } + + @Override + public HdfsDataOutputStreamBuilder appendFile(Path path) { + if (this.vfs == null) { + return super.appendFile(path); + } + ViewFileSystemOverloadScheme.MountPathInfo mountPathInfo = null; + try { + mountPathInfo = this.vfs.getMountPathInfo(path, getConf()); + } catch (IOException e) { + LOGGER.warn("Failed to resolve the path as mount path", e); + return null; + } + checkDFS(mountPathInfo.getTargetFs(), "appendFile"); + return (HdfsDataOutputStreamBuilder) mountPathInfo.getTargetFs() + .appendFile(mountPathInfo.getPathOnTarget()); + } + + @Override + public boolean hasPathCapability(Path path, String capability) + throws IOException { + if (this.vfs == null) { + return super.hasPathCapability(path, capability); + } + return this.vfs.hasPathCapability(path, capability); + } + + //Below API provided implementations are in ViewFS but not there in DFS. + @Override + public Path resolvePath(final Path f) throws IOException { + if (this.vfs == null) { + return super.resolvePath(f); + } + return this.vfs.resolvePath(f); + } + + @Override + @SuppressWarnings("deprecation") + public boolean delete(final Path f) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.delete(f); + } + return this.vfs.delete(f); + } + + @Override + public FileChecksum getFileChecksum(final Path f, final long length) + throws AccessControlException, FileNotFoundException, IOException { + if (this.vfs == null) { + return super.getFileChecksum(f, length); + } + return this.vfs.getFileChecksum(f, length); + } + + @Override + public boolean mkdirs(Path dir) throws IOException { + if (this.vfs == null) { + return super.mkdirs(dir); + } + return this.vfs.mkdirs(dir); + } + + @Override + public long getDefaultBlockSize(Path f) { + if (this.vfs == null) { + return super.getDefaultBlockSize(f); + } + return this.vfs.getDefaultBlockSize(f); + } + + @Override + public short getDefaultReplication(Path f) { + if (this.vfs == null) { + return super.getDefaultReplication(f); + } + return this.vfs.getDefaultReplication(f); + } + + @Override + public FsServerDefaults getServerDefaults(Path f) throws IOException { + if (this.vfs == null) { + return super.getServerDefaults(f); + } + return this.vfs.getServerDefaults(f); + } + + @Override + public void setWriteChecksum(final boolean writeChecksum) { + if (this.vfs == null) { + super.setWriteChecksum(writeChecksum); + return; + } + this.vfs.setWriteChecksum(writeChecksum); + } + + @Override + public FileSystem[] getChildFileSystems() { + if (this.vfs == null) { + return super.getChildFileSystems(); + } + return this.vfs.getChildFileSystems(); + } + + public ViewFileSystem.MountPoint[] getMountPoints() { + if (this.vfs == null) { + return null; + } + return this.vfs.getMountPoints(); + } + + @Override + public FsStatus getStatus() throws IOException { + if (this.vfs == null) { + return super.getStatus(); + } + return this.vfs.getStatus(); + } + + @Override + public long getUsed() throws IOException { + if (this.vfs == null) { + return super.getUsed(); + } + return this.vfs.getUsed(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 9bffaaabeb..e58072a7a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; @@ -727,11 +726,6 @@ protected void initialize(Configuration conf) throws IOException { intervals); } } - // Currently NN uses FileSystem.get to initialize DFS in startTrashEmptier. - // If fs.hdfs.impl was overridden by core-site.xml, we may get other - // filesystem. To make sure we get DFS, we are setting fs.hdfs.impl to DFS. - // HDFS-15450 - conf.set(FS_HDFS_IMPL_KEY, DistributedFileSystem.class.getName()); UserGroupInformation.setConfiguration(conf); loginAsNameNodeUser(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java index e7afbed0a1..5e2f42b77a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java @@ -37,8 +37,8 @@ public class TestViewFSOverloadSchemeWithMountTableConfigInHDFS @Before @Override - public void startCluster() throws IOException { - super.startCluster(); + public void setUp() throws IOException { + super.setUp(); String mountTableDir = URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)) .toString() + "/MountTable/"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java index 8b7eb88404..31674f8d13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java @@ -36,13 +36,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.UnsupportedFileSystemException; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.test.LambdaTestUtils; import org.apache.hadoop.test.PathUtils; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME; @@ -58,7 +60,7 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme { private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl"; private static final String HDFS_SCHEME = "hdfs"; private Configuration conf = null; - private MiniDFSCluster cluster = null; + private static MiniDFSCluster cluster = null; private URI defaultFSURI; private File localTargetDir; private static final String TEST_ROOT_DIR = PathUtils @@ -66,33 +68,52 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme { private static final String HDFS_USER_FOLDER = "/HDFSUser"; private static final String LOCAL_FOLDER = "/local"; + @BeforeClass + public static void init() throws IOException { + cluster = + new MiniDFSCluster.Builder(new Configuration()).numDataNodes(2).build(); + cluster.waitClusterUp(); + } + /** * Sets up the configurations and starts the MiniDFSCluster. */ @Before - public void startCluster() throws IOException { - conf = new Configuration(); - conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, - true); - conf.setInt( + public void setUp() throws IOException { + Configuration config = getNewConf(); + config.setInt( CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); - conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME), + config.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME), ViewFileSystemOverloadScheme.class.getName()); - conf.set(String.format( - FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, - HDFS_SCHEME), DistributedFileSystem.class.getName()); - conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME, + config.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME, CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT); - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build(); - cluster.waitClusterUp(); + setConf(config); defaultFSURI = - URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); + URI.create(config.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); localTargetDir = new File(TEST_ROOT_DIR, "/root/"); + localTargetDir.mkdirs(); Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme()); // hdfs scheme. } @After - public void tearDown() throws IOException { + public void cleanUp() throws IOException { + if (cluster != null) { + FileSystem fs = new DistributedFileSystem(); + fs.initialize(defaultFSURI, conf); + try { + FileStatus[] statuses = fs.listStatus(new Path("/")); + for (FileStatus st : statuses) { + Assert.assertTrue(fs.delete(st.getPath(), true)); + } + } finally { + fs.close(); + } + FileSystem.closeAll(); + } + } + + @AfterClass + public static void tearDown() throws IOException { if (cluster != null) { FileSystem.closeAll(); cluster.shutdown(); @@ -132,9 +153,9 @@ public void testMountLinkWithLocalAndHDFS() throws Exception { // /local/test Path localDir = new Path(LOCAL_FOLDER + "/test"); - try (ViewFileSystemOverloadScheme fs - = (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { - Assert.assertEquals(2, fs.getMountPoints().length); + try (FileSystem fs + = FileSystem.get(conf)) { + Assert.assertEquals(2, fs.getChildFileSystems().length); fs.createNewFile(hdfsFile); // /HDFSUser/testfile fs.mkdirs(localDir); // /local/test } @@ -166,8 +187,13 @@ public void testMountLinkWithLocalAndHDFS() throws Exception { * hdfs://localhost:xxx/HDFSUser --> nonexistent://NonExistent/User/ * It should fail to add non existent fs link. */ - @Test(expected = IOException.class, timeout = 30000) + @Test(timeout = 30000) public void testMountLinkWithNonExistentLink() throws Exception { + testMountLinkWithNonExistentLink(true); + } + + public void testMountLinkWithNonExistentLink(boolean expectFsInitFailure) + throws Exception { final String userFolder = "/User"; final Path nonExistTargetPath = new Path("nonexistent://NonExistent" + userFolder); @@ -176,10 +202,17 @@ public void testMountLinkWithNonExistentLink() throws Exception { * Below addLink will create following mount points * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/ */ - addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder }, - new String[] {nonExistTargetPath.toUri().toString() }, conf); - FileSystem.get(conf); - Assert.fail("Expected to fail with non existent link"); + addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder}, + new String[] {nonExistTargetPath.toUri().toString()}, conf); + if (expectFsInitFailure) { + LambdaTestUtils.intercept(IOException.class, () -> { + FileSystem.get(conf); + }); + } else { + try (FileSystem fs = FileSystem.get(conf)) { + Assert.assertEquals("hdfs", fs.getScheme()); + } + } } /** @@ -271,14 +304,10 @@ public void testAccessViewFsPathWithoutAuthority() throws Exception { // check for viewfs path without authority Path viewFsRootPath = new Path("viewfs:/"); - try { - viewFsRootPath.getFileSystem(conf); - Assert.fail( - "Mount table with authority default should not be initialized"); - } catch (IOException e) { - assertTrue(e.getMessage().contains( - "Empty Mount table in config for viewfs://default/")); - } + LambdaTestUtils.intercept(IOException.class, + "Empty Mount table in config for viewfs://default", () -> { + viewFsRootPath.getFileSystem(conf); + }); // set the name of the default mount table here and // subsequent calls should succeed. @@ -334,18 +363,25 @@ public void testWithLinkFallBack() throws Exception { * * It cannot find any mount link. ViewFS expects a mount point from root. */ - @Test(expected = NotInMountpointException.class, timeout = 30000) - public void testCreateOnRootShouldFailWhenMountLinkConfigured() - throws Exception { + @Test(timeout = 30000) + public void testCreateOnRoot() throws Exception { + testCreateOnRoot(false); + } + + public void testCreateOnRoot(boolean fallbackExist) throws Exception { final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER); addMountLinks(defaultFSURI.getAuthority(), - new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER }, + new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER}, new String[] {hdfsTargetPath.toUri().toString(), - localTargetDir.toURI().toString() }, - conf); + localTargetDir.toURI().toString()}, conf); try (FileSystem fs = FileSystem.get(conf)) { - fs.createNewFile(new Path("/newFileOnRoot")); - Assert.fail("It should fail as root is read only in viewFS."); + if (fallbackExist) { + Assert.assertTrue(fs.createNewFile(new Path("/newFileOnRoot"))); + } else { + LambdaTestUtils.intercept(NotInMountpointException.class, () -> { + fs.createNewFile(new Path("/newFileOnRoot")); + }); + } } } @@ -433,15 +469,13 @@ public void testViewFsOverloadSchemeWithInnerCache() conf); // 1. Only 1 hdfs child file system should be there with cache. - try (ViewFileSystemOverloadScheme vfs = - (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { + try (FileSystem vfs = FileSystem.get(conf)) { Assert.assertEquals(1, vfs.getChildFileSystems().length); } // 2. Two hdfs file systems should be there if no cache. conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); - try (ViewFileSystemOverloadScheme vfs = - (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { + try (FileSystem vfs = FileSystem.get(conf)) { Assert.assertEquals(2, vfs.getChildFileSystems().length); } } @@ -466,8 +500,7 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets() conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); // Two hdfs file systems should be there if no cache. - try (ViewFileSystemOverloadScheme vfs = - (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { + try (FileSystem vfs = FileSystem.get(conf)) { Assert.assertEquals(2, vfs.getChildFileSystems().length); } } @@ -494,8 +527,7 @@ public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets() // Only one local file system should be there if no InnerCache, but fs // cache should work. conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false); - try (ViewFileSystemOverloadScheme vfs = - (ViewFileSystemOverloadScheme) FileSystem.get(conf)) { + try (FileSystem vfs = FileSystem.get(conf)) { Assert.assertEquals(1, vfs.getChildFileSystems().length); } } @@ -656,4 +688,18 @@ private void readString(final FileSystem nfly, final Path testFile, public Configuration getConf() { return this.conf; } + + /** + * @return configuration. + */ + public Configuration getNewConf() { + return new Configuration(cluster.getConfiguration(0)); + } + + /** + * sets configuration. + */ + public void setConf(Configuration config) { + conf = config; + } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java index c4355d50fb..d629370904 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java @@ -142,7 +142,7 @@ public class TestDistributedFileSystem { private boolean noXmlDefaults = false; - private HdfsConfiguration getTestConfiguration() { + HdfsConfiguration getTestConfiguration() { HdfsConfiguration conf; if (noXmlDefaults) { conf = new HdfsConfiguration(false); @@ -813,7 +813,7 @@ public void testStatistics() throws IOException { @Test public void testStatistics2() throws IOException, NoSuchAlgorithmException { - HdfsConfiguration conf = new HdfsConfiguration(); + HdfsConfiguration conf = getTestConfiguration(); conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY, StoragePolicySatisfierMode.EXTERNAL.toString()); File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString()); @@ -1475,7 +1475,7 @@ public void testCreateWithCustomChecksum() throws Exception { @Test(timeout=60000) public void testFileCloseStatus() throws IOException { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); DistributedFileSystem fs = cluster.getFileSystem(); try { @@ -1495,7 +1495,7 @@ public void testFileCloseStatus() throws IOException { @Test public void testCreateWithStoragePolicy() throws Throwable { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .storageTypes( new StorageType[] {StorageType.DISK, StorageType.ARCHIVE, @@ -1534,7 +1534,7 @@ public void testCreateWithStoragePolicy() throws Throwable { @Test(timeout=60000) public void testListFiles() throws IOException { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); try { @@ -1557,7 +1557,7 @@ public void testListFiles() throws IOException { @Test public void testListStatusOfSnapshotDirs() throws IOException { - MiniDFSCluster cluster = new MiniDFSCluster.Builder(new HdfsConfiguration()) + MiniDFSCluster cluster = new MiniDFSCluster.Builder(getTestConfiguration()) .build(); try { DistributedFileSystem dfs = cluster.getFileSystem(); @@ -1577,7 +1577,7 @@ public void testListStatusOfSnapshotDirs() throws IOException { @Test(timeout=10000) public void testDFSClientPeerReadTimeout() throws IOException { final int timeout = 1000; - final Configuration conf = new HdfsConfiguration(); + final Configuration conf = getTestConfiguration(); conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); // only need cluster to create a dfs client to get a peer @@ -1611,7 +1611,7 @@ public void testDFSClientPeerReadTimeout() throws IOException { @Test(timeout=60000) public void testGetServerDefaults() throws IOException { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build(); try { cluster.waitActive(); @@ -1626,7 +1626,7 @@ public void testGetServerDefaults() throws IOException { @Test(timeout=10000) public void testDFSClientPeerWriteTimeout() throws IOException { final int timeout = 1000; - final Configuration conf = new HdfsConfiguration(); + final Configuration conf = getTestConfiguration(); conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, timeout); // only need cluster to create a dfs client to get a peer @@ -1663,7 +1663,7 @@ public void testDFSClientPeerWriteTimeout() throws IOException { @Test(timeout = 30000) public void testTotalDfsUsed() throws Exception { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); @@ -1850,7 +1850,7 @@ public void testDFSDataOutputStreamBuilderForAppend() throws IOException { @Test public void testSuperUserPrivilege() throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); + HdfsConfiguration conf = getTestConfiguration(); File tmpDir = GenericTestUtils.getTestDir(UUID.randomUUID().toString()); final Path jksPath = new Path(tmpDir.toString(), "test.jks"); conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH, @@ -1903,7 +1903,7 @@ public Void run() throws Exception { @Test public void testListingStoragePolicyNonSuperUser() throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); + HdfsConfiguration conf = getTestConfiguration(); try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build()) { cluster.waitActive(); final DistributedFileSystem dfs = cluster.getFileSystem(); @@ -2055,7 +2055,7 @@ public Object run() throws Exception { @Test public void testStorageFavouredNodes() throws IOException, InterruptedException, TimeoutException { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .storageTypes(new StorageType[] {StorageType.SSD, StorageType.DISK}) .numDataNodes(3).storagesPerDatanode(2).build()) { @@ -2080,7 +2080,7 @@ public void testStorageFavouredNodes() @Test public void testGetECTopologyResultForPolicies() throws Exception { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getTestConfiguration(); try (MiniDFSCluster cluster = DFSTestUtil.setupCluster(conf, 9, 3, 0)) { DistributedFileSystem dfs = cluster.getFileSystem(); dfs.enableErasureCodingPolicy("RS-6-3-1024k"); @@ -2111,7 +2111,7 @@ public void testGetECTopologyResultForPolicies() throws Exception { @Test public void testECCloseCommittedBlock() throws Exception { - HdfsConfiguration conf = new HdfsConfiguration(); + HdfsConfiguration conf = getTestConfiguration(); conf.setInt(DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY, 1); try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(3).build()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystem.java new file mode 100644 index 0000000000..3c5a0be303 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystem.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.test.Whitebox; + +import java.io.IOException; + +public class TestViewDistributedFileSystem extends TestDistributedFileSystem{ + @Override + HdfsConfiguration getTestConfiguration() { + HdfsConfiguration conf = super.getTestConfiguration(); + conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName()); + return conf; + } + + @Override + public void testStatistics() throws IOException { + FileSystem.getStatistics(HdfsConstants.HDFS_URI_SCHEME, + ViewDistributedFileSystem.class).reset(); + @SuppressWarnings("unchecked") + ThreadLocal data = + (ThreadLocal) Whitebox + .getInternalState(FileSystem + .getStatistics(HdfsConstants.HDFS_URI_SCHEME, + ViewDistributedFileSystem.class), "threadData"); + data.set(null); + super.testStatistics(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemContract.java new file mode 100644 index 0000000000..418c814c88 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemContract.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileSystemContractBaseTest; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.test.LambdaTestUtils; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.net.URI; + +public class TestViewDistributedFileSystemContract + extends TestHDFSFileSystemContract { + private static MiniDFSCluster cluster; + private static String defaultWorkingDirectory; + private static Configuration conf = new HdfsConfiguration(); + + @BeforeClass + public static void init() throws IOException { + final File basedir = GenericTestUtils.getRandomizedTestDir(); + conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, + FileSystemContractBaseTest.TEST_UMASK); + cluster = new MiniDFSCluster.Builder(conf, basedir) + .numDataNodes(2) + .build(); + defaultWorkingDirectory = + "/user/" + UserGroupInformation.getCurrentUser().getShortUserName(); + } + + @Before + public void setUp() throws Exception { + conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName()); + URI defaultFSURI = + URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); + ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/user", + defaultFSURI); + ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(), + defaultFSURI); + fs = FileSystem.get(conf); + } + + @AfterClass + public static void tearDownAfter() throws Exception { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Override + protected String getDefaultWorkingDirectory() { + return defaultWorkingDirectory; + } + + @Test + public void testRenameRootDirForbidden() throws Exception { + LambdaTestUtils.intercept(AccessControlException.class, + "InternalDir of ViewFileSystem is readonly", () -> { + super.testRenameRootDirForbidden(); + }); + } + + @Ignore("Ignore this test until HDFS-15532") + @Override + public void testLSRootDir() throws Throwable { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemWithMountLinks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemWithMountLinks.java new file mode 100644 index 0000000000..23b5793eee --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestViewDistributedFileSystemWithMountLinks.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.fs.viewfs.TestViewFileSystemOverloadSchemeWithHdfsScheme; +import org.junit.Test; + +import java.io.IOException; +import java.net.URI; + +import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME; +import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT; + +public class TestViewDistributedFileSystemWithMountLinks extends + TestViewFileSystemOverloadSchemeWithHdfsScheme { + @Override + public void setUp() throws IOException { + super.setUp(); + Configuration conf = getConf(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, + true); + conf.setInt( + CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1); + conf.set("fs.hdfs.impl", + ViewDistributedFileSystem.class.getName()); + conf.setBoolean(CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME, + CONFIG_VIEWFS_IGNORE_PORT_IN_MOUNT_TABLE_NAME_DEFAULT); + URI defaultFSURI = + URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); + ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(), + new Path(defaultFSURI.toString()).toUri()); + setConf(conf); + } + + @Test(timeout = 30000) + public void testCreateOnRoot() throws Exception { + testCreateOnRoot(true); + } + + @Test(timeout = 30000) + public void testMountLinkWithNonExistentLink() throws Exception { + testMountLinkWithNonExistentLink(false); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java index c5bc211977..9a52548140 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java @@ -132,17 +132,23 @@ private static HdfsConfiguration createCachingConf() { conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2); conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2); - return conf; } + /** + * @return the configuration. + */ + Configuration getConf() { + return this.conf; + } + @Before public void setup() throws Exception { conf = createCachingConf(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); cluster.waitActive(); - dfs = cluster.getFileSystem(); + dfs = getDFS(); proto = cluster.getNameNodeRpc(); namenode = cluster.getNameNode(); prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); @@ -150,6 +156,13 @@ public void setup() throws Exception { BlockReaderTestUtil.enableHdfsCachingTracing(); } + /** + * @return the dfs instance. + */ + DistributedFileSystem getDFS() throws IOException { + return (DistributedFileSystem) FileSystem.get(conf); + } + @After public void teardown() throws Exception { // Remove cache directives left behind by tests so that we release mmaps. @@ -1613,6 +1626,14 @@ public void testAddingCacheDirectiveInfosWhenCachingIsDisabled() "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2"); } + /** + * @return the dfs instance for nnIdx. + */ + DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx) + throws IOException { + return cluster.getFileSystem(0); + } + @Test(timeout=120000) public void testExpiryTimeConsistency() throws Exception { conf.setInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY, 1); @@ -1623,7 +1644,7 @@ public void testExpiryTimeConsistency() throws Exception { .build(); dfsCluster.transitionToActive(0); - DistributedFileSystem fs = dfsCluster.getFileSystem(0); + DistributedFileSystem fs = getDFS(dfsCluster, 0); final NameNode ann = dfsCluster.getNameNode(0); final Path filename = new Path("/file"); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectivesWithViewDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectivesWithViewDFS.java new file mode 100644 index 0000000000..b80c2a656c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectivesWithViewDFS.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.viewfs.ConfigUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.ViewDistributedFileSystem; + +import java.io.IOException; +import java.net.URI; + +public class TestCacheDirectivesWithViewDFS extends TestCacheDirectives { + + @Override + public DistributedFileSystem getDFS() throws IOException { + Configuration conf = getConf(); + conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName()); + URI defaultFSURI = + URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY)); + ConfigUtil.addLinkFallback(conf, defaultFSURI.getHost(), + new Path(defaultFSURI.toString()).toUri()); + ConfigUtil.addLink(conf, defaultFSURI.getHost(), "/tmp", + new Path(defaultFSURI.toString()).toUri()); + return super.getDFS(); + } + + @Override + public DistributedFileSystem getDFS(MiniDFSCluster cluster, int nnIdx) + throws IOException { + Configuration conf = cluster.getConfiguration(nnIdx); + conf.set("fs.hdfs.impl", ViewDistributedFileSystem.class.getName()); + URI uri = cluster.getURI(0); + ConfigUtil.addLinkFallback(conf, uri.getHost(), uri); + ConfigUtil.addLink(conf, uri.getHost(), "/tmp", uri); + return cluster.getFileSystem(0); + } +}