diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java index 4e9f172a4c..2376c051c9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -2064,7 +2064,12 @@ public FileStatus[] listStatus(Path[] files, PathFilter filter) * @throws IOException IO failure */ public FileStatus[] globStatus(Path pathPattern) throws IOException { - return new Globber(this, pathPattern, DEFAULT_FILTER).glob(); + return Globber.createGlobber(this) + .withPathPattern(pathPattern) + .withPathFiltern(DEFAULT_FILTER) + .withResolveSymlinks(true) + .build() + .glob(); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java index b241a949b1..f301f22057 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Globber.java @@ -25,15 +25,24 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.util.DurationInfo; import org.apache.htrace.core.TraceScope; import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Implementation of {@link FileSystem#globStatus(Path, PathFilter)}. + * This has historically been package-private; it has been opened + * up for object stores within the {@code hadoop-*} codebase ONLY. + * It could be expanded for external store implementations in future. + */ @InterfaceAudience.Private @InterfaceStability.Unstable -class Globber { +public class Globber { public static final Logger LOG = LoggerFactory.getLogger(Globber.class.getName()); @@ -42,21 +51,62 @@ class Globber { private final Path pathPattern; private final PathFilter filter; private final Tracer tracer; - - public Globber(FileSystem fs, Path pathPattern, PathFilter filter) { + private final boolean resolveSymlinks; + + Globber(FileSystem fs, Path pathPattern, PathFilter filter) { this.fs = fs; this.fc = null; this.pathPattern = pathPattern; this.filter = filter; this.tracer = FsTracer.get(fs.getConf()); + this.resolveSymlinks = true; } - public Globber(FileContext fc, Path pathPattern, PathFilter filter) { + Globber(FileContext fc, Path pathPattern, PathFilter filter) { this.fs = null; this.fc = fc; this.pathPattern = pathPattern; this.filter = filter; this.tracer = fc.getTracer(); + this.resolveSymlinks = true; + } + + /** + * Filesystem constructor for use by {@link GlobBuilder}. + * @param fs filesystem + * @param pathPattern path pattern + * @param filter optional filter + * @param resolveSymlinks should symlinks be resolved. + */ + private Globber(FileSystem fs, Path pathPattern, PathFilter filter, + boolean resolveSymlinks) { + this.fs = fs; + this.fc = null; + this.pathPattern = pathPattern; + this.filter = filter; + this.resolveSymlinks = resolveSymlinks; + this.tracer = FsTracer.get(fs.getConf()); + LOG.debug("Created Globber for path={}, symlinks={}", + pathPattern, resolveSymlinks); + } + + /** + * File Context constructor for use by {@link GlobBuilder}. + * @param fc file context + * @param pathPattern path pattern + * @param filter optional filter + * @param resolveSymlinks should symlinks be resolved. + */ + private Globber(FileContext fc, Path pathPattern, PathFilter filter, + boolean resolveSymlinks) { + this.fs = null; + this.fc = fc; + this.pathPattern = pathPattern; + this.filter = filter; + this.resolveSymlinks = resolveSymlinks; + this.tracer = fc.getTracer(); + LOG.debug("Created Globber path={}, symlinks={}", + pathPattern, resolveSymlinks); } private FileStatus getFileStatus(Path path) throws IOException { @@ -67,6 +117,7 @@ private FileStatus getFileStatus(Path path) throws IOException { return fc.getFileStatus(path); } } catch (FileNotFoundException e) { + LOG.debug("getFileStatus({}) failed; returning null", path, e); return null; } } @@ -79,6 +130,7 @@ private FileStatus[] listStatus(Path path) throws IOException { return fc.util().listStatus(path); } } catch (FileNotFoundException e) { + LOG.debug("listStatus({}) failed; returning empty array", path, e); return new FileStatus[0]; } } @@ -107,7 +159,7 @@ private static String unescapePathComponent(String name) { */ private static List getPathComponents(String path) throws IOException { - ArrayList ret = new ArrayList(); + ArrayList ret = new ArrayList<>(); for (String component : path.split(Path.SEPARATOR)) { if (!component.isEmpty()) { ret.add(component); @@ -145,7 +197,8 @@ private String authorityFromPath(Path path) throws IOException { public FileStatus[] glob() throws IOException { TraceScope scope = tracer.newScope("Globber#glob"); scope.addKVAnnotation("pattern", pathPattern.toUri().getPath()); - try { + try (DurationInfo ignored = new DurationInfo(LOG, false, + "glob %s", pathPattern)) { return doGlob(); } finally { scope.close(); @@ -164,10 +217,11 @@ private FileStatus[] doGlob() throws IOException { String pathPatternString = pathPattern.toUri().getPath(); List flattenedPatterns = GlobExpander.expand(pathPatternString); + LOG.debug("Filesystem glob {}", pathPatternString); // Now loop over all flattened patterns. In every case, we'll be trying to // match them to entries in the filesystem. ArrayList results = - new ArrayList(flattenedPatterns.size()); + new ArrayList<>(flattenedPatterns.size()); boolean sawWildcard = false; for (String flatPattern : flattenedPatterns) { // Get the absolute path for this flattened pattern. We couldn't do @@ -175,13 +229,14 @@ private FileStatus[] doGlob() throws IOException { // path you go down influences how the path must be made absolute. Path absPattern = fixRelativePart(new Path( flatPattern.isEmpty() ? Path.CUR_DIR : flatPattern)); + LOG.debug("Pattern: {}", absPattern); // Now we break the flattened, absolute pattern into path components. // For example, /a/*/c would be broken into the list [a, *, c] List components = getPathComponents(absPattern.toUri().getPath()); // Starting out at the root of the filesystem, we try to match // filesystem entries against pattern components. - ArrayList candidates = new ArrayList(1); + ArrayList candidates = new ArrayList<>(1); // To get the "real" FileStatus of root, we'd have to do an expensive // RPC to the NameNode. So we create a placeholder FileStatus which has // the correct path, but defaults for the rest of the information. @@ -206,12 +261,13 @@ private FileStatus[] doGlob() throws IOException { for (int componentIdx = 0; componentIdx < components.size(); componentIdx++) { ArrayList newCandidates = - new ArrayList(candidates.size()); + new ArrayList<>(candidates.size()); GlobFilter globFilter = new GlobFilter(components.get(componentIdx)); String component = unescapePathComponent(components.get(componentIdx)); if (globFilter.hasPattern()) { sawWildcard = true; } + LOG.debug("Component {}, patterned={}", component, sawWildcard); if (candidates.isEmpty() && sawWildcard) { // Optimization: if there are no more candidates left, stop examining // the path components. We can only do this if we've already seen @@ -245,19 +301,31 @@ private FileStatus[] doGlob() throws IOException { // incorrectly conclude that /a/b was a file and should not match // /a/*/*. So we use getFileStatus of the path we just listed to // disambiguate. - Path path = candidate.getPath(); - FileStatus status = getFileStatus(path); - if (status == null) { - // null means the file was not found - LOG.warn("File/directory {} not found:" - + " it may have been deleted." - + " If this is an object store, this can be a sign of" - + " eventual consistency problems.", - path); - continue; - } - if (!status.isDirectory()) { - continue; + if (resolveSymlinks) { + LOG.debug("listStatus found one entry; disambiguating {}", + children[0]); + Path path = candidate.getPath(); + FileStatus status = getFileStatus(path); + if (status == null) { + // null means the file was not found + LOG.warn("File/directory {} not found:" + + " it may have been deleted." + + " If this is an object store, this can be a sign of" + + " eventual consistency problems.", + path); + continue; + } + if (!status.isDirectory()) { + LOG.debug("Resolved entry is a file; skipping: {}", status); + continue; + } + } else { + // there's no symlinks in this store, so no need to issue + // another call, just see if the result is a directory or a file + if (children[0].getPath().equals(candidate.getPath())) { + // the listing status is of a file + continue; + } } } for (FileStatus child : children) { @@ -312,6 +380,8 @@ private FileStatus[] doGlob() throws IOException { */ if ((!sawWildcard) && results.isEmpty() && (flattenedPatterns.size() <= 1)) { + LOG.debug("No matches found and there was no wildcard in the path {}", + pathPattern); return null; } /* @@ -324,4 +394,98 @@ private FileStatus[] doGlob() throws IOException { Arrays.sort(ret); return ret; } + + /** + * Create a builder for a Globber, bonded to the specific filesystem. + * @param filesystem filesystem + * @return the builder to finish configuring. + */ + public static GlobBuilder createGlobber(FileSystem filesystem) { + return new GlobBuilder(filesystem); + } + + /** + * Create a builder for a Globber, bonded to the specific file + * context. + * @param fileContext file context. + * @return the builder to finish configuring. + */ + public static GlobBuilder createGlobber(FileContext fileContext) { + return new GlobBuilder(fileContext); + } + + /** + * Builder for Globber instances. + */ + @InterfaceAudience.Private + public static class GlobBuilder { + + private final FileSystem fs; + + private final FileContext fc; + + private Path pathPattern; + + private PathFilter filter; + + private boolean resolveSymlinks = true; + + /** + * Construct bonded to a file context. + * @param fc file context. + */ + public GlobBuilder(final FileContext fc) { + this.fs = null; + this.fc = checkNotNull(fc); + } + + /** + * Construct bonded to a filesystem. + * @param fs file system. + */ + public GlobBuilder(final FileSystem fs) { + this.fs = checkNotNull(fs); + this.fc = null; + } + + /** + * Set the path pattern. + * @param pattern pattern to use. + * @return the builder + */ + public GlobBuilder withPathPattern(Path pattern) { + pathPattern = pattern; + return this; + } + + /** + * Set the path filter. + * @param pathFilter filter + * @return the builder + */ + public GlobBuilder withPathFiltern(PathFilter pathFilter) { + filter = pathFilter; + return this; + } + + /** + * Set the symlink resolution policy. + * @param resolve resolution flag. + * @return the builder + */ + public GlobBuilder withResolveSymlinks(boolean resolve) { + resolveSymlinks = resolve; + return this; + } + + /** + * Build the Globber. + * @return a new instance. + */ + public Globber build() { + return fs != null + ? new Globber(fs, pathPattern, filter, resolveSymlinks) + : new Globber(fc, pathPattern, filter, resolveSymlinks); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java index c1b6cc4081..db36154c15 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/LambdaTestUtils.java @@ -575,6 +575,9 @@ private static String robustToString(Object o) { if (o == null) { return NULL_RESULT; } else { + if (o instanceof String) { + return '"' + (String)o + '"'; + } try { return o.toString(); } catch (Exception e) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index afdc0ca400..b3e2b4ade8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -250,7 +251,9 @@ protected FileStatus[] listStatus(JobConf job) throws IOException { job, dirs, recursive, inputFilter, false); locatedFiles = locatedFileStatusFetcher.getFileStatuses(); } catch (InterruptedException e) { - throw new IOException("Interrupted while getting file statuses"); + throw (IOException) + new InterruptedIOException("Interrupted while getting file statuses") + .initCause(e); } result = Iterables.toArray(locatedFiles, FileStatus.class); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java index e1bb36bcdd..faf1a3877c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InvalidInputException.java @@ -38,10 +38,14 @@ public class InvalidInputException extends IOException { /** * Create the exception with the given list. + * The first element of the list is used as the init cause value. * @param probs the list of problems to report. this list is not copied. */ public InvalidInputException(List probs) { problems = probs; + if (!probs.isEmpty()) { + initCause(probs.get(0)); + } } /** diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index 3869c493a0..a248f1401c 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -46,15 +46,23 @@ import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.util.concurrent.HadoopExecutors; /** * Utility class to fetch block locations for specified Input paths using a * configured number of threads. + * The thread count is determined from the value of + * "mapreduce.input.fileinputformat.list-status.num-threads" in the + * configuration. */ @Private public class LocatedFileStatusFetcher { + public static final Logger LOG = + LoggerFactory.getLogger(LocatedFileStatusFetcher.class.getName()); private final Path[] inputDirs; private final PathFilter inputFilter; private final Configuration conf; @@ -64,7 +72,7 @@ public class LocatedFileStatusFetcher { private final ExecutorService rawExec; private final ListeningExecutorService exec; private final BlockingQueue> resultQueue; - private final List invalidInputErrors = new LinkedList(); + private final List invalidInputErrors = new LinkedList<>(); private final ProcessInitialInputPathCallback processInitialInputPathCallback = new ProcessInitialInputPathCallback(); @@ -79,25 +87,30 @@ public class LocatedFileStatusFetcher { private volatile Throwable unknownError; /** + * Instantiate. + * The newApi switch is only used to configure what exception is raised + * on failure of {@link #getFileStatuses()}, it does not change the algorithm. * @param conf configuration for the job * @param dirs the initial list of paths - * @param recursive whether to traverse the patchs recursively + * @param recursive whether to traverse the paths recursively * @param inputFilter inputFilter to apply to the resulting paths * @param newApi whether using the mapred or mapreduce API * @throws InterruptedException * @throws IOException */ public LocatedFileStatusFetcher(Configuration conf, Path[] dirs, - boolean recursive, PathFilter inputFilter, boolean newApi) throws InterruptedException, - IOException { + boolean recursive, PathFilter inputFilter, boolean newApi) + throws InterruptedException, IOException { int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS, FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); + LOG.debug("Instantiated LocatedFileStatusFetcher with {} threads", + numThreads); rawExec = HadoopExecutors.newFixedThreadPool( numThreads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("GetFileInfo #%d").build()); exec = MoreExecutors.listeningDecorator(rawExec); - resultQueue = new LinkedBlockingQueue>(); + resultQueue = new LinkedBlockingQueue<>(); this.conf = conf; this.inputDirs = dirs; this.recursive = recursive; @@ -106,10 +119,13 @@ public LocatedFileStatusFetcher(Configuration conf, Path[] dirs, } /** - * Start executing and return FileStatuses based on the parameters specified + * Start executing and return FileStatuses based on the parameters specified. * @return fetched file statuses - * @throws InterruptedException - * @throws IOException + * @throws InterruptedException interruption waiting for results. + * @throws IOException IO failure or other error. + * @throws InvalidInputException on an invalid input and the old API + * @throws org.apache.hadoop.mapreduce.lib.input.InvalidInputException on an + * invalid input and the new API. */ public Iterable getFileStatuses() throws InterruptedException, IOException { @@ -117,6 +133,7 @@ public Iterable getFileStatuses() throws InterruptedException, // rest being scheduled does not lead to a termination. runningTasks.incrementAndGet(); for (Path p : inputDirs) { + LOG.debug("Queuing scan of directory {}", p); runningTasks.incrementAndGet(); ListenableFuture future = exec .submit(new ProcessInitialInputPathCallable(p, conf, inputFilter)); @@ -128,14 +145,20 @@ public Iterable getFileStatuses() throws InterruptedException, lock.lock(); try { + LOG.debug("Waiting scan completion"); while (runningTasks.get() != 0 && unknownError == null) { condition.await(); } } finally { lock.unlock(); } + // either the scan completed or an error was raised. + // in the case of an error shutting down the executor will interrupt all + // active threads, which can add noise to the logs. + LOG.debug("Scan complete: shutting down"); this.exec.shutdownNow(); if (this.unknownError != null) { + LOG.debug("Scan failed", this.unknownError); if (this.unknownError instanceof Error) { throw (Error) this.unknownError; } else if (this.unknownError instanceof RuntimeException) { @@ -148,7 +171,11 @@ public Iterable getFileStatuses() throws InterruptedException, throw new IOException(this.unknownError); } } - if (this.invalidInputErrors.size() != 0) { + if (!this.invalidInputErrors.isEmpty()) { + LOG.debug("Invalid Input Errors raised"); + for (IOException error : invalidInputErrors) { + LOG.debug("Error", error); + } if (this.newApi) { throw new org.apache.hadoop.mapreduce.lib.input.InvalidInputException( invalidInputErrors); @@ -161,7 +188,7 @@ public Iterable getFileStatuses() throws InterruptedException, /** * Collect misconfigured Input errors. Errors while actually reading file info - * are reported immediately + * are reported immediately. */ private void registerInvalidInputError(List errors) { synchronized (this) { @@ -171,9 +198,10 @@ private void registerInvalidInputError(List errors) { /** * Register fatal errors - example an IOException while accessing a file or a - * full exection queue + * full execution queue. */ private void registerError(Throwable t) { + LOG.debug("Error", t); lock.lock(); try { if (unknownError == null) { @@ -221,7 +249,7 @@ private static class ProcessInputDirCallable implements public Result call() throws Exception { Result result = new Result(); result.fs = fs; - + LOG.debug("ProcessInputDirCallable {}", fileStatus); if (fileStatus.isDirectory()) { RemoteIterator iter = fs .listLocatedStatus(fileStatus.getPath()); @@ -242,8 +270,8 @@ public Result call() throws Exception { } private static class Result { - private List locatedFileStatuses = new LinkedList(); - private List dirsNeedingRecursiveCalls = new LinkedList(); + private List locatedFileStatuses = new LinkedList<>(); + private List dirsNeedingRecursiveCalls = new LinkedList<>(); private FileSystem fs; } } @@ -259,11 +287,12 @@ private class ProcessInputDirCallback implements @Override public void onSuccess(ProcessInputDirCallable.Result result) { try { - if (result.locatedFileStatuses.size() != 0) { + if (!result.locatedFileStatuses.isEmpty()) { resultQueue.add(result.locatedFileStatuses); } - if (result.dirsNeedingRecursiveCalls.size() != 0) { + if (!result.dirsNeedingRecursiveCalls.isEmpty()) { for (FileStatus fileStatus : result.dirsNeedingRecursiveCalls) { + LOG.debug("Queueing directory scan {}", fileStatus.getPath()); runningTasks.incrementAndGet(); ListenableFuture future = exec .submit(new ProcessInputDirCallable(result.fs, fileStatus, @@ -285,7 +314,7 @@ public void onFailure(Throwable t) { } } - + /** * Processes an initial Input Path pattern through the globber and PathFilter * to generate a list of files which need further processing. @@ -309,6 +338,7 @@ public Result call() throws Exception { Result result = new Result(); FileSystem fs = path.getFileSystem(conf); result.fs = fs; + LOG.debug("ProcessInitialInputPathCallable path {}", path); FileStatus[] matches = fs.globStatus(path, inputFilter); if (matches == null) { result.addError(new IOException("Input path does not exist: " + path)); @@ -337,7 +367,7 @@ void addError(IOException ioe) { /** * The callback handler to handle results generated by - * {@link ProcessInitialInputPathCallable} + * {@link ProcessInitialInputPathCallable}. * */ private class ProcessInitialInputPathCallback implements diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index e2658caabe..22efe1471f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.lib.input; import java.io.IOException; +import java.io.InterruptedIOException; import java.util.ArrayList; import java.util.List; @@ -283,7 +284,10 @@ protected List listStatus(JobContext job job.getConfiguration(), dirs, recursive, inputFilter, true); locatedFiles = locatedFileStatusFetcher.getFileStatuses(); } catch (InterruptedException e) { - throw new IOException("Interrupted while getting file statuses"); + throw (IOException) + new InterruptedIOException( + "Interrupted while getting file statuses") + .initCause(e); } result = Lists.newArrayList(locatedFiles); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java index 61e14841de..1113bec188 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/InvalidInputException.java @@ -37,10 +37,14 @@ public class InvalidInputException extends IOException { /** * Create the exception with the given list. + * The first element of the list is used as the init cause value. * @param probs the list of problems to report. this list is not copied. */ public InvalidInputException(List probs) { problems = probs; + if (!probs.isEmpty()) { + initCause(probs.get(0)); + } } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java index a59ffa9c6e..bbb9faa000 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java @@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.util.DurationInfo; /** * Class to provide lambda expression invocation of AWS operations. @@ -105,7 +106,7 @@ public Retried getRetryCallback() { @Retries.OnceTranslated public static T once(String action, String path, Operation operation) throws IOException { - try { + try (DurationInfo ignored = new DurationInfo(LOG, false, "%s", action)) { return operation.execute(); } catch (AmazonClientException e) { throw S3AUtils.translateException(action, path, e); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 159505b055..1a1d9b75e4 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -94,6 +94,7 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Globber; import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; import org.apache.hadoop.fs.s3a.impl.ContextAccessors; import org.apache.hadoop.fs.s3a.impl.CopyOutcome; @@ -2472,7 +2473,7 @@ private S3ListRequest createListObjectsRequest(String key, * @param newDir the current working directory. */ public void setWorkingDirectory(Path newDir) { - workingDir = newDir; + workingDir = makeQualified(newDir); } /** @@ -3669,19 +3670,27 @@ public boolean isMagicCommitPath(Path path) { */ @Override public FileStatus[] globStatus(Path pathPattern) throws IOException { - entryPoint(INVOCATION_GLOB_STATUS); - return super.globStatus(pathPattern); + return globStatus(pathPattern, ACCEPT_ALL); } /** - * Override superclass so as to add statistic collection. + * Override superclass so as to disable symlink resolution and so avoid + * some calls to the FS which may have problems when the store is being + * inconsistent. * {@inheritDoc} */ @Override - public FileStatus[] globStatus(Path pathPattern, PathFilter filter) + public FileStatus[] globStatus( + final Path pathPattern, + final PathFilter filter) throws IOException { entryPoint(INVOCATION_GLOB_STATUS); - return super.globStatus(pathPattern, filter); + return Globber.createGlobber(this) + .withPathPattern(pathPattern) + .withPathFiltern(filter) + .withResolveSymlinks(true) + .build() + .glob(); } /** diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java new file mode 100644 index 0000000000..bd6bf2f6cd --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestLocatedFileStatusFetcher.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test the LocatedFileStatusFetcher can do. + * This is related to HADOOP-16458. + * There's basic tests in ITestS3AFSMainOperations; this + * is see if we can create better corner cases. + */ +public class ITestLocatedFileStatusFetcher extends AbstractS3ATestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestLocatedFileStatusFetcher.class); + + @Test + public void testGlobScan() throws Throwable { + + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java new file mode 100644 index 0000000000..511aa0fc80 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFSMainOperations.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import org.junit.Ignore; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSMainOperationsBaseTest; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.s3a.S3AContract; + +import static org.apache.hadoop.fs.s3a.S3ATestUtils.createTestPath; + +/** + * S3A Test suite for the FSMainOperationsBaseTest tests. + */ +public class ITestS3AFSMainOperations extends FSMainOperationsBaseTest { + + + public ITestS3AFSMainOperations() { + super(createTestPath( + new Path("/ITestS3AFSMainOperations")).toUri().toString()); + } + + @Override + protected FileSystem createFileSystem() throws Exception { + S3AContract contract = new S3AContract(new Configuration()); + contract.init(); + return contract.getTestFileSystem(); + } + + @Override + @Ignore("Permissions not supported") + public void testListStatusThrowsExceptionForUnreadableDir() { + } + + @Override + @Ignore("Permissions not supported") + public void testGlobStatusThrowsExceptionForUnreadableDir() { + } + + @Override + @Ignore("local FS path setup broken") + public void testCopyToLocalWithUseRawLocalFileSystemOption() + throws Exception { + } + +} diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java new file mode 100644 index 0000000000..a741cd6f9d --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestRestrictedReadAccess.java @@ -0,0 +1,707 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a.auth; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.file.AccessDeniedException; +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.Callable; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.fs.s3a.AbstractS3ATestBase; +import org.apache.hadoop.fs.s3a.Constants; +import org.apache.hadoop.fs.s3a.S3AFileSystem; +import org.apache.hadoop.fs.s3a.S3ATestUtils; +import org.apache.hadoop.fs.s3a.S3AUtils; +import org.apache.hadoop.fs.s3a.Statistic; +import org.apache.hadoop.fs.s3a.s3guard.LocalMetadataStore; +import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; +import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore; +import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapreduce.lib.input.InvalidInputException; + +import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile; +import static org.apache.hadoop.fs.contract.ContractTestUtils.touch; +import static org.apache.hadoop.fs.s3a.Constants.ASSUMED_ROLE_ARN; +import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_AUTHORITATIVE; +import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.assume; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.getTestBucketName; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBucketOverrides; +import static org.apache.hadoop.fs.s3a.auth.RoleModel.Effects; +import static org.apache.hadoop.fs.s3a.auth.RoleModel.Statement; +import static org.apache.hadoop.fs.s3a.auth.RoleModel.directory; +import static org.apache.hadoop.fs.s3a.auth.RoleModel.statement; +import static org.apache.hadoop.fs.s3a.auth.RolePolicies.*; +import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.bindRolePolicyStatements; +import static org.apache.hadoop.fs.s3a.auth.RoleTestUtils.newAssumedRoleConfig; +import static org.apache.hadoop.mapreduce.lib.input.FileInputFormat.LIST_STATUS_NUM_THREADS; +import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains; +import static org.apache.hadoop.test.GenericTestUtils.failif; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; + +/** + * This test creates a client with no read access to the underlying + * filesystem and then tries to perform various read operations on it. + * S3Guard in non-auth mode always goes to the FS, so we parameterize the + * test for S3Guard + Auth to see how failures move around. + *
    + *
  1. Tests only run if an assumed role is provided.
  2. + *
  3. And the s3guard tests use the local metastore if + * there was not one already.
  4. + *
+ * The tests are all bundled into one big test case. + * From a purist unit test perspective, this is utterly wrong as it goes + * against the + * "Each test case tests exactly one thing" + * philosophy of JUnit. + *

+ * However is significantly reduces setup costs on the parameterized test runs, + * as it means that the filesystems and directories only need to be + * created and destroyed once per parameterized suite, rather than + * once per individual test. + *

+ * All the test probes have informative messages so when a test failure + * does occur, its cause should be discoverable. It main weaknesses are + * therefore: + *

    + *
  1. A failure of an assertion blocks all subsequent assertions from + * being checked.
  2. + *
  3. Maintenance is potentially harder.
  4. + *
+ * To simplify maintenance, the operations tested are broken up into + * their own methods, with fields used to share the restricted role and + * created paths. + */ +@SuppressWarnings("ThrowableNotThrown") +@RunWith(Parameterized.class) +public class ITestRestrictedReadAccess extends AbstractS3ATestBase { + + private static final Logger LOG = + LoggerFactory.getLogger(ITestRestrictedReadAccess.class); + + /** Filter to select everything. */ + private static final PathFilter EVERYTHING = t -> true; + + /** Filter to select .txt files. */ + private static final PathFilter TEXT_FILE = + path -> path.toUri().toString().endsWith(".txt"); + + /** The same path filter used in FileInputFormat. */ + private static final PathFilter HIDDEN_FILE_FILTER = + (p) -> { + String n = p.getName(); + return !n.startsWith("_") && !n.startsWith("."); + }; + + /** + * Text found in LocatedFileStatusFetcher exception when the glob + * returned "null". + */ + private static final String DOES_NOT_EXIST = "does not exist"; + + /** + * Text found in LocatedFileStatusFetcher exception when + * the glob returned an empty list. + */ + private static final String MATCHES_0_FILES = "matches 0 files"; + + /** + * Text used in files. + */ + public static final byte[] HELLO = "hello".getBytes(Charset.forName("UTF-8")); + + /** + * Wildcard scan to find *.txt in the no-read directory. + * When a scan/glob is done with S3Guard in auth mode, the scan will + * succeed but the file open will fail for any non-empty file. + * In non-auth mode, the read restrictions will fail the actual scan. + */ + private Path noReadWildcard; + + /** + * Parameterization. + */ + @Parameterized.Parameters(name = "{0}") + public static Collection params() { + return Arrays.asList(new Object[][]{ + {"raw", false, false}, + {"nonauth", true, false}, + {"auth", true, true} + }); + } + + private final String name; + + private final boolean s3guard; + + private final boolean authMode; + + private Path basePath; + + private Path noReadDir; + + private Path emptyDir; + + private Path emptyFile; + + private Path subDir; + + private Path subdirFile; + + private Path subDir2; + + private Path subdir2File1; + + private Path subdir2File2; + + private Configuration roleConfig; + + /** + * A read-only FS; if non-null it is closed in teardown. + */ + private S3AFileSystem readonlyFS; + + /** + * Test suite setup. + * @param name name for logs/paths. + * @param s3guard is S3Guard enabled? + * @param authMode is S3Guard in auth mode? + */ + public ITestRestrictedReadAccess( + final String name, + final boolean s3guard, + final boolean authMode) { + this.name = name; + this.s3guard = s3guard; + this.authMode = authMode; + } + + @Override + public Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + String bucketName = getTestBucketName(conf); + removeBucketOverrides(bucketName, conf, + S3_METADATA_STORE_IMPL, + METADATASTORE_AUTHORITATIVE); + conf.setClass(Constants.S3_METADATA_STORE_IMPL, + s3guard ? + LocalMetadataStore.class + : NullMetadataStore.class, + MetadataStore.class); + conf.setBoolean(METADATASTORE_AUTHORITATIVE, authMode); + disableFilesystemCaching(conf); + return conf; + } + + @Override + public void setup() throws Exception { + super.setup(); + assumeRoleTests(); + } + + @Override + public void teardown() throws Exception { + S3AUtils.closeAll(LOG, readonlyFS); + super.teardown(); + } + + private void assumeRoleTests() { + assume("No ARN for role tests", !getAssumedRoleARN().isEmpty()); + } + + private String getAssumedRoleARN() { + return getContract().getConf().getTrimmed(ASSUMED_ROLE_ARN, ""); + } + + /** + * Create the assumed role configuration. + * @return a config bonded to the ARN of the assumed role + */ + public Configuration createAssumedRoleConfig() { + return createAssumedRoleConfig(getAssumedRoleARN()); + } + + /** + * Create a config for an assumed role; it also disables FS caching. + * @param roleARN ARN of role + * @return the new configuration + */ + private Configuration createAssumedRoleConfig(String roleARN) { + return newAssumedRoleConfig(getContract().getConf(), roleARN); + } + + /** + * This is a single test case which invokes the individual test cases + * in sequence. + */ + @Test + public void testNoReadAccess() throws Throwable { + describe("Test failure handling if the client doesn't" + + " have read access under a path"); + initNoReadAccess(); + + // now move up the API Chain, from the calls made by globStatus, + // to globStatus itself, and then to LocatedFileStatusFetcher, + // which invokes globStatus + + checkBasicFileOperations(); + checkGlobOperations(); + checkSingleThreadedLocatedFileStatus(); + checkLocatedFileStatusFourThreads(); + checkLocatedFileStatusScanFile(); + checkLocatedFileStatusNonexistentPath(); + checkDeleteOperations(); + } + + /** + * Initialize the directory tree and the role filesystem. + */ + public void initNoReadAccess() throws Throwable { + describe("Setting up filesystem"); + + S3AFileSystem realFS = getFileSystem(); + + // avoiding the parameterization to steer clear of accidentally creating + // patterns + basePath = path("testNoReadAccess-" + name); + + // define the paths and create them. + describe("Creating test directories and files"); + + // this is the directory to which the restricted role has no read + // access. + noReadDir = new Path(basePath, "noReadDir"); + // wildcard scan to find *.txt + noReadWildcard = new Path(noReadDir, "*/*.txt"); + + // an empty directory directory under the noReadDir + emptyDir = new Path(noReadDir, "emptyDir"); + realFS.mkdirs(emptyDir); + + // an empty file directory under the noReadDir + emptyFile = new Path(noReadDir, "emptyFile.txt"); + touch(realFS, emptyFile); + + // a subdirectory + subDir = new Path(noReadDir, "subDir"); + + // and a file in that subdirectory + subdirFile = new Path(subDir, "subdirFile.txt"); + createFile(realFS, subdirFile, true, HELLO); + subDir2 = new Path(noReadDir, "subDir2"); + subdir2File1 = new Path(subDir2, "subdir2File1.txt"); + subdir2File2 = new Path(subDir2, "subdir2File2.docx"); + createFile(realFS, subdir2File1, true, HELLO); + createFile(realFS, subdir2File2, true, HELLO); + + // create a role filesystem which does not have read access under a path + // it still has write access, which can be explored in the final + // step to delete files and directories. + roleConfig = createAssumedRoleConfig(); + bindRolePolicyStatements(roleConfig, + STATEMENT_S3GUARD_CLIENT, + STATEMENT_ALLOW_SSE_KMS_RW, + statement(true, S3_ALL_BUCKETS, S3_ALL_OPERATIONS), + new Statement(Effects.Deny) + .addActions(S3_ALL_GET) + .addResources(directory(noReadDir))); + readonlyFS = (S3AFileSystem) basePath.getFileSystem(roleConfig); + } + + /** + * Validate basic IO operations. + */ + public void checkBasicFileOperations() throws Throwable { + + // this is a LIST call; there's no marker. + // so the sequence is + // - HEAD path -> FNFE + // - HEAD path + / -> FNFE + // - LIST path -> list results + // Because the client has list access, this succeeds + readonlyFS.listStatus(basePath); + + // this is HEAD + "/" on S3; get on S3Guard auth + readonlyFS.listStatus(emptyDir); + + // a recursive list of the no-read-directory works because + // there is no directory marker, it becomes a LIST call. + lsR(readonlyFS, noReadDir, true); + + // similarly, a getFileStatus ends up being a list and generating + // a file status marker. + readonlyFS.getFileStatus(noReadDir); + + // empty dir checks work! + readonlyFS.getFileStatus(emptyDir); + + // now look at a file; the outcome depends on the mode. + if (authMode) { + // auth mode doesn't check S3, so no failure + readonlyFS.getFileStatus(subdirFile); + } else { + accessDenied(() -> + readonlyFS.getFileStatus(subdirFile)); + } + + // irrespective of mode, the attempt to read the data will fail. + // the only variable is where the failure occurs + accessDenied(() -> + ContractTestUtils.readUTF8(readonlyFS, subdirFile, HELLO.length)); + + // the empty file is interesting + if (!authMode) { + // non-auth mode, it fails at some point in the opening process. + // due to a HEAD being called on the object + accessDenied(() -> + ContractTestUtils.readUTF8(readonlyFS, emptyFile, 0)); + } else { + // auth mode doesn't check the store. + // Furthermore, because it knows the file length is zero, + // it returns -1 without even opening the file. + // This means that permissions on the file do not get checked. + // See: HADOOP-16464. + try (FSDataInputStream is = readonlyFS.open(emptyFile)) { + Assertions.assertThat(is.read()) + .describedAs("read of empty file") + .isEqualTo(-1); + } + readonlyFS.getFileStatus(subdirFile); + } + } + + /** + * Explore Glob's recursive scan. + */ + public void checkGlobOperations() throws Throwable { + + describe("Glob Status operations"); + // baseline: the real filesystem on a subdir + globFS(getFileSystem(), subdirFile, null, false, 1); + // a file fails if not in auth mode + globFS(readonlyFS, subdirFile, null, !authMode, 1); + // empty directories don't fail. + assertStatusPathEquals(emptyDir, + globFS(readonlyFS, emptyDir, null, false, 1)); + + FileStatus[] st = globFS(readonlyFS, + noReadWildcard, + null, false, 2); + Assertions.assertThat(st) + .extracting(FileStatus::getPath) + .containsExactlyInAnyOrder(subdirFile, subdir2File1); + + // there is precisely one .docx file (subdir2File2.docx) + globFS(readonlyFS, + new Path(noReadDir, "*/*.docx"), + null, false, 1); + + // there are no .doc files. + globFS(readonlyFS, + new Path(noReadDir, "*/*.doc"), + null, false, 0); + globFS(readonlyFS, noReadDir, + EVERYTHING, false, 1); + // and a filter without any wildcarded pattern only finds + // the role dir itself. + FileStatus[] st2 = globFS(readonlyFS, noReadDir, + EVERYTHING, false, 1); + Assertions.assertThat(st2) + .extracting(FileStatus::getPath) + .containsExactly(noReadDir); + } + + /** + * Run a located file status fetcher against the directory tree. + */ + public void checkSingleThreadedLocatedFileStatus() throws Throwable { + + describe("LocatedFileStatusFetcher operations"); + // use the same filter as FileInputFormat; single thread. + roleConfig.setInt(LIST_STATUS_NUM_THREADS, 1); + LocatedFileStatusFetcher fetcher = + new LocatedFileStatusFetcher( + roleConfig, + new Path[]{basePath}, + true, + HIDDEN_FILE_FILTER, + true); + Assertions.assertThat(fetcher.getFileStatuses()) + .describedAs("result of located scan") + .flatExtracting(FileStatus::getPath) + .containsExactlyInAnyOrder( + emptyFile, + subdirFile, + subdir2File1, + subdir2File2); + + } + + /** + * Run a located file status fetcher against the directory tree. + */ + public void checkLocatedFileStatusFourThreads() throws Throwable { + + // four threads and the text filter. + int threads = 4; + describe("LocatedFileStatusFetcher with %d", threads); + roleConfig.setInt(LIST_STATUS_NUM_THREADS, threads); + LocatedFileStatusFetcher fetcher2 = + new LocatedFileStatusFetcher( + roleConfig, + new Path[]{noReadWildcard}, + true, + EVERYTHING, + true); + Assertions.assertThat(fetcher2.getFileStatuses()) + .describedAs("result of located scan") + .isNotNull() + .flatExtracting(FileStatus::getPath) + .containsExactlyInAnyOrder(subdirFile, subdir2File1); + } + + /** + * Run a located file status fetcher against the directory tree. + */ + public void checkLocatedFileStatusScanFile() throws Throwable { + // pass in a file as the base of the scan. + describe("LocatedFileStatusFetcher with file %s", subdirFile); + roleConfig.setInt(LIST_STATUS_NUM_THREADS, 16); + try { + Iterable fetched = new LocatedFileStatusFetcher( + roleConfig, + new Path[]{subdirFile}, + true, + TEXT_FILE, + true).getFileStatuses(); + // when not in auth mode, the HEAD request MUST have failed. + failif(!authMode, "LocatedFileStatusFetcher(" + subdirFile + ")" + + " should have failed"); + // and in auth mode, the file MUST have been found. + Assertions.assertThat(fetched) + .describedAs("result of located scan") + .isNotNull() + .flatExtracting(FileStatus::getPath) + .containsExactly(subdirFile); + } catch (AccessDeniedException e) { + // we require the HEAD request to fail with access denied in non-auth + // mode, but not in auth mode. + failif(authMode, "LocatedFileStatusFetcher(" + subdirFile + ")", e); + } + } + + /** + * Explore what happens with a path that does not exist. + */ + public void checkLocatedFileStatusNonexistentPath() throws Throwable { + // scan a path that doesn't exist + Path nonexistent = new Path(noReadDir, "nonexistent"); + InvalidInputException ex = intercept(InvalidInputException.class, + DOES_NOT_EXIST, + () -> new LocatedFileStatusFetcher( + roleConfig, + new Path[]{nonexistent}, + true, + EVERYTHING, + true) + .getFileStatuses()); + // validate nested exception + assertExceptionContains(DOES_NOT_EXIST, ex.getCause()); + + // a file which exists but which doesn't match the pattern + // is downgraded to not existing. + intercept(InvalidInputException.class, + DOES_NOT_EXIST, + () -> new LocatedFileStatusFetcher( + roleConfig, + new Path[]{noReadDir}, + true, + TEXT_FILE, + true) + .getFileStatuses()); + + // a pattern under a nonexistent path is considered to not be a match. + ex = intercept( + InvalidInputException.class, + MATCHES_0_FILES, + () -> new LocatedFileStatusFetcher( + roleConfig, + new Path[]{new Path(nonexistent, "*.txt)")}, + true, + TEXT_FILE, + true) + .getFileStatuses()); + // validate nested exception + assertExceptionContains(MATCHES_0_FILES, ex.getCause()); + } + + /** + * Do some cleanup to see what happens with delete calls. + * Cleanup happens in test teardown anyway; doing it here + * just makes use of the delete calls to see how delete failures + * change with permissions and S3Guard stettings. + */ + public void checkDeleteOperations() throws Throwable { + describe("Testing delete operations"); + + if (!authMode) { + // unguarded or non-auth S3Guard to fail on HEAD + / + accessDenied(() -> readonlyFS.delete(emptyDir, true)); + // to fail on HEAD + accessDenied(() -> readonlyFS.delete(emptyFile, true)); + } else { + // auth mode checks DDB for status and then issues the DELETE + readonlyFS.delete(emptyDir, true); + readonlyFS.delete(emptyFile, true); + } + + // this will succeed for both as there is no subdir marker. + readonlyFS.delete(subDir, true); + // after which it is not there + fileNotFound(() -> readonlyFS.getFileStatus(subDir)); + // and nor is its child. + fileNotFound(() -> readonlyFS.getFileStatus(subdirFile)); + + // now delete the base path + readonlyFS.delete(basePath, true); + // and expect an FNFE + fileNotFound(() -> readonlyFS.getFileStatus(subDir)); + } + + /** + * Require an operation to fail with a FileNotFoundException. + * @param eval closure to evaluate. + * @param type of callable + * @return the exception. + * @throws Exception any other exception + */ + protected FileNotFoundException fileNotFound(final Callable eval) + throws Exception { + return intercept(FileNotFoundException.class, eval); + } + + /** + * Require an operation to fail with an AccessDeniedException. + * @param eval closure to evaluate. + * @param type of callable + * @return the exception. + * @throws Exception any other exception + */ + protected AccessDeniedException accessDenied(final Callable eval) + throws Exception { + return intercept(AccessDeniedException.class, eval); + } + + /** + * Assert that a status array has exactly one element and its + * value is as expected. + * @param expected expected path + * @param statuses list of statuses + */ + protected void assertStatusPathEquals(final Path expected, + final FileStatus[] statuses) { + Assertions.assertThat(statuses) + .describedAs("List of status entries") + .isNotNull() + .hasSize(1); + Assertions.assertThat(statuses[0].getPath()) + .describedAs("Status entry %s", statuses[0]) + .isEqualTo(expected); + } + + /** + * Glob under a path with expected outcomes. + * @param fs filesystem to use + * @param path path (which can include patterns) + * @param filter optional filter + * @param expectAuthFailure is auth failure expected? + * @param expectedCount expected count of results; -1 means null response + * @return the result of a successful glob or null if an expected auth + * failure was caught. + * @throws IOException failure. + */ + protected FileStatus[] globFS( + final S3AFileSystem fs, + final Path path, + final PathFilter filter, + boolean expectAuthFailure, + final int expectedCount) + throws IOException { + LOG.info("Glob {}", path); + S3ATestUtils.MetricDiff getMetric = new S3ATestUtils.MetricDiff(fs, + Statistic.OBJECT_METADATA_REQUESTS); + S3ATestUtils.MetricDiff listMetric = new S3ATestUtils.MetricDiff(fs, + Statistic.OBJECT_LIST_REQUESTS); + FileStatus[] st; + try { + st = filter == null + ? fs.globStatus(path) + : fs.globStatus(path, filter); + LOG.info("Metrics:\n {},\n {}", getMetric, listMetric); + if (expectAuthFailure) { + // should have failed here + String resultStr; + if (st == null) { + resultStr = "A null array"; + } else { + resultStr = StringUtils.join(st, ","); + } + fail(String.format("globStatus(%s) should have raised" + + " an exception, but returned %s", path, resultStr)); + } + } catch (AccessDeniedException e) { + LOG.info("Metrics:\n {},\n {}", getMetric, listMetric); + failif(!expectAuthFailure, "Access denied in glob of " + path, + e); + return null; + } + if (expectedCount < 0) { + Assertions.assertThat(st) + .describedAs("Glob of %s", path) + .isNull(); + } else { + Assertions.assertThat(st) + .describedAs("Glob of %s", path) + .isNotNull() + .hasSize(expectedCount); + } + return st; + } + +} +