From 03cfc852791c14fad39db4e5b14104a276c08e59 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 24 Mar 2021 02:36:26 +0530 Subject: [PATCH] HADOOP-17531. DistCp: Reduce memory usage on copying huge directories. (#2732). Contributed by Ayush Saxena. Signed-off-by: Steve Loughran --- .../functional/CommonCallableSupplier.java | 153 +++++++++ .../apache/hadoop/test/GenericTestUtils.java | 149 ++++++++- .../s3a/impl/ITestPartialRenamesDeletes.java | 46 +-- .../fs/s3a/scale/ITestS3ADeleteManyFiles.java | 2 +- .../apache/hadoop/tools/DistCpConstants.java | 1 + .../apache/hadoop/tools/DistCpContext.java | 4 + .../hadoop/tools/DistCpOptionSwitch.java | 7 +- .../apache/hadoop/tools/DistCpOptions.java | 19 ++ .../apache/hadoop/tools/OptionsParser.java | 4 +- .../hadoop/tools/SimpleCopyListing.java | 294 +++++++++++------- .../src/site/markdown/DistCp.md.vm | 1 + .../hadoop/tools/TestDistCpOptions.java | 2 +- .../apache/hadoop/tools/TestDistCpSystem.java | 5 - .../hadoop/tools/TestDistCpWithRawXAttrs.java | 23 ++ .../contract/AbstractContractDistCpTest.java | 39 +++ .../OptionalTestHDFSContractDistCp.java | 50 +++ .../src/test/resources/contract/hdfs.xml | 139 +++++++++ 17 files changed, 773 insertions(+), 165 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java new file mode 100644 index 0000000000..e2cdc0fd41 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CommonCallableSupplier.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hadoop.util.functional;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.util.DurationInfo;
+
+import static org.apache.hadoop.fs.impl.FutureIOSupport.raiseInnerCause;
+
+/**
+ * A bridge from Callable to Supplier; catching exceptions
+ * raised by the callable and wrapping them as appropriate.
+ * @param <T> return type.
+ */
+public final class CommonCallableSupplier<T> implements Supplier {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CommonCallableSupplier.class);
+
+  private final Callable<T> call;
+
+  /**
+   * Create.
+   * @param call call to invoke.
+   */
+  public CommonCallableSupplier(final Callable<T> call) {
+    this.call = call;
+  }
+
+  @Override
+  public Object get() {
+    try {
+      return call.call();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    } catch (Exception e) {
+      throw new UncheckedIOException(new IOException(e));
+    }
+  }
+
+  /**
+   * Submit a callable into a completable future.
+   * RTEs are rethrown.
+   * Non RTEs are caught and wrapped; IOExceptions to
+   * {@code RuntimeIOException} instances.
+   * @param executor executor.
+   * @param call call to invoke
+   * @param <T> type
+   * @return the future to wait for
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> CompletableFuture<T> submit(final Executor executor,
+      final Callable<T> call) {
+    return CompletableFuture
+        .supplyAsync(new CommonCallableSupplier(call), executor);
+  }
+
+  /**
+   * Wait for a list of futures to complete. If the list is empty,
+   * return immediately.
+   * @param futures list of futures.
+   * @throws IOException if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static <T> void waitForCompletion(
+      final List<CompletableFuture<T>> futures) throws IOException {
+    if (futures.isEmpty()) {
+      return;
+    }
+    // await completion
+    waitForCompletion(
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])));
+  }
+
+  /**
+   * Wait for a single future to complete, extracting IOEs afterwards.
+   * @param future future to wait for.
+   * @throws IOException if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static <T> void waitForCompletion(final CompletableFuture<T> future)
+      throws IOException {
+    try (DurationInfo ignore = new DurationInfo(LOG, false,
+        "Waiting for task completion")) {
+      future.join();
+    } catch (CancellationException e) {
+      throw new IOException(e);
+    } catch (CompletionException e) {
+      raiseInnerCause(e);
+    }
+  }
+
+  /**
+   * Wait for a single future to complete, ignoring exceptions raised.
+   * @param future future to wait for.
+   */
+  public static <T> void waitForCompletionIgnoringExceptions(
+      @Nullable final CompletableFuture<T> future) {
+    if (future != null) {
+      try (DurationInfo ignore = new DurationInfo(LOG, false,
+          "Waiting for task completion")) {
+        future.join();
+      } catch (Exception e) {
+        LOG.debug("Ignoring exception raised in task completion: ", e);
+      }
+    }
+  }
+
+  /**
+   * Block awaiting completion for any non-null future passed in;
+   * no-op if a null arg was supplied.
+   * @param future future
+   * @throws IOException if one of the called futures raised an IOE.
+   * @throws RuntimeException if one of the futures raised one.
+   */
+  public static void maybeAwaitCompletion(
+      @Nullable final CompletableFuture<Void> future) throws IOException {
+    if (future != null) {
+      waitForCompletion(future);
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
index e266f28568..33ebd8695b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/GenericTestUtils.java
@@ -30,13 +30,17 @@
 import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.lang.reflect.InvocationTargetException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Locale;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
 import java.util.Enumeration;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
@@ -46,8 +50,11 @@
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
+import org.apache.hadoop.util.DurationInfo;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Appender;
@@ -61,15 +68,28 @@
 import org.junit.Assume;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
 import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.submit;
+import static org.apache.hadoop.util.functional.CommonCallableSupplier.waitForCompletion;
+
 /**
  * Test provides some very generic helpers which might be used across the tests
  */
 public abstract class GenericTestUtils {
 
+  public static final int EXECUTOR_THREAD_COUNT = 64;
+
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(GenericTestUtils.class);
+
+  public static final String PREFIX = "file-";
+
   private static final AtomicInteger sequence = new AtomicInteger();
 
   /**
@@ -896,5 +916,132 @@ public static int getTestsThreadCount() {
     }
     return threadCount;
   }
+
+  /**
+   * Write the text to a file asynchronously. Logs the operation duration.
+   * @param fs filesystem
+   * @param path path
+   * @return future to the path created.
+   */
+  private static CompletableFuture<Path> put(FileSystem fs,
+      Path path, String text) {
+    return submit(EXECUTOR, () -> {
+      try (DurationInfo ignore =
+          new DurationInfo(LOG, false, "Creating %s", path)) {
+        createFile(fs, path, true, text.getBytes(Charsets.UTF_8));
+        return path;
+      }
+    });
+  }
-}
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @return the list of files created.
+   */
+  public static List<Path> createFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount) throws IOException {
+    return createDirsAndFiles(fs, destDir, depth, fileCount, dirCount,
+        new ArrayList<Path>(fileCount),
+        new ArrayList<Path>(dirCount));
+  }
+
+  /**
+   * Build a set of files in a directory tree.
+   * @param fs filesystem
+   * @param destDir destination
+   * @param depth file depth
+   * @param fileCount number of files to create.
+   * @param dirCount number of dirs to create at each level
+   * @param paths [out] list of file paths created
+   * @param dirs [out] list of directory paths created.
+   * @return the list of files created.
+   */
+  public static List<Path> createDirsAndFiles(final FileSystem fs,
+      final Path destDir,
+      final int depth,
+      final int fileCount,
+      final int dirCount,
+      final List<Path> paths,
+      final List<Path> dirs) throws IOException {
+    buildPaths(paths, dirs, destDir, depth, fileCount, dirCount);
+    List<CompletableFuture<Path>> futures = new ArrayList<>(paths.size()
+        + dirs.size());
+
+    // create directories. With dir marker retention, that adds more entries
+    // to cause deletion issues
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d directories", dirs.size())) {
+      for (Path path : dirs) {
+        futures.add(submit(EXECUTOR, () -> {
+          fs.mkdirs(path);
+          return path;
+        }));
+      }
+      waitForCompletion(futures);
+    }
+
+    try (DurationInfo ignore =
+        new DurationInfo(LOG, "Creating %d files", paths.size())) {
+      for (Path path : paths) {
+        futures.add(put(fs, path, path.getName()));
+      }
+      waitForCompletion(futures);
+      return paths;
+    }
+  }
+
+  /**
+   * Recursive method to build up lists of files and directories.
+   * @param filePaths list of file paths to add entries to.
+   * @param dirPaths list of directory paths to add entries to.
+   * @param destDir destination directory.
+   * @param depth depth of directories
+   * @param fileCount number of files.
+   * @param dirCount number of directories.
+   */
+  public static void buildPaths(final List<Path> filePaths,
+      final List<Path> dirPaths, final Path destDir, final int depth,
+      final int fileCount, final int dirCount) {
+    if (depth <= 0) {
+      return;
+    }
+    // create the file paths
+    for (int i = 0; i < fileCount; i++) {
+      String name = filenameOfIndex(i);
+      Path p = new Path(destDir, name);
+      filePaths.add(p);
+    }
+    for (int i = 0; i < dirCount; i++) {
+      String name = String.format("dir-%03d", i);
+      Path p = new Path(destDir, name);
+      dirPaths.add(p);
+      buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount);
+    }
+  }
+
+  /**
+   * Given an index, return a string to use as the filename.
+   * @param i index
+   * @return name
+   */
+  public static String filenameOfIndex(final int i) {
+    return String.format("%s%03d", PREFIX, i);
+  }
+
+  /**
+   * For submitting work.
+ */ + private static final BlockingThreadPoolExecutorService EXECUTOR = + BlockingThreadPoolExecutorService.newInstance( + EXECUTOR_THREAD_COUNT, + EXECUTOR_THREAD_COUNT * 2, + 30, TimeUnit.SECONDS, + "test-operations"); +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java index c920be1323..df45d0def5 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestPartialRenamesDeletes.java @@ -79,6 +79,7 @@ import static org.apache.hadoop.fs.s3a.test.ExtraAssertions.extractCause; import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString; import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; +import static org.apache.hadoop.test.GenericTestUtils.buildPaths; import static org.apache.hadoop.test.LambdaTestUtils.eval; /** @@ -162,8 +163,6 @@ public class ITestPartialRenamesDeletes extends AbstractS3ATestBase { public static final int DEPTH = 2; public static final int DEPTH_SCALED = 2; - public static final String PREFIX = "file-"; - /** * A role FS; if non-null it is closed in teardown. */ @@ -910,49 +909,6 @@ public static List createDirsAndFiles(final FileSystem fs, } } - /** - * Recursive method to build up lists of files and directories. - * @param filePaths list of file paths to add entries to. - * @param dirPaths list of directory paths to add entries to. - * @param destDir destination directory. - * @param depth depth of directories - * @param fileCount number of files. - * @param dirCount number of directories. - */ - private static void buildPaths( - final List filePaths, - final List dirPaths, - final Path destDir, - final int depth, - final int fileCount, - final int dirCount) { - if (depth<=0) { - return; - } - // create the file paths - for (int i = 0; i < fileCount; i++) { - String name = filenameOfIndex(i); - Path p = new Path(destDir, name); - filePaths.add(p); - } - for (int i = 0; i < dirCount; i++) { - String name = String.format("dir-%03d", i); - Path p = new Path(destDir, name); - dirPaths.add(p); - buildPaths(filePaths, dirPaths, p, depth - 1, fileCount, dirCount); - } - - } - - /** - * Given an index, return a string to use as the filename. - * @param i index - * @return name - */ - public static String filenameOfIndex(final int i) { - return String.format("%s%03d", PREFIX, i); - } - /** * Verifies that s3:DeleteObjectVersion is not required for rename. *
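The GenericTestUtils helpers above (createFiles/createDirsAndFiles, backed by CommonCallableSupplier.submit and the shared test executor) are what the S3A and DistCp tests now call to build wide directory trees in parallel. A minimal sketch of driving them from a test follows; it is not part of the patch, and the ListingFixture class name, the running FileSystem `fs`, and the root path are all placeholders.

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.test.GenericTestUtils;

// Sketch only: assumes a live FileSystem (e.g. from a MiniDFSCluster) is
// available in test scope, since GenericTestUtils lives in the test jar.
public class ListingFixture {
  public static List<Path> populate(FileSystem fs, Path root) throws Exception {
    List<Path> files = new ArrayList<>();
    List<Path> dirs = new ArrayList<>();
    // depth 3, 10 files and 10 subdirectories per level:
    // 10 + 10*10 + 10*10*10 = 1110 files, created concurrently by the
    // BlockingThreadPoolExecutorService inside GenericTestUtils.
    GenericTestUtils.createDirsAndFiles(fs, root, 3, 10, 10, files, dirs);
    return files;
  }
}

This is the same tree shape (and the same 1110-file count) that testUseIterator and testDistCpWithIterator later assert against.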
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java index efaec5f4fa..a724b9737c 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADeleteManyFiles.java @@ -39,7 +39,7 @@ import static org.apache.hadoop.fs.s3a.Constants.USER_AGENT_PREFIX; import static org.apache.hadoop.fs.s3a.S3ATestUtils.lsR; import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.createFiles; -import static org.apache.hadoop.fs.s3a.impl.ITestPartialRenamesDeletes.filenameOfIndex; +import static org.apache.hadoop.test.GenericTestUtils.filenameOfIndex; /** * Test some scalable operations related to file renaming and deletion. diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 25815687c2..c75c0e85dd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -139,6 +139,7 @@ private DistCpConstants() { public static final String CONF_LABEL_BLOCKS_PER_CHUNK = "distcp.blocks.per.chunk"; + public static final String CONF_LABEL_USE_ITERATOR = "distcp.use.iterator"; /** * Constants for DistCp return code to shell / consumer of ToolRunner's run */ diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java index 1e63d802e8..0d08796ce2 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpContext.java @@ -171,6 +171,10 @@ public int getBlocksPerChunk() { return options.getBlocksPerChunk(); } + public boolean shouldUseIterator() { + return options.shouldUseIterator(); + } + public final boolean splitLargeFile() { return options.getBlocksPerChunk() > 0; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index 3d319dae83..4163f8274d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -239,7 +239,12 @@ public enum DistCpOptionSwitch { */ DIRECT_WRITE(DistCpConstants.CONF_LABEL_DIRECT_WRITE, new Option("direct", false, "Write files directly to the" - + " target location, avoiding temporary file rename.")); + + " target location, avoiding temporary file rename.")), + + USE_ITERATOR(DistCpConstants.CONF_LABEL_USE_ITERATOR, + new Option("useiterator", false, + "Use single threaded list status iterator to build " + + "the listing to save the memory utilisation at the client")); public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index 9354c5ea9f..6315528fb8 100644 --- 
a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -158,6 +158,8 @@ public final class DistCpOptions { /** Whether data should be written directly to the target paths. */ private final boolean directWrite; + private final boolean useIterator; + /** * File attributes for preserve. * @@ -222,6 +224,8 @@ private DistCpOptions(Builder builder) { this.trackPath = builder.trackPath; this.directWrite = builder.directWrite; + + this.useIterator = builder.useIterator; } public Path getSourceFileListing() { @@ -353,6 +357,10 @@ public boolean shouldDirectWrite() { return directWrite; } + public boolean shouldUseIterator() { + return useIterator; + } + /** * Add options to configuration. These will be used in the Mapper/committer * @@ -403,6 +411,9 @@ public void appendToConf(Configuration conf) { } DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.DIRECT_WRITE, String.valueOf(directWrite)); + + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.USE_ITERATOR, + String.valueOf(useIterator)); } /** @@ -440,6 +451,7 @@ public String toString() { ", copyBufferSize=" + copyBufferSize + ", verboseLog=" + verboseLog + ", directWrite=" + directWrite + + ", useiterator=" + useIterator + '}'; } @@ -491,6 +503,8 @@ public static class Builder { private boolean directWrite = false; + private boolean useIterator = false; + public Builder(List sourcePaths, Path targetPath) { Preconditions.checkArgument(sourcePaths != null && !sourcePaths.isEmpty(), "Source paths should not be null or empty!"); @@ -748,6 +762,11 @@ public Builder withDirectWrite(boolean newDirectWrite) { this.directWrite = newDirectWrite; return this; } + + public Builder withUseIterator(boolean useItr) { + this.useIterator = useItr; + return this; + } } } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 1fbea9a0ea..a4c3b0f851 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -115,7 +115,9 @@ public static DistCpOptions parse(String[] args) .withVerboseLog( command.hasOption(DistCpOptionSwitch.VERBOSE_LOG.getSwitch())) .withDirectWrite( - command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())); + command.hasOption(DistCpOptionSwitch.DIRECT_WRITE.getSwitch())) + .withUseIterator( + command.hasOption(DistCpOptionSwitch.USE_ITERATOR.getSwitch())); if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { String[] snapshots = getVals(command, diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index ddcbb14e83..900ce6296d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -25,6 +25,8 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport; import org.apache.hadoop.io.SequenceFile; @@ 
-36,6 +38,8 @@ import org.apache.hadoop.tools.util.WorkReport; import org.apache.hadoop.tools.util.WorkRequest; import org.apache.hadoop.tools.util.WorkRequestProcessor; +import org.apache.hadoop.util.DurationInfo; +import org.apache.hadoop.util.functional.RemoteIterators; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; @@ -49,6 +53,7 @@ import java.util.List; import java.util.Random; import java.util.LinkedList; +import java.util.Stack; import static org.apache.hadoop.tools.DistCpConstants .HDFS_RESERVED_RAW_DIRECTORY_NAME; @@ -94,11 +99,9 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials randomizeFileListing = getConf().getBoolean( DistCpConstants.CONF_LABEL_SIMPLE_LISTING_RANDOMIZE_FILES, DEFAULT_RANDOMIZE_FILE_LISTING); - if (LOG.isDebugEnabled()) { - LOG.debug("numListstatusThreads=" + numListstatusThreads - + ", fileStatusLimit=" + fileStatusLimit - + ", randomizeFileListing=" + randomizeFileListing); - } + LOG.debug( + "numListstatusThreads={}, fileStatusLimit={}, randomizeFileListing={}", + numListstatusThreads, fileStatusLimit, randomizeFileListing); copyFilter = CopyFilter.getCopyFilter(getConf()); copyFilter.initialize(); } @@ -286,10 +289,8 @@ protected void doBuildListingWithSnapshotDiff( FileStatus sourceStatus = sourceFS.getFileStatus(diff.getTarget()); if (sourceStatus.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding source dir for traverse: " + - sourceStatus.getPath()); - } + LOG.debug("Adding source dir for traverse: {}", + sourceStatus.getPath()); HashSet excludeList = distCpSync.getTraverseExcludeList(diff.getSource(), @@ -298,8 +299,9 @@ protected void doBuildListingWithSnapshotDiff( ArrayList sourceDirs = new ArrayList<>(); sourceDirs.add(sourceStatus); - traverseDirectory(fileListWriter, sourceFS, sourceDirs, - sourceRoot, context, excludeList, fileStatuses); + new TraverseDirectory(fileListWriter, sourceFS, sourceDirs, + sourceRoot, context, excludeList, fileStatuses) + .traverseDirectory(); } } } @@ -366,9 +368,8 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter, if (explore) { ArrayList sourceDirs = new ArrayList(); for (FileStatus sourceStatus: sourceFiles) { - if (LOG.isDebugEnabled()) { - LOG.debug("Recording source-path: " + sourceStatus.getPath() + " for copy."); - } + LOG.debug("Recording source-path: {} for copy.", + sourceStatus.getPath()); LinkedList sourceCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus, preserveAcls && sourceStatus.isDirectory(), @@ -384,14 +385,13 @@ protected void doBuildListing(SequenceFile.Writer fileListWriter, } } if (sourceStatus.isDirectory()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding source dir for traverse: " + sourceStatus.getPath()); - } + LOG.debug("Adding source dir for traverse: {}", + sourceStatus.getPath()); sourceDirs.add(sourceStatus); } } - traverseDirectory(fileListWriter, sourceFS, sourceDirs, - sourcePathRoot, context, null, statusList); + new TraverseDirectory(fileListWriter, sourceFS, sourceDirs, + sourcePathRoot, context, null, statusList).traverseDirectory(); } } if (randomizeFileListing) { @@ -429,16 +429,12 @@ private void writeToFileListing(List fileStatusInfoList, */ Collections.shuffle(fileStatusInfoList, rnd); for (FileStatusInfo fileStatusInfo : fileStatusInfoList) { - if (LOG.isDebugEnabled()) { - LOG.debug("Adding " + fileStatusInfo.fileStatus.getPath()); - } + LOG.debug("Adding {}", fileStatusInfo.fileStatus.getPath()); 
         writeToFileListing(fileListWriter, fileStatusInfo.fileStatus,
             fileStatusInfo.sourceRootPath);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Number of paths written to fileListing="
-            + fileStatusInfoList.size());
-      }
+      LOG.debug("Number of paths written to fileListing={}",
+          fileStatusInfoList.size());
       fileStatusInfoList.clear();
     }
 
@@ -590,8 +586,8 @@ public WorkReport<FileStatus[]> processItem(
         result = new WorkReport<FileStatus[]>(
             getFileStatus(parent.getPath()), retry, true);
       } catch (FileNotFoundException fnf) {
-        LOG.error("FileNotFoundException exception in listStatus: " +
-            fnf.getMessage());
+        LOG.error("FileNotFoundException exception in listStatus: {}",
+            fnf.getMessage());
         result = new WorkReport<FileStatus[]>(new FileStatus[0], retry,
             true, fnf);
       } catch (Exception e) {
@@ -605,8 +601,7 @@ public WorkReport<FileStatus[]> processItem(
   }
 
   private void printStats() {
-    LOG.info("Paths (files+dirs) cnt = " + totalPaths +
-        "; dirCnt = " + totalDirs);
+    LOG.info("Paths (files+dirs) cnt = {}; dirCnt = {}", totalPaths,
+        totalDirs);
   }
 
   private void maybePrintStats() {
@@ -615,79 +610,6 @@ private void maybePrintStats() {
     }
   }
 
-  private void traverseDirectory(SequenceFile.Writer fileListWriter,
-                                 FileSystem sourceFS,
-                                 ArrayList<FileStatus> sourceDirs,
-                                 Path sourcePathRoot,
-                                 DistCpContext context,
-                                 HashSet<String> excludeList,
-                                 List<FileStatusInfo> fileStatuses)
-      throws IOException {
-    final boolean preserveAcls = context.shouldPreserve(FileAttribute.ACL);
-    final boolean preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
-    final boolean preserveRawXattrs = context.shouldPreserveRawXattrs();
-
-    assert numListstatusThreads > 0;
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Starting thread pool of " + numListstatusThreads +
-          " listStatus workers.");
-    }
-    ProducerConsumer<FileStatus, FileStatus[]> workers =
-        new ProducerConsumer<FileStatus, FileStatus[]>(numListstatusThreads);
-    for (int i = 0; i < numListstatusThreads; i++) {
-      workers.addWorker(
-          new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
-              excludeList));
-    }
-
-    for (FileStatus status : sourceDirs) {
-      workers.put(new WorkRequest<FileStatus>(status, 0));
-    }
-
-    while (workers.hasWork()) {
-      try {
-        WorkReport<FileStatus[]> workResult = workers.take();
-        int retry = workResult.getRetry();
-        for (FileStatus child: workResult.getItem()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Recording source-path: " + child.getPath() + " for copy.");
-          }
-          if (workResult.getSuccess()) {
-            LinkedList<CopyListingFileStatus> childCopyListingStatus =
-                DistCpUtils.toCopyListingFileStatus(sourceFS, child,
-                    preserveAcls && child.isDirectory(),
-                    preserveXAttrs && child.isDirectory(),
-                    preserveRawXattrs && child.isDirectory(),
-                    context.getBlocksPerChunk());
-
-            for (CopyListingFileStatus fs : childCopyListingStatus) {
-              if (randomizeFileListing) {
-                addToFileListing(fileStatuses,
-                    new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
-              } else {
-                writeToFileListing(fileListWriter, fs, sourcePathRoot);
-              }
-            }
-          }
-          if (retry < maxRetries) {
-            if (child.isDirectory()) {
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Traversing into source dir: " + child.getPath());
-              }
-              workers.put(new WorkRequest<FileStatus>(child, retry));
-            }
-          } else {
-            LOG.error("Giving up on " + child.getPath() +
-                " after " + retry + " retries.");
-          }
-        }
-      } catch (InterruptedException ie) {
-        LOG.error("Could not get item from childQueue. Retrying...");
-      }
-    }
-    workers.shutdown();
-  }
-
   private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
       LinkedList<CopyListingFileStatus> fileStatus, Path sourcePathRoot,
       DistCpContext context) throws IOException {
@@ -697,9 +619,7 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
       if (fs.getPath().equals(sourcePathRoot) &&
           fs.isDirectory() && syncOrOverwrite) {
         // Skip the root-paths when syncOrOverwrite
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Skip " + fs.getPath());
-        }
+        LOG.debug("Skip {}", fs.getPath());
         return;
       }
       writeToFileListing(fileListWriter, fs, sourcePathRoot);
@@ -709,10 +629,9 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
   private void writeToFileListing(SequenceFile.Writer fileListWriter,
       CopyListingFileStatus fileStatus,
       Path sourcePathRoot) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
-          fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
-    }
+    LOG.debug("REL PATH: {}, FULL PATH: {}",
+        DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()),
+        fileStatus.getPath());
 
     if (!shouldCopy(fileStatus.getPath())) {
       return;
@@ -730,4 +649,159 @@ private void writeToFileListing(SequenceFile.Writer fileListWriter,
     totalPaths++;
     maybePrintStats();
   }
+
+  /**
+   * A utility class to traverse a directory.
+   */
+  private final class TraverseDirectory {
+
+    private SequenceFile.Writer fileListWriter;
+    private FileSystem sourceFS;
+    private ArrayList<FileStatus> sourceDirs;
+    private Path sourcePathRoot;
+    private DistCpContext context;
+    private HashSet<String> excludeList;
+    private List<FileStatusInfo> fileStatuses;
+    private final boolean preserveAcls;
+    private final boolean preserveXAttrs;
+    private final boolean preserveRawXattrs;
+
+    private TraverseDirectory(SequenceFile.Writer fileListWriter,
+        FileSystem sourceFS, ArrayList<FileStatus> sourceDirs,
+        Path sourcePathRoot, DistCpContext context,
+        HashSet<String> excludeList,
+        List<FileStatusInfo> fileStatuses) {
+      this.fileListWriter = fileListWriter;
+      this.sourceFS = sourceFS;
+      this.sourceDirs = sourceDirs;
+      this.sourcePathRoot = sourcePathRoot;
+      this.context = context;
+      this.excludeList = excludeList;
+      this.fileStatuses = fileStatuses;
+      this.preserveAcls = context.shouldPreserve(FileAttribute.ACL);
+      this.preserveXAttrs = context.shouldPreserve(FileAttribute.XATTR);
+      this.preserveRawXattrs = context.shouldPreserveRawXattrs();
+    }
+
+    public void traverseDirectory() throws IOException {
+      if (context.shouldUseIterator()) {
+        try (DurationInfo ignored = new DurationInfo(LOG,
+            "Building listing using iterator mode for %s", sourcePathRoot)) {
+          traverseDirectoryLegacy();
+        }
+      } else {
+        try (DurationInfo ignored = new DurationInfo(LOG,
+            "Building listing using multi threaded approach for %s",
+            sourcePathRoot)) {
+          traverseDirectoryMultiThreaded();
+        }
+      }
+    }
+
+    public void traverseDirectoryMultiThreaded() throws IOException {
+      assert numListstatusThreads > 0;
+
+      LOG.debug("Starting thread pool of {} listStatus workers.",
+          numListstatusThreads);
+
+      ProducerConsumer<FileStatus, FileStatus[]> workers =
+          new ProducerConsumer<FileStatus, FileStatus[]>(
+              numListstatusThreads);
+      try {
+        for (int i = 0; i < numListstatusThreads; i++) {
+          workers.addWorker(
+              new FileStatusProcessor(sourcePathRoot.getFileSystem(getConf()),
+                  excludeList));
+        }
+
+        for (FileStatus status : sourceDirs) {
+          workers.put(new WorkRequest<FileStatus>(status, 0));
+        }
+
+        while (workers.hasWork()) {
+          try {
+            WorkReport<FileStatus[]> workResult = workers.take();
+            int retry = workResult.getRetry();
+            for (FileStatus child : workResult.getItem()) {
+              LOG.debug("Recording source-path: {} for copy.",
+                  child.getPath());
+              boolean isChildDirectory = child.isDirectory();
+              if (workResult.getSuccess()) {
+                LinkedList<CopyListingFileStatus> childCopyListingStatus =
+                    DistCpUtils.toCopyListingFileStatus(sourceFS, child,
+                        preserveAcls && isChildDirectory,
+                        preserveXAttrs && isChildDirectory,
+                        preserveRawXattrs && isChildDirectory,
+                        context.getBlocksPerChunk());
+
+                for (CopyListingFileStatus fs : childCopyListingStatus) {
+                  if (randomizeFileListing) {
+                    addToFileListing(fileStatuses,
+                        new FileStatusInfo(fs, sourcePathRoot),
+                        fileListWriter);
+                  } else {
+                    writeToFileListing(fileListWriter, fs, sourcePathRoot);
+                  }
+                }
+              }
+              if (retry < maxRetries) {
+                if (isChildDirectory) {
+                  LOG.debug("Traversing into source dir: {}",
+                      child.getPath());
+                  workers.put(new WorkRequest<FileStatus>(child, retry));
+                }
+              } else {
+                LOG.error("Giving up on {} after {} retries.",
+                    child.getPath(), retry);
+              }
+            }
+          } catch (InterruptedException ie) {
+            LOG.error("Could not get item from childQueue. Retrying...");
+          }
+        }
+      } finally {
+        workers.shutdown();
+      }
+    }
+
+    private void traverseDirectoryLegacy() throws IOException {
+      Stack<FileStatus> pathStack = new Stack<FileStatus>();
+      for (FileStatus fs : sourceDirs) {
+        if (excludeList == null || !excludeList
            .contains(fs.getPath().toUri().getPath())) {
+          pathStack.add(fs);
+        }
+      }
+      while (!pathStack.isEmpty()) {
+        prepareListing(pathStack.pop().getPath());
+      }
+    }
+
+    private void prepareListing(Path path) throws IOException {
+      LOG.debug("Recording source-path: {} for copy.", path);
+      RemoteIterator<FileStatus> listStatus = RemoteIterators
+          .filteringRemoteIterator(sourceFS.listStatusIterator(path),
+              i -> excludeList == null || !excludeList
+                  .contains(i.getPath().toUri().getPath()));
+      while (listStatus.hasNext()) {
+        FileStatus child = listStatus.next();
+        LinkedList<CopyListingFileStatus> childCopyListingStatus = DistCpUtils
+            .toCopyListingFileStatus(sourceFS, child,
+                preserveAcls && child.isDirectory(),
+                preserveXAttrs && child.isDirectory(),
+                preserveRawXattrs && child.isDirectory(),
+                context.getBlocksPerChunk());
+        for (CopyListingFileStatus fs : childCopyListingStatus) {
+          if (randomizeFileListing) {
+            addToFileListing(fileStatuses,
+                new FileStatusInfo(fs, sourcePathRoot), fileListWriter);
+          } else {
+            writeToFileListing(fileListWriter, fs, sourcePathRoot);
+          }
+        }
+        if (child.isDirectory()) {
+          LOG.debug("Traversing into source dir: {}", child.getPath());
+          prepareListing(child.getPath());
+        }
+      }
+      IOStatisticsLogging
+          .logIOStatisticsAtDebug(LOG, "RemoteIterator Statistics: {}",
+              listStatus);
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
index e82d8bc966..136b6c8ca1 100644
--- a/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
+++ b/hadoop-tools/hadoop-distcp/src/site/markdown/DistCp.md.vm
@@ -362,6 +362,7 @@ Command Line Options
 | `-copybuffersize <copybuffersize>` | Size of the copy buffer to use. By default, `<copybuffersize>` is set to 8192B | |
 | `-xtrack <path>` | Save information about missing source files to the specified path. | This option is only valid with `-update` option. This is an experimental property and it cannot be used with `-atomic` option. |
 | `-direct` | Write directly to destination paths | Useful for avoiding potentially very expensive temporary file rename operations when the destination is an object store |
+| `-useiterator` | Uses single threaded listStatusIterator to build listing | Useful for saving memory at the client side.
Using this option will ignore the numListstatusThreads option | Architecture of DistCp ---------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java index 7382795dd9..13497029a0 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpOptions.java @@ -289,7 +289,7 @@ public void testToString() { "atomicWorkPath=null, logPath=null, sourceFileListing=abc, " + "sourcePaths=null, targetPath=xyz, filtersFile='null', " + "blocksPerChunk=0, copyBufferSize=8192, verboseLog=false, " + - "directWrite=false}"; + "directWrite=false, useiterator=false}"; String optionString = option.toString(); Assert.assertEquals(val, optionString); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java index 14cce42e0f..47b850f4ba 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpSystem.java @@ -48,9 +48,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.Timeout; /** * A JUnit test for copying files recursively. @@ -60,9 +58,6 @@ public class TestDistCpSystem { private static final Logger LOG = LoggerFactory.getLogger(TestDistCpSystem.class); - @Rule - public Timeout globalTimeout = new Timeout(30000); - private static final String SRCDAT = "srcdat"; private static final String DSTDAT = "dstdat"; private static final long BLOCK_SIZE = 1024; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java index 978ccdd3ea..e0e103bfe8 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestDistCpWithRawXAttrs.java @@ -29,9 +29,12 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.tools.ECAdmin; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.tools.util.DistCpTestUtils; import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.assertj.core.api.Assertions; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -69,6 +72,7 @@ public class TestDistCpWithRawXAttrs { public static void init() throws Exception { conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_XATTRS_ENABLED_KEY, true); + conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 2); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).format(true) .build(); cluster.waitActive(); @@ -217,4 +221,23 @@ public void testPreserveEC() throws Exception { assertTrue("/dest/dir1/subdir1 is not erasure coded!", destSubDir1Status.isErasureCoded()); } + + @Test + public void testUseIterator() throws Exception { + + Path source = new Path("/src"); + Path dest = new Path("/dest"); + fs.delete(source, true); + fs.delete(dest, 
true); + // Create a source dir + fs.mkdirs(source); + + GenericTestUtils.createFiles(fs, source, 3, 10, 10); + + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(), + dest.toString(), "-useiterator", conf); + + Assertions.assertThat(RemoteIterators.toList(fs.listFiles(dest, true))) + .describedAs("files").hasSize(1110); + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index 1a40d78b26..202ead69a1 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -44,7 +44,9 @@ import org.apache.hadoop.tools.DistCpOptions; import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.util.DistCpTestUtils; +import org.apache.hadoop.util.functional.RemoteIterators; +import org.assertj.core.api.Assertions; import org.junit.Before; import org.junit.Rule; import org.junit.Test; @@ -59,6 +61,7 @@ * under test. The tests in the suite cover both copying from local to remote * (e.g. a backup use case) and copying from remote to local (e.g. a restore use * case). + * The HDFS contract test needs to be run explicitly. */ public abstract class AbstractContractDistCpTest extends AbstractFSContractTestBase { @@ -613,6 +616,42 @@ public void testNonDirectWrite() throws Exception { directWrite(localFS, localDir, remoteFS, remoteDir, false); } + @Test + public void testDistCpWithIterator() throws Exception { + describe("Build listing in distCp using the iterator option."); + Path source = new Path(remoteDir, "src"); + Path dest = new Path(localDir, "dest"); + dest = localFS.makeQualified(dest); + mkdirs(remoteFS, source); + verifyPathExists(remoteFS, "", source); + + GenericTestUtils + .createFiles(remoteFS, source, getDepth(), getWidth(), getWidth()); + + DistCpTestUtils.assertRunDistCp(DistCpConstants.SUCCESS, source.toString(), + dest.toString(), "-useiterator", conf); + + Assertions + .assertThat(RemoteIterators.toList(localFS.listFiles(dest, true))) + .describedAs("files").hasSize(getTotalFiles()); + } + + public int getDepth() { + return 3; + } + + public int getWidth() { + return 10; + } + + private int getTotalFiles() { + int totalFiles = 0; + for (int i = 1; i <= getDepth(); i++) { + totalFiles += Math.pow(getWidth(), i); + } + return totalFiles; + } + /** * Executes a test with support for using direct write option. * diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java new file mode 100644 index 0000000000..d8c7424079 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/OptionalTestHDFSContractDistCp.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.hdfs.HDFSContract;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+/**
+ * Verifies that HDFS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */
+public class OptionalTestHDFSContractDistCp extends AbstractContractDistCpTest {
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
new file mode 100644
index 0000000000..3c9396f79a
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/contract/hdfs.xml
@@ -0,0 +1,139 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~  or more contributor license agreements.  See the NOTICE file
+  ~  distributed with this work for additional information
+  ~  regarding copyright ownership.  The ASF licenses this file
+  ~  to you under the Apache License, Version 2.0 (the
+  ~  "License"); you may not use this file except in compliance
+  ~  with the License.  You may obtain a copy of the License at
+  ~
+  ~       http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<configuration>
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.file.contract.test.random-seek-count</name>
+    <value>500</value>
+  </property>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-dest-exists</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-returns-false-if-source-missing</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-file-reference</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-content-check</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unbuffer</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hflush</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-hsync</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.metadata_updated_on_hsync</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>dfs.namenode.fs-limits.min-block-size</name>
+    <value>0</value>
+  </property>
+</configuration>
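For anyone driving the new option from the Java API rather than the command line, here is a minimal sketch of enabling the iterator-based listing through DistCpOptions.Builder. It mirrors passing -useiterator on the CLI; the class name and the cluster paths are placeholders, not part of the patch. Note that numListstatusThreads is ignored when the iterator listing is selected.

import java.util.Collections;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;

// Sketch only: programmatic equivalent of `hadoop distcp -useiterator src dest`.
public class UseIteratorExample {
  public static void main(String[] args) throws Exception {
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn1/src")),  // placeholder
        new Path("hdfs://nn2/dest"))                            // placeholder
        // build the copy listing with the single threaded
        // listStatusIterator instead of the listStatus thread pool,
        // trading listing speed for lower client-side memory use
        .withUseIterator(true)
        .build();
    new DistCp(new Configuration(), options).execute();
  }
}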