From 1976e0066e9ae8852715fa69d8aea3769330e933 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 15 Mar 2018 18:05:14 +0000 Subject: [PATCH] HADOOP-15209. DistCp to eliminate needless deletion of files under already-deleted directories. Contributed by Steve Loughran. --- .../hadoop/fs/contract/ContractTestUtils.java | 32 +- .../contract/s3a/ITestS3AContractDistCp.java | 17 + hadoop-tools/hadoop-azure-datalake/pom.xml | 19 + .../adl/live/TestAdlContractDistCpLive.java | 36 ++ .../org/apache/hadoop/tools/CopyListing.java | 3 +- .../hadoop/tools/CopyListingFileStatus.java | 13 +- .../apache/hadoop/tools/DistCpConstants.java | 20 +- .../hadoop/tools/DistCpOptionSwitch.java | 20 +- .../apache/hadoop/tools/DistCpOptions.java | 23 + .../apache/hadoop/tools/OptionsParser.java | 6 + .../hadoop/tools/mapred/CopyCommitter.java | 219 +++++++-- .../tools/mapred/DeletedDirTracker.java | 181 +++++++ .../apache/hadoop/tools/util/DistCpUtils.java | 41 +- .../contract/AbstractContractDistCpTest.java | 453 ++++++++++++++++-- .../contract/TestLocalContractDistCp.java | 36 ++ .../tools/mapred/TestCopyCommitter.java | 163 +++---- .../tools/mapred/TestDeletedDirTracker.java | 250 ++++++++++ .../hadoop/tools/util/TestDistCpUtils.java | 44 +- .../src/test/resources/contract/localfs.xml | 128 +++++ .../src/test/resources/log4j.properties | 13 +- 20 files changed, 1508 insertions(+), 209 deletions(-) create mode 100644 hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java index d2cbca0198..54d015a955 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java @@ -228,9 +228,9 @@ public static byte[] readDataset(FileSystem fs, Path path, int len) public static void verifyFileContents(FileSystem fs, Path path, byte[] original) throws IOException { + assertIsFile(fs, path); FileStatus stat = fs.getFileStatus(path); String statText = stat.toString(); - assertTrue("not a file " + statText, stat.isFile()); assertEquals("wrong length " + statText, original.length, stat.getLen()); byte[] bytes = readDataset(fs, path, original.length); compareByteArrays(original, bytes, original.length); @@ -853,6 +853,36 @@ public static void assertIsFile(Path filename, FileStatus status) { status.isSymlink()); } + /** + * Assert that a varargs list of paths exist. + * @param fs filesystem + * @param message message for exceptions + * @param paths paths + * @throws IOException IO failure + */ + public static void assertPathsExist(FileSystem fs, + String message, + Path... paths) throws IOException { + for (Path path : paths) { + assertPathExists(fs, message, path); + } + } + + /** + * Assert that a varargs list of paths do not exist. 
+ * @param fs filesystem + * @param message message for exceptions + * @param paths paths + * @throws IOException IO failure + */ + public static void assertPathsDoNotExist(FileSystem fs, + String message, + Path... paths) throws IOException { + for (Path path : paths) { + assertPathDoesNotExist(fs, message, path); + } + } + /** * Create a dataset for use in the tests; all data is in the range * base to (base+modulo-1) inclusive. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java index 8da8b6ad5b..b3d511ed6a 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java @@ -18,11 +18,15 @@ package org.apache.hadoop.fs.contract.s3a; +import java.io.IOException; + import static org.apache.hadoop.fs.s3a.Constants.*; import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS; import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.FailureInjectionPolicy; import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; /** @@ -57,4 +61,17 @@ protected Configuration createConfiguration() { protected S3AContract createContract(Configuration conf) { return new S3AContract(conf); } + + /** + * Always inject the delay path in, so if the destination is inconsistent, + * and uses this key, inconsistency triggered. + * @param filepath path string in + * @return path on the remote FS for distcp + * @throws IOException IO failure + */ + @Override + protected Path path(final String filepath) throws IOException { + Path path = super.path(filepath); + return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING); + } } diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml index 95eaef5f89..80203812de 100644 --- a/hadoop-tools/hadoop-azure-datalake/pom.xml +++ b/hadoop-tools/hadoop-azure-datalake/pom.xml @@ -147,5 +147,24 @@ ${okhttp.version} test + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + + + + org.apache.hadoop + hadoop-distcp + test + + + + org.apache.hadoop + hadoop-distcp + test + test-jar + diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java new file mode 100644 index 0000000000..dae6e8087c --- /dev/null +++ b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.fs.adl.live; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.tools.contract.AbstractContractDistCpTest; + +/** + * Test DistCP operations. + */ +public class TestAdlContractDistCpLive extends AbstractContractDistCpTest { + + @Override + protected AbstractFSContract createContract(Configuration configuration) { + return new AdlStorageContract(configuration); + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java index 908b558047..9a40a4933a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java @@ -143,7 +143,6 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context) throws DuplicateFileException, IOException { Configuration config = getConf(); - FileSystem fs = pathToListFile.getFileSystem(config); final boolean splitLargeFile = context.splitLargeFile(); @@ -153,7 +152,7 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context) // is continuous. // Path checkPath = splitLargeFile? - pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile); + pathToListFile : DistCpUtils.sortListing(config, pathToListFile); SequenceFile.Reader reader = new SequenceFile.Reader( config, SequenceFile.Reader.file(checkPath)); diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java index 138b491189..e01a6b15bd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java @@ -27,6 +27,7 @@ import java.util.Map.Entry; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.AclEntry; @@ -46,8 +47,18 @@ /** * CopyListingFileStatus is a view of {@link FileStatus}, recording additional * data members useful to distcp. + * + * This is the datastructure persisted in the sequence files generated + * in the CopyCommitter when deleting files. + * Any tool working with these generated files needs to be aware of an + * important stability guarantee: there is none; expect it to change + * across minor Hadoop releases without any support for reading the files of + * different versions. + * Tools parsing the listings must be built and tested against the point + * release of Hadoop which they intend to support. 
*/ -@InterfaceAudience.Private +@InterfaceAudience.LimitedPrivate("Distcp support tools") +@InterfaceStability.Unstable public final class CopyListingFileStatus implements Writable { private static final byte NO_ACL_ENTRIES = -1; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 0bae5d5034..212256ccfd 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -18,12 +18,19 @@ package org.apache.hadoop.tools; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.Path; /** * Utility class to hold commonly used constants. */ -public class DistCpConstants { +@InterfaceAudience.LimitedPrivate("Distcp support tools") +@InterfaceStability.Evolving +public final class DistCpConstants { + + private DistCpConstants() { + } /* Default number of threads to use for building file listing */ public static final int DEFAULT_LISTSTATUS_THREADS = 1; @@ -52,6 +59,8 @@ public class DistCpConstants { "distcp.preserve.rawxattrs"; public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders"; public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source"; + public static final String CONF_LABEL_TRACK_MISSING = + "distcp.track.missing.source"; public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads"; public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps"; public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing"; @@ -148,4 +157,13 @@ public class DistCpConstants { static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp"; public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024; + + /** Filename of sorted files in when tracking saves them. */ + public static final String SOURCE_SORTED_FILE = "source_sorted.seq"; + + /** Filename of unsorted target listing. */ + public static final String TARGET_LISTING_FILE = "target_listing.seq"; + + /** Filename of sorted target listing. */ + public static final String TARGET_SORTED_FILE = "target_sorted.seq"; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index faef7e6eda..3ce12b264d 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -19,6 +19,7 @@ package org.apache.hadoop.tools; import org.apache.commons.cli.Option; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; /** @@ -63,10 +64,10 @@ public enum DistCpOptionSwitch { */ SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS, new Option("update", false, "Update target, copying only missing" + - "files or directories")), + " files or directories")), /** - * Deletes missing files in target that are missing from source + * Deletes missing files in target that are missing from source. 
* This allows the target to be in sync with the source contents * Typically used in conjunction with SYNC_FOLDERS * Incompatible with ATOMIC_COMMIT @@ -74,6 +75,21 @@ public enum DistCpOptionSwitch { DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING, new Option("delete", false, "Delete from target, " + "files missing in source. Delete is applicable only with update or overwrite options")), + + /** + * Track missing files in target that are missing from source + * This allows for other applications to complete the synchronization, + * possibly with object-store-specific delete algorithms. + * Typically used in conjunction with SYNC_FOLDERS + * Incompatible with ATOMIC_COMMIT + */ + @InterfaceStability.Unstable + TRACK_MISSING(DistCpConstants.CONF_LABEL_TRACK_MISSING, + new Option("xtrack", true, + "Save information about missing source files to the" + + " specified directory")), + + /** * Number of threads for building source file listing (before map-reduce * phase, max one listStatus per thread at a time). diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index f33f7fdcb9..ea99016b2c 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -24,6 +24,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.tools.util.DistCpUtils; @@ -43,6 +45,8 @@ * * This class is immutable. */ +@InterfaceAudience.Public +@InterfaceStability.Evolving public final class DistCpOptions { private static final Logger LOG = LoggerFactory.getLogger(Builder.class); public static final int MAX_NUM_LISTSTATUS_THREADS = 40; @@ -68,6 +72,9 @@ public final class DistCpOptions { /** Whether source and target folder contents be sync'ed up. */ private final boolean syncFolder; + /** Path to save source/dest sequence files to, if non-null. */ + private final Path trackPath; + /** Whether files only present in target should be deleted. */ private boolean deleteMissing; @@ -208,6 +215,7 @@ private DistCpOptions(Builder builder) { this.copyBufferSize = builder.copyBufferSize; this.verboseLog = builder.verboseLog; + this.trackPath = builder.trackPath; } public Path getSourceFileListing() { @@ -331,6 +339,10 @@ public boolean shouldVerboseLog() { return verboseLog; } + public Path getTrackPath() { + return trackPath; + } + /** * Add options to configuration. These will be used in the Mapper/committer * @@ -371,6 +383,11 @@ public void appendToConf(Configuration conf) { String.valueOf(copyBufferSize)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG, String.valueOf(verboseLog)); + if (trackPath != null) { + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING, + String.valueOf(trackPath)); + } + } /** @@ -441,6 +458,7 @@ public static class Builder { private String filtersFile; private Path logPath; + private Path trackPath; private String copyStrategy = DistCpConstants.UNIFORMSIZE; private int numListstatusThreads = 0; // 0 indicates that flag is not set. 
@@ -641,6 +659,11 @@ public Builder withLogPath(Path newLogPath) { return this; } + public Builder withTrackMissing(Path path) { + this.trackPath = path; + return this; + } + public Builder withCopyStrategy(String newCopyStrategy) { this.copyStrategy = newCopyStrategy; return this; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 606ed3254f..668b594be6 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -145,6 +145,12 @@ public static DistCpOptions parse(String[] args) builder.withAtomicWorkPath(new Path(workPath)); } } + if (command.hasOption(DistCpOptionSwitch.TRACK_MISSING.getSwitch())) { + builder.withTrackMissing( + new Path(getVal( + command, + DistCpOptionSwitch.TRACK_MISSING.getSwitch()))); + } if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { try { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 81c2be7e05..07eacb0483 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -18,8 +18,9 @@ package org.apache.hadoop.tools.mapred; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -49,6 +50,8 @@ import java.util.LinkedList; import java.util.List; +import static org.apache.hadoop.tools.DistCpConstants.*; + /** * The CopyCommitter class is DistCp's OutputCommitter implementation. It is * responsible for handling the completion/cleanup of the DistCp run. @@ -62,7 +65,8 @@ * 5. Cleanup of any partially copied files, from previous, failed attempts. */ public class CopyCommitter extends FileOutputCommitter { - private static final Log LOG = LogFactory.getLog(CopyCommitter.class); + private static final Logger LOG = + LoggerFactory.getLogger(CopyCommitter.class); private final TaskAttemptContext taskAttemptContext; private boolean syncFolder = false; @@ -111,6 +115,9 @@ public void commitJob(JobContext jobContext) throws IOException { deleteMissing(conf); } else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) { commitData(conf); + } else if (conf.get(CONF_LABEL_TRACK_MISSING) != null) { + // save missing information to a directory + trackMissing(conf); } taskAttemptContext.setStatus("Commit Successful"); } @@ -334,40 +341,64 @@ private void preserveFileAttributesForDirectories(Configuration conf) LOG.info("Preserved status on " + preservedEntries + " dir entries on target"); } - // This method deletes "extra" files from the target, if they're not - // available at the source. + /** + * Track all the missing files by saving the listings to the tracking + * directory. + * This is the same as listing phase of the + * {@link #deleteMissing(Configuration)} operation. + * @param conf configuration to read options from, and for FS instantiation. 
+ * @throws IOException IO failure + */ + private void trackMissing(Configuration conf) throws IOException { + // destination directory for all output files + Path trackDir = new Path( + conf.get(DistCpConstants.CONF_LABEL_TRACK_MISSING)); + + // where is the existing source listing? + Path sourceListing = new Path( + conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); + LOG.info("Tracking file changes to directory {}", trackDir); + + // the destination path is under the track directory + Path sourceSortedListing = new Path(trackDir, + DistCpConstants.SOURCE_SORTED_FILE); + LOG.info("Source listing {}", sourceSortedListing); + + DistCpUtils.sortListing(conf, sourceListing, sourceSortedListing); + + // Similarly, create the listing of target-files. Sort alphabetically. + // target listing will be deleted after the sort + Path targetListing = new Path(trackDir, TARGET_LISTING_FILE); + Path sortedTargetListing = new Path(trackDir, TARGET_SORTED_FILE); + // list the target + listTargetFiles(conf, targetListing, sortedTargetListing); + LOG.info("Target listing {}", sortedTargetListing); + + targetListing.getFileSystem(conf).delete(targetListing, false); + } + + /** + * Deletes "extra" files and directories from the target, if they're not + * available at the source. + * @param conf configuration to read options from, and for FS instantiation. + * @throws IOException IO failure + */ private void deleteMissing(Configuration conf) throws IOException { LOG.info("-delete option is enabled. About to remove entries from " + "target that are missing in source"); + long listingStart = System.currentTimeMillis(); // Sort the source-file listing alphabetically. Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); FileSystem clusterFS = sourceListing.getFileSystem(conf); - Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing); + Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing); // Similarly, create the listing of target-files. Sort alphabetically. Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq"); - CopyListing target = new GlobbedCopyListing(new Configuration(conf), null); + Path sortedTargetListing = new Path(targetListing.toString() + "_sorted"); - List targets = new ArrayList(1); - Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); - targets.add(targetFinalPath); - Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath) - .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME) - ? 
DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH; - // - // Set up options to be the same from the CopyListing.buildListing's perspective, - // so to collect similar listings as when doing the copy - // - DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath) - .withOverwrite(overwrite) - .withSyncFolder(syncFolder) - .build(); - DistCpContext distCpContext = new DistCpContext(options); - distCpContext.setTargetPathExists(targetPathExists); - - target.buildListing(targetListing, distCpContext); - Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing); + Path targetFinalPath = listTargetFiles(conf, + targetListing, sortedTargetListing); long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen(); SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, @@ -377,41 +408,153 @@ private void deleteMissing(Configuration conf) throws IOException { // Walk both source and target file listings. // Delete all from target that doesn't also exist on source. + long deletionStart = System.currentTimeMillis(); + LOG.info("Listing completed in {}", + formatDuration(deletionStart - listingStart)); + long deletedEntries = 0; + long filesDeleted = 0; + long missingDeletes = 0; + long failedDeletes = 0; + long skippedDeletes = 0; + long deletedDirectories = 0; + // this is an arbitrary constant. + final DeletedDirTracker tracker = new DeletedDirTracker(1000); try { CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); Text srcRelPath = new Text(); CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus(); Text trgtRelPath = new Text(); - FileSystem targetFS = targetFinalPath.getFileSystem(conf); + final FileSystem targetFS = targetFinalPath.getFileSystem(conf); + boolean showProgress; boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); while (targetReader.next(trgtRelPath, trgtFileStatus)) { // Skip sources that don't exist on target. while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) { srcAvailable = sourceReader.next(srcRelPath, srcFileStatus); } + Path targetEntry = trgtFileStatus.getPath(); + LOG.debug("Comparing {} and {}", + srcFileStatus.getPath(), targetEntry); if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue; - // Target doesn't exist at source. Delete. - boolean result = targetFS.delete(trgtFileStatus.getPath(), true) - || !targetFS.exists(trgtFileStatus.getPath()); - if (result) { - LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source"); - deletedEntries++; + // Target doesn't exist at source. Try to delete it. + if (tracker.shouldDelete(trgtFileStatus)) { + showProgress = true; + try { + if (targetFS.delete(targetEntry, true)) { + // the delete worked. Unless the file is actually missing, this is the + LOG.info("Deleted " + targetEntry + " - missing at source"); + deletedEntries++; + if (trgtFileStatus.isDirectory()) { + deletedDirectories++; + } else { + filesDeleted++; + } + } else { + // delete returned false. + // For all the filestores which implement the FS spec properly, + // this means "the file wasn't there". + // so track but don't worry about it. + LOG.info("delete({}) returned false ({})", + targetEntry, trgtFileStatus); + missingDeletes++; + } + } catch (IOException e) { + if (!ignoreFailures) { + throw e; + } else { + // failed to delete, but ignoring errors. 
So continue + LOG.info("Failed to delete {}, ignoring exception {}", + targetEntry, e.toString()); + LOG.debug("Failed to delete {}", targetEntry, e); + // count and break out the loop + failedDeletes++; + } + } } else { - throw new IOException("Unable to delete " + trgtFileStatus.getPath()); + LOG.debug("Skipping deletion of {}", targetEntry); + skippedDeletes++; + showProgress = false; + } + if (showProgress) { + // update progress if there's been any FS IO/files deleted. + taskAttemptContext.progress(); + taskAttemptContext.setStatus("Deleting removed files from target. [" + + targetReader.getPosition() * 100 / totalLen + "%]"); } - taskAttemptContext.progress(); - taskAttemptContext.setStatus("Deleting missing files from target. [" + - targetReader.getPosition() * 100 / totalLen + "%]"); } + // if the FS toString() call prints statistics, they get logged here + LOG.info("Completed deletion of files from {}", targetFS); } finally { IOUtils.closeStream(sourceReader); IOUtils.closeStream(targetReader); } - LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0)); + long deletionEnd = System.currentTimeMillis(); + long deletedFileCount = deletedEntries - deletedDirectories; + LOG.info("Deleted from target: files: {} directories: {};" + + " skipped deletions {}; deletions already missing {};" + + " failed deletes {}", + deletedFileCount, deletedDirectories, skippedDeletes, + missingDeletes, failedDeletes); + LOG.info("Number of tracked deleted directories {}", tracker.size()); + LOG.info("Duration of deletions: {}", + formatDuration(deletionEnd - deletionStart)); + LOG.info("Total duration of deletion operation: {}", + formatDuration(deletionEnd - listingStart)); + } + + /** + * Take a duration and return a human-readable duration of + * hours:minutes:seconds.millis. + * @param duration to process + * @return a string for logging. + */ + private String formatDuration(long duration) { + + long seconds = duration > 0 ? (duration / 1000) : 0; + long minutes = (seconds / 60); + long hours = (minutes / 60); + return String.format("%d:%02d:%02d.%03d", + hours, minutes % 60, seconds % 60, duration % 1000); + } + + /** + * Build a listing of the target files, sorted and unsorted. + * @param conf configuration to work with + * @param targetListing target listing + * @param sortedTargetListing sorted version of the listing + * @return the target path of the operation + * @throws IOException IO failure. + */ + private Path listTargetFiles(final Configuration conf, + final Path targetListing, + final Path sortedTargetListing) throws IOException { + CopyListing target = new GlobbedCopyListing(new Configuration(conf), null); + Path targetFinalPath = new Path( + conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); + List targets = new ArrayList<>(1); + targets.add(targetFinalPath); + Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath) + .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME) + ? 
DistCpConstants.RAW_NONE_PATH + : DistCpConstants.NONE_PATH; + // + // Set up options to be the same from the CopyListing.buildListing's + // perspective, so to collect similar listings as when doing the copy + // + DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath) + .withOverwrite(overwrite) + .withSyncFolder(syncFolder) + .build(); + DistCpContext distCpContext = new DistCpContext(options); + distCpContext.setTargetPathExists(targetPathExists); + + target.buildListing(targetListing, distCpContext); + DistCpUtils.sortListing(conf, targetListing, sortedTargetListing); + return targetFinalPath; } private void commitData(Configuration conf) throws IOException { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java new file mode 100644 index 0000000000..64431f7e54 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred; + +import com.google.common.base.Preconditions; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.CopyListingFileStatus; + +/** + * Track deleted directories and support queries to + * check for add them. + * + * Assumptions. + *
+ * <ol>
+ *   <li>Deep directory trees are being deleted.</li>
+ *   <li>The total number of directories deleted is very much
+ *   less than the number of files.</li>
+ *   <li>Most deleted files are in directories which have
+ *   been deleted.</li>
+ *   <li>The cost of issuing a delete() call is less than that
+ *   of creating Path entries for parent directories and looking them
+ *   up in a hash table.</li>
+ *   <li>A modest cache is sufficient to identify whether or not
+ *   a parent directory has been deleted.</li>
+ *   <li>If a path has been evicted from the cache, the cost of
+ *   the extra deletions incurred is not significant.</li>
+ * </ol>
+ *
+ * The directory structure this algorithm is intended to optimize for is
+ * the deletion of datasets partitioned/bucketed into a directory tree,
+ * and deleted in bulk.
+ *
+ * The ordering of deletions comes from the merge sort of the copy listings;
+ * we rely on this placing a path "/dir1" ahead of "/dir1/file1",
+ * "/dir1/dir2/file2", and other descendants.
+ * We do not rely on parent entries being added immediately before children,
+ * as sorting may place "/dir12" between "/dir1" and its descendants.
+ *
+ * Algorithm
+ *
+ * <ol>
+ *   <li>Before deleting a directory or file, a check is made to see if an
+ *   ancestor is in the cache of deleted directories.</li>
+ *   <li>If an ancestor is found: skip the delete.</li>
+ *   <li>If an ancestor is not found: delete the file/dir.</li>
+ *   <li>When the entry probed is a directory, it is always added to the
+ *   cache of directories, irrespective of the search for an ancestor.
+ *   This is to speed up scans of files directly underneath the path.</li>
+ * </ol>
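+ *
+ * A usage sketch of that deletion phase, mirroring how
+ * {@code CopyCommitter#deleteMissing()} drives the tracker; here
+ * {@code targetFS} and {@code entriesOnlyInTarget} are assumed context
+ * supplied by the caller:
+ * <pre>
+ *   DeletedDirTracker tracker = new DeletedDirTracker(1000);
+ *   for (CopyListingFileStatus status : entriesOnlyInTarget) {
+ *     if (tracker.shouldDelete(status)) {
+ *       // no ancestor has been deleted yet: issue the recursive delete
+ *       targetFS.delete(status.getPath(), true);
+ *     }
+ *     // otherwise an ancestor directory was already deleted; skip the call
+ *   }
+ * </pre>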
+ * + * + */ +final class DeletedDirTracker { + + /** + * An LRU cache of directories. + */ + private final Cache directories; + + /** + * Maximum size of the cache. + */ + private final int cacheSize; + + /** + * Create an instance. + * @param cacheSize maximum cache size. + */ + DeletedDirTracker(int cacheSize) { + this.cacheSize = cacheSize; + directories = CacheBuilder.newBuilder() + .maximumSize(this.cacheSize) + .build(); + } + + /** + * Recursive scan for a directory being in the cache of deleted paths. + * @param dir directory to look for. + * @return true iff the path or a parent is in the cache. + */ + boolean isDirectoryOrAncestorDeleted(Path dir) { + if (dir == null) { + // at root + return false; + } else if (isContained(dir)) { + // cache hit + return true; + } else { + // cache miss, check parent + return isDirectoryOrAncestorDeleted(dir.getParent()); + } + } + + /** + * Probe for a path being deleted by virtue of the fact that an + * ancestor dir has already been deleted. + * @param path path to check + * @return true if the parent dir is deleted. + */ + private boolean isInDeletedDirectory(Path path) { + Preconditions.checkArgument(!path.isRoot(), "Root Dir"); + return isDirectoryOrAncestorDeleted(path.getParent()); + } + + /** + * Should a file or directory be deleted? + * The cache of deleted directories will be updated with the path + * of the status if it references a directory. + * @param status file/path to check + * @return true if the path should be deleted. + */ + boolean shouldDelete(CopyListingFileStatus status) { + Path path = status.getPath(); + Preconditions.checkArgument(!path.isRoot(), "Root Dir"); + if (status.isDirectory()) { + boolean deleted = isDirectoryOrAncestorDeleted(path); + // even if an ancestor has been deleted, add this entry as + // a deleted directory. + directories.put(path, path); + return !deleted; + } else { + return !isInDeletedDirectory(path); + } + } + + /** + * Is a path directly contained in the set of deleted directories. + * @param dir directory to probe + * @return true if this directory is recorded as being deleted. + */ + boolean isContained(Path dir) { + return directories.getIfPresent(dir) != null; + } + + @Override + public String toString() { + final StringBuilder sb = new StringBuilder( + "DeletedDirTracker{"); + sb.append("maximum size=").append(cacheSize); + sb.append("; current size=").append(directories.size()); + sb.append('}'); + return sb.toString(); + } + + /** + * Return the current size of the tracker, as in #of entries in the cache. + * @return tracker size. + */ + long size() { + return directories.size(); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java index eba4beeba4..2a60e80112 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java @@ -433,24 +433,45 @@ public static CopyListingFileStatus toCopyListingFileStatusHelper( } /** - * Sort sequence file containing FileStatus and Text as key and value respecitvely + * Sort sequence file containing FileStatus and Text as key and value + * respectively. * - * @param fs - File System * @param conf - Configuration * @param sourceListing - Source listing file * @return Path of the sorted file. 
Is source file with _sorted appended to the name * @throws IOException - Any exception during sort. */ - public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing) + public static Path sortListing(Configuration conf, + Path sourceListing) throws IOException { + Path output = new Path(sourceListing.toString() + "_sorted"); + sortListing(conf, sourceListing, output); + return output; + } + + /** + * Sort sequence file containing FileStatus and Text as key and value + * respectively, saving the result to the {@code output} path, which + * will be deleted first. + * + * @param conf - Configuration + * @param sourceListing - Source listing file + * @param output output path + * @throws IOException - Any exception during sort. + */ + + public static void sortListing(final Configuration conf, + final Path sourceListing, + final Path output) throws IOException { + FileSystem fs = sourceListing.getFileSystem(conf); + // force verify that the destination FS matches the input + fs.makeQualified(output); SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class, CopyListingFileStatus.class, conf); - Path output = new Path(sourceListing.toString() + "_sorted"); fs.delete(output, false); sorter.sort(sourceListing, output); - return output; } /** @@ -547,9 +568,13 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source, throws IOException { FileChecksum targetChecksum = null; try { - sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS - .getFileChecksum(source); - targetChecksum = targetFS.getFileChecksum(target); + sourceChecksum = sourceChecksum != null + ? sourceChecksum + : sourceFS.getFileChecksum(source); + if (sourceChecksum != null) { + // iff there's a source checksum, look for one at the destination. 
+ targetChecksum = targetFS.getFileChecksum(target); + } } catch (IOException e) { LOG.error("Unable to retrieve checksum for " + source + " or " + target, e); } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java index fb1a64db18..a5e0a03357 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java @@ -20,22 +20,35 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import java.io.IOException; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.AbstractFSContractTestBase; import org.apache.hadoop.fs.contract.ContractTestUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hadoop.tools.mapred.CopyMapper; import org.junit.Before; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Contract test suite covering a file system's integration with DistCp. The @@ -48,13 +61,70 @@ public abstract class AbstractContractDistCpTest extends AbstractFSContractTestBase { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractContractDistCpTest.class); + + public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB + = "scale.test.distcp.file.size.kb"; + + public static final int DEFAULT_DISTCP_SIZE_KB = 1024; + + protected static final int MB = 1024 * 1024; + @Rule public TestName testName = new TestName(); + /** + * The timeout value is extended over the default so that large updates + * are allowed to take time, especially to remote stores. 
+ * @return the current test timeout + */ + protected int getTestTimeoutMillis() { + return 15 * 60 * 1000; + } + private Configuration conf; private FileSystem localFS, remoteFS; private Path localDir, remoteDir; + private Path inputDir; + + private Path inputSubDir1; + + private Path inputSubDir2; + + private Path inputSubDir4; + + private Path inputFile1; + + private Path inputFile2; + + private Path inputFile3; + + private Path inputFile4; + + private Path inputFile5; + + private Path outputDir; + + private Path outputSubDir1; + + private Path outputSubDir2; + + private Path outputSubDir4; + + private Path outputFile1; + + private Path outputFile2; + + private Path outputFile3; + + private Path outputFile4; + + private Path outputFile5; + + private Path inputDirUnderOutputDir; + @Override protected Configuration createConfiguration() { Configuration newConf = new Configuration(); @@ -73,20 +143,307 @@ public void setup() throws Exception { // All paths are fully qualified including scheme (not taking advantage of // default file system), so if something fails, the messages will make it // clear which paths are local and which paths are remote. - Path testSubDir = new Path(getClass().getSimpleName(), - testName.getMethodName()); - localDir = localFS.makeQualified(new Path(new Path( - GenericTestUtils.getTestDir().toURI()), testSubDir)); + String className = getClass().getSimpleName(); + String testSubDir = className + "/" + testName.getMethodName(); + localDir = + localFS.makeQualified(new Path(new Path( + GenericTestUtils.getTestDir().toURI()), testSubDir + "/local")); mkdirs(localFS, localDir); - remoteDir = remoteFS.makeQualified( - new Path(getContract().getTestPath(), testSubDir)); + remoteDir = path(testSubDir + "/remote"); mkdirs(remoteFS, remoteDir); + // test teardown does this, but IDE-based test debugging can skip + // that teardown; this guarantees the initial state is clean + remoteFS.delete(remoteDir, true); + localFS.delete(localDir, true); + } + + /** + * Set up both input and output fields. + * @param src source tree + * @param dest dest tree + */ + protected void initPathFields(final Path src, final Path dest) { + initInputFields(src); + initOutputFields(dest); + } + + /** + * Output field setup. + * @param path path to set up + */ + protected void initOutputFields(final Path path) { + outputDir = new Path(path, "outputDir"); + inputDirUnderOutputDir = new Path(outputDir, "inputDir"); + outputFile1 = new Path(inputDirUnderOutputDir, "file1"); + outputSubDir1 = new Path(inputDirUnderOutputDir, "subDir1"); + outputFile2 = new Path(outputSubDir1, "file2"); + outputSubDir2 = new Path(inputDirUnderOutputDir, "subDir2/subDir2"); + outputFile3 = new Path(outputSubDir2, "file3"); + outputSubDir4 = new Path(inputDirUnderOutputDir, "subDir4/subDir4"); + outputFile4 = new Path(outputSubDir4, "file4"); + outputFile5 = new Path(outputSubDir4, "file5"); + } + + /** + * this path setup is used across different methods (copy, update, track) + * so they are set up as fields. + * @param srcDir source directory for these to go under. 
+ */ + protected void initInputFields(final Path srcDir) { + inputDir = new Path(srcDir, "inputDir"); + inputFile1 = new Path(inputDir, "file1"); + inputSubDir1 = new Path(inputDir, "subDir1"); + inputFile2 = new Path(inputSubDir1, "file2"); + inputSubDir2 = new Path(inputDir, "subDir2/subDir2"); + inputFile3 = new Path(inputSubDir2, "file3"); + inputSubDir4 = new Path(inputDir, "subDir4/subDir4"); + inputFile4 = new Path(inputSubDir4, "file4"); + inputFile5 = new Path(inputSubDir4, "file5"); + } + + protected FileSystem getLocalFS() { + return localFS; + } + + protected FileSystem getRemoteFS() { + return remoteFS; + } + + protected Path getLocalDir() { + return localDir; + } + + protected Path getRemoteDir() { + return remoteDir; } @Test - public void deepDirectoryStructureToRemote() throws Exception { + public void testUpdateDeepDirectoryStructureToRemote() throws Exception { + describe("update a deep directory structure from local to remote"); + distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir); + distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir); + } + + @Test + public void testUpdateDeepDirectoryStructureNoChange() throws Exception { + describe("update an unchanged directory structure" + + " from local to remote; expect no copy"); + Path target = distCpDeepDirectoryStructure(localFS, localDir, remoteFS, + remoteDir); + describe("\nExecuting Update\n"); + Job job = distCpUpdate(localDir, target); + assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1); + assertCounterInRange(job, CopyMapper.Counter.BYTESCOPIED, 0, 0); + } + + /** + * Assert that a counter is in a range; min and max values are inclusive. + * @param job job to query + * @param counter counter to examine + * @param min min value, if negative "no minimum" + * @param max max value, if negative "no maximum" + * @throws IOException IO problem + */ + void assertCounterInRange(Job job, Enum counter, long min, long max) + throws IOException { + Counter c = job.getCounters().findCounter(counter); + long value = c.getValue(); + String description = + String.format("%s value %s", c.getDisplayName(), value); + + if (min >= 0) { + assertTrue(description + " too below minimum " + min, + value >= min); + } + if (max >= 0) { + assertTrue(description + " above maximum " + max, + value <= max); + } + } + + /** + * Do a distcp from the local source to the destination filesystem. + * This is executed as part of + * {@link #testUpdateDeepDirectoryStructureToRemote()}; it's designed to be + * overidden or wrapped by subclasses which wish to add more assertions. + * + * Life is complicated here by the way that the src/dest paths + * on a distcp is different with -update. 
+ * @param destDir output directory used by the initial distcp + * @return the distcp job + */ + protected Job distCpUpdateDeepDirectoryStructure(final Path destDir) + throws Exception { + describe("Now do an incremental update with deletion of missing files"); + Path srcDir = inputDir; + LOG.info("Source directory = {}, dest={}", srcDir, destDir); + + ContractTestUtils.assertPathsExist(localFS, + "Paths for test are wrong", + inputFile1, inputFile2, inputFile3, inputFile4, inputFile5); + + modifySourceDirectories(); + + Job job = distCpUpdate(srcDir, destDir); + + Path outputFileNew1 = new Path(outputSubDir2, "newfile1"); + + lsR("Updated Remote", remoteFS, destDir); + + ContractTestUtils.assertPathDoesNotExist(remoteFS, + " deleted from " + inputFile1, outputFile1); + ContractTestUtils.assertIsFile(remoteFS, outputFileNew1); + ContractTestUtils.assertPathsDoNotExist(remoteFS, + "DistCP should have deleted", + outputFile3, outputFile4, outputSubDir4); + assertCounterInRange(job, CopyMapper.Counter.COPY, 1, 1); + assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1); + return job; + } + + /** + * Run distcp -update srcDir destDir. + * @param srcDir local source directory + * @param destDir remote destination directory. + * @return the completed job + * @throws Exception any failure. + */ + private Job distCpUpdate(final Path srcDir, final Path destDir) + throws Exception { + describe("\nDistcp -update from " + srcDir + " to " + destDir); + lsR("Local to update", localFS, srcDir); + lsR("Remote before update", remoteFS, destDir); + return runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), destDir) + .withDeleteMissing(true) + .withSyncFolder(true) + .withCRC(true) + .withOverwrite(false))); + } + + /** + * Update the source directories as various tests expect, + * including adding a new file. 
+ * @return the path to the newly created file + * @throws IOException IO failure + */ + private Path modifySourceDirectories() throws IOException { + localFS.delete(inputFile1, false); + localFS.delete(inputFile3, false); + // delete all of subdir4, so input/output file 4 & 5 will go + localFS.delete(inputSubDir4, true); + // add one new file + Path inputFileNew1 = new Path(inputSubDir2, "newfile1"); + ContractTestUtils.touch(localFS, inputFileNew1); + return inputFileNew1; + } + + + @Test + public void testTrackDeepDirectoryStructureToRemote() throws Exception { describe("copy a deep directory structure from local to remote"); - deepDirectoryStructure(localFS, localDir, remoteFS, remoteDir); + + Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS, + remoteDir); + ContractTestUtils.assertIsDirectory(remoteFS, destDir); + + describe("Now do an incremental update and save of missing files"); + Path srcDir = inputDir; + // same path setup as in deepDirectoryStructure() + Path trackDir = new Path(localDir, "trackDir"); + + + describe("\nDirectories\n"); + lsR("Local to update", localFS, srcDir); + lsR("Remote before update", remoteFS, destDir); + + + ContractTestUtils.assertPathsExist(localFS, + "Paths for test are wrong", + inputFile2, inputFile3, inputFile4, inputFile5); + + Path inputFileNew1 = modifySourceDirectories(); + + // Distcp set to track but not delete + runDistCp(buildWithStandardOptions( + new DistCpOptions.Builder( + Collections.singletonList(srcDir), + inputDirUnderOutputDir) + .withTrackMissing(trackDir) + .withSyncFolder(true) + .withOverwrite(false))); + + lsR("tracked udpate", remoteFS, destDir); + // new file went over + Path outputFileNew1 = new Path(outputSubDir2, "newfile1"); + ContractTestUtils.assertIsFile(remoteFS, outputFileNew1); + + ContractTestUtils.assertPathExists(localFS, "tracking directory", + trackDir); + + // now read in the listings + Path sortedSourceListing = new Path(trackDir, + DistCpConstants.SOURCE_SORTED_FILE); + ContractTestUtils.assertIsFile(localFS, sortedSourceListing); + Path sortedTargetListing = new Path(trackDir, + DistCpConstants.TARGET_SORTED_FILE); + ContractTestUtils.assertIsFile(localFS, sortedTargetListing); + // deletion didn't happen + ContractTestUtils.assertPathsExist(remoteFS, + "DistCP should have retained", + outputFile2, outputFile3, outputFile4, outputSubDir4); + + // now scan the table and see that things are there. 
+ Map sourceFiles = new HashMap<>(10); + Map targetFiles = new HashMap<>(10); + + try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(sortedSourceListing)); + SequenceFile.Reader targetReader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(sortedTargetListing))) { + CopyListingFileStatus copyStatus = new CopyListingFileStatus(); + Text name = new Text(); + while(sourceReader.next(name, copyStatus)) { + String key = name.toString(); + Path path = copyStatus.getPath(); + LOG.info("{}: {}", key, path); + sourceFiles.put(key, path); + } + while(targetReader.next(name, copyStatus)) { + String key = name.toString(); + Path path = copyStatus.getPath(); + LOG.info("{}: {}", key, path); + targetFiles.put(name.toString(), copyStatus.getPath()); + } + } + + // look for the new file in both lists + assertTrue("No " + outputFileNew1 + " in source listing", + sourceFiles.containsValue(inputFileNew1)); + assertTrue("No " + outputFileNew1 + " in target listing", + targetFiles.containsValue(outputFileNew1)); + assertTrue("No " + outputSubDir4 + " in target listing", + targetFiles.containsValue(outputSubDir4)); + assertFalse("Found " + inputSubDir4 + " in source listing", + sourceFiles.containsValue(inputSubDir4)); + + } + + public void lsR(final String description, + final FileSystem fs, + final Path dir) throws IOException { + RemoteIterator files = fs.listFiles(dir, true); + LOG.info("{}: {}:", description, dir); + StringBuilder sb = new StringBuilder(); + while(files.hasNext()) { + LocatedFileStatus status = files.next(); + sb.append(String.format(" %s; type=%s; length=%d", + status.getPath(), + status.isDirectory()? "dir" : "file", + status.getLen())); + } + LOG.info("{}", sb); } @Test @@ -96,34 +453,35 @@ public void largeFilesToRemote() throws Exception { } @Test - public void deepDirectoryStructureFromRemote() throws Exception { + public void testDeepDirectoryStructureFromRemote() throws Exception { describe("copy a deep directory structure from remote to local"); - deepDirectoryStructure(remoteFS, remoteDir, localFS, localDir); + distCpDeepDirectoryStructure(remoteFS, remoteDir, localFS, localDir); } @Test - public void largeFilesFromRemote() throws Exception { + public void testLargeFilesFromRemote() throws Exception { describe("copy multiple large files from remote to local"); largeFiles(remoteFS, remoteDir, localFS, localDir); } /** - * Executes a test using a file system sub-tree with multiple nesting levels. + * Executes a DistCp using a file system sub-tree with multiple nesting + * levels. + * The filenames are those of the fields initialized in setup. 
* * @param srcFS source FileSystem * @param srcDir source directory * @param dstFS destination FileSystem * @param dstDir destination directory + * @return the target directory of the copy * @throws Exception if there is a failure */ - private void deepDirectoryStructure(FileSystem srcFS, Path srcDir, - FileSystem dstFS, Path dstDir) throws Exception { - Path inputDir = new Path(srcDir, "inputDir"); - Path inputSubDir1 = new Path(inputDir, "subDir1"); - Path inputSubDir2 = new Path(inputDir, "subDir2/subDir3"); - Path inputFile1 = new Path(inputDir, "file1"); - Path inputFile2 = new Path(inputSubDir1, "file2"); - Path inputFile3 = new Path(inputSubDir2, "file3"); + private Path distCpDeepDirectoryStructure(FileSystem srcFS, + Path srcDir, + FileSystem dstFS, + Path dstDir) throws Exception { + initPathFields(srcDir, dstDir); + mkdirs(srcFS, inputSubDir1); mkdirs(srcFS, inputSubDir2); byte[] data1 = dataset(100, 33, 43); @@ -132,14 +490,18 @@ private void deepDirectoryStructure(FileSystem srcFS, Path srcDir, createFile(srcFS, inputFile2, true, data2); byte[] data3 = dataset(300, 53, 63); createFile(srcFS, inputFile3, true, data3); + createFile(srcFS, inputFile4, true, dataset(400, 53, 63)); + createFile(srcFS, inputFile5, true, dataset(500, 53, 63)); Path target = new Path(dstDir, "outputDir"); runDistCp(inputDir, target); ContractTestUtils.assertIsDirectory(dstFS, target); + lsR("Destination tree after distcp", dstFS, target); verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1); verifyFileContents(dstFS, new Path(target, "inputDir/subDir1/file2"), data2); verifyFileContents(dstFS, - new Path(target, "inputDir/subDir2/subDir3/file3"), data3); + new Path(target, "inputDir/subDir2/subDir2/file3"), data3); + return target; } /** @@ -153,20 +515,21 @@ private void deepDirectoryStructure(FileSystem srcFS, Path srcDir, */ private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstDir) throws Exception { - Path inputDir = new Path(srcDir, "inputDir"); - Path inputFile1 = new Path(inputDir, "file1"); - Path inputFile2 = new Path(inputDir, "file2"); - Path inputFile3 = new Path(inputDir, "file3"); + initPathFields(srcDir, dstDir); + Path largeFile1 = new Path(inputDir, "file1"); + Path largeFile2 = new Path(inputDir, "file2"); + Path largeFile3 = new Path(inputDir, "file3"); mkdirs(srcFS, inputDir); - int fileSizeKb = conf.getInt("scale.test.distcp.file.size.kb", 10 * 1024); + int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB, + DEFAULT_DISTCP_SIZE_KB); int fileSizeMb = fileSizeKb / 1024; getLog().info("{} with file size {}", testName.getMethodName(), fileSizeMb); - byte[] data1 = dataset((fileSizeMb + 1) * 1024 * 1024, 33, 43); - createFile(srcFS, inputFile1, true, data1); - byte[] data2 = dataset((fileSizeMb + 2) * 1024 * 1024, 43, 53); - createFile(srcFS, inputFile2, true, data2); - byte[] data3 = dataset((fileSizeMb + 3) * 1024 * 1024, 53, 63); - createFile(srcFS, inputFile3, true, data3); + byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43); + createFile(srcFS, largeFile1, true, data1); + byte[] data2 = dataset((fileSizeMb + 2) * MB, 43, 53); + createFile(srcFS, largeFile2, true, data2); + byte[] data3 = dataset((fileSizeMb + 3) * MB, 53, 63); + createFile(srcFS, largeFile3, true, data3); Path target = new Path(dstDir, "outputDir"); runDistCp(inputDir, target); ContractTestUtils.assertIsDirectory(dstFS, target); @@ -183,12 +546,34 @@ private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS, * @throws Exception if there is a 
failure
    */
   private void runDistCp(Path src, Path dst) throws Exception {
-    DistCpOptions options = new DistCpOptions.Builder(
-        Collections.singletonList(src), dst).build();
+    runDistCp(buildWithStandardOptions(
+        new DistCpOptions.Builder(Collections.singletonList(src), dst)));
+  }
+
+  /**
+   * Run the distcp job.
+   * @param options distcp options
+   * @return the job. It will have already completed.
+   * @throws Exception failure
+   */
+  private Job runDistCp(final DistCpOptions options) throws Exception {
     Job job = new DistCp(conf, options).execute();
     assertNotNull("Unexpected null job returned from DistCp execution.", job);
     assertTrue("DistCp job did not complete.", job.isComplete());
     assertTrue("DistCp job did not complete successfully.", job.isSuccessful());
+    return job;
+  }
+
+  /**
+   * Add any standard options and then build.
+   * @param builder DistCp option builder
+   * @return the built options
+   */
+  private DistCpOptions buildWithStandardOptions(
+      DistCpOptions.Builder builder) {
+    return builder
+        .withNumListstatusThreads(8)
+        .build();
   }
 
   /**
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java
new file mode 100644
index 0000000000..1a054c5716
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
+
+/**
+ * Verifies that the local FS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */ +public class TestLocalContractDistCp extends AbstractContractDistCpTest { + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new LocalFSContract(conf); + } +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index bf151cd173..e00241bafb 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -43,6 +43,9 @@ import java.io.IOException; import java.util.*; +import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.tools.util.TestDistCpUtils.*; + public class TestCopyCommitter { private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class); @@ -80,56 +83,42 @@ public static void destroy() { } @Before - public void createMetaFolder() { + public void createMetaFolder() throws IOException { config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta"); // Unset listing file path since the config is shared by // multiple tests, and some test doesn't set it, such as // testNoCommitAction, but the distcp code will check it. config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, ""); Path meta = new Path("/meta"); - try { - cluster.getFileSystem().mkdirs(meta); - } catch (IOException e) { - LOG.error("Exception encountered while creating meta folder", e); - Assert.fail("Unable to create meta folder"); - } + cluster.getFileSystem().mkdirs(meta); } @After - public void cleanupMetaFolder() { + public void cleanupMetaFolder() throws IOException { Path meta = new Path("/meta"); - try { - if (cluster.getFileSystem().exists(meta)) { - cluster.getFileSystem().delete(meta, true); - Assert.fail("Expected meta folder to be deleted"); - } - } catch (IOException e) { - LOG.error("Exception encountered while cleaning up folder", e); - Assert.fail("Unable to clean up meta folder"); + if (cluster.getFileSystem().exists(meta)) { + cluster.getFileSystem().delete(meta, true); + Assert.fail("Expected meta folder to be deleted"); } } @Test - public void testNoCommitAction() { + public void testNoCommitAction() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); - JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); - try { - OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); - committer.commitJob(jobContext); - Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful"); + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + committer.commitJob(jobContext); + Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus()); - //Test for idempotent commit - committer.commitJob(jobContext); - Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful"); - } catch (IOException e) { - LOG.error("Exception encountered ", e); - Assert.fail("Commit failed"); - } + //Test for idempotent commit + committer.commitJob(jobContext); + Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus()); } @Test - public void testPreserveStatus() { + public void testPreserveStatus() throws IOException { TaskAttemptContext taskAttemptContext = 
getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); @@ -161,19 +150,12 @@ public void testPreserveStatus() { conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); committer.commitJob(jobContext); - if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) { - Assert.fail("Permission don't match"); - } + checkDirectoryPermissions(fs, targetBase, sourcePerm); //Test for idempotent commit committer.commitJob(jobContext); - if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) { - Assert.fail("Permission don't match"); - } + checkDirectoryPermissions(fs, targetBase, sourcePerm); - } catch (IOException e) { - LOG.error("Exception encountered while testing for preserve status", e); - Assert.fail("Preserve status failure"); } finally { TestDistCpUtils.delete(fs, "/tmp1"); conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); @@ -182,7 +164,7 @@ public void testPreserveStatus() { } @Test - public void testDeleteMissing() { + public void testDeleteMissing() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); @@ -213,24 +195,13 @@ public void testDeleteMissing() { conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { - Assert.fail("Source and target folders are not in sync"); - } - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) { - Assert.fail("Source and target folders are not in sync"); - } + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); //Test for idempotent commit committer.commitJob(jobContext); - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { - Assert.fail("Source and target folders are not in sync"); - } - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) { - Assert.fail("Source and target folders are not in sync"); - } - } catch (Throwable e) { - LOG.error("Exception encountered while testing for delete missing", e); - Assert.fail("Delete missing failure"); + verifyFoldersAreInSync(fs, targetBase, sourceBase); + verifyFoldersAreInSync(fs, sourceBase, targetBase); } finally { TestDistCpUtils.delete(fs, "/tmp1"); conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); @@ -238,7 +209,7 @@ public void testDeleteMissing() { } @Test - public void testDeleteMissingFlatInterleavedFiles() { + public void testDeleteMissingFlatInterleavedFiles() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); @@ -253,20 +224,20 @@ public void testDeleteMissingFlatInterleavedFiles() { fs = FileSystem.get(conf); sourceBase = "/tmp1/" + String.valueOf(rand.nextLong()); targetBase = "/tmp1/" + String.valueOf(rand.nextLong()); - TestDistCpUtils.createFile(fs, sourceBase + "/1"); - TestDistCpUtils.createFile(fs, sourceBase + "/3"); - TestDistCpUtils.createFile(fs, sourceBase + "/4"); - TestDistCpUtils.createFile(fs, sourceBase + "/5"); - TestDistCpUtils.createFile(fs, sourceBase + "/7"); - TestDistCpUtils.createFile(fs, sourceBase + "/8"); - TestDistCpUtils.createFile(fs, sourceBase 
+ "/9"); + createFile(fs, sourceBase + "/1"); + createFile(fs, sourceBase + "/3"); + createFile(fs, sourceBase + "/4"); + createFile(fs, sourceBase + "/5"); + createFile(fs, sourceBase + "/7"); + createFile(fs, sourceBase + "/8"); + createFile(fs, sourceBase + "/9"); - TestDistCpUtils.createFile(fs, targetBase + "/2"); - TestDistCpUtils.createFile(fs, targetBase + "/4"); - TestDistCpUtils.createFile(fs, targetBase + "/5"); - TestDistCpUtils.createFile(fs, targetBase + "/7"); - TestDistCpUtils.createFile(fs, targetBase + "/9"); - TestDistCpUtils.createFile(fs, targetBase + "/A"); + createFile(fs, targetBase + "/2"); + createFile(fs, targetBase + "/4"); + createFile(fs, targetBase + "/5"); + createFile(fs, targetBase + "/7"); + createFile(fs, targetBase + "/9"); + createFile(fs, targetBase + "/A"); final DistCpOptions options = new DistCpOptions.Builder( Collections.singletonList(new Path(sourceBase)), new Path("/out")) @@ -282,20 +253,13 @@ public void testDeleteMissingFlatInterleavedFiles() { conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { - Assert.fail("Source and target folders are not in sync"); - } - Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4); + verifyFoldersAreInSync(fs, targetBase, sourceBase); + Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length); //Test for idempotent commit committer.commitJob(jobContext); - if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) { - Assert.fail("Source and target folders are not in sync"); - } - Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4); - } catch (IOException e) { - LOG.error("Exception encountered while testing for delete missing", e); - Assert.fail("Delete missing failure"); + verifyFoldersAreInSync(fs, targetBase, sourceBase); + Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length); } finally { TestDistCpUtils.delete(fs, "/tmp1"); conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); @@ -304,7 +268,7 @@ public void testDeleteMissingFlatInterleavedFiles() { } @Test - public void testAtomicCommitMissingFinal() { + public void testAtomicCommitMissingFinal() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); @@ -322,19 +286,16 @@ public void testAtomicCommitMissingFinal() { conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); - Assert.assertTrue(fs.exists(new Path(workPath))); - Assert.assertFalse(fs.exists(new Path(finalPath))); + assertPathExists(fs, "Work path", new Path(workPath)); + assertPathDoesNotExist(fs, "Final path", new Path(finalPath)); committer.commitJob(jobContext); - Assert.assertFalse(fs.exists(new Path(workPath))); - Assert.assertTrue(fs.exists(new Path(finalPath))); + assertPathDoesNotExist(fs, "Work path", new Path(workPath)); + assertPathExists(fs, "Final path", new Path(finalPath)); //Test for idempotent commit committer.commitJob(jobContext); - Assert.assertFalse(fs.exists(new Path(workPath))); - Assert.assertTrue(fs.exists(new Path(finalPath))); - } catch (IOException e) { - LOG.error("Exception encountered while testing for preserve status", e); - Assert.fail("Atomic commit failure"); + assertPathDoesNotExist(fs, "Work path", new 
Path(workPath)); + assertPathExists(fs, "Final path", new Path(finalPath)); } finally { TestDistCpUtils.delete(fs, workPath); TestDistCpUtils.delete(fs, finalPath); @@ -343,7 +304,7 @@ public void testAtomicCommitMissingFinal() { } @Test - public void testAtomicCommitExistingFinal() { + public void testAtomicCommitExistingFinal() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(), taskAttemptContext.getTaskAttemptID().getJobID()); @@ -363,20 +324,17 @@ public void testAtomicCommitExistingFinal() { conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); - Assert.assertTrue(fs.exists(new Path(workPath))); - Assert.assertTrue(fs.exists(new Path(finalPath))); + assertPathExists(fs, "Work path", new Path(workPath)); + assertPathExists(fs, "Final path", new Path(finalPath)); try { committer.commitJob(jobContext); Assert.fail("Should not be able to atomic-commit to pre-existing path."); } catch(Exception exception) { - Assert.assertTrue(fs.exists(new Path(workPath))); - Assert.assertTrue(fs.exists(new Path(finalPath))); + assertPathExists(fs, "Work path", new Path(workPath)); + assertPathExists(fs, "Final path", new Path(finalPath)); LOG.info("Atomic-commit Test pass."); } - } catch (IOException e) { - LOG.error("Exception encountered while testing for atomic commit.", e); - Assert.fail("Atomic commit failure"); } finally { TestDistCpUtils.delete(fs, workPath); TestDistCpUtils.delete(fs, finalPath); @@ -389,11 +347,11 @@ private TaskAttemptContext getTaskAttemptContext(Configuration conf) { new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1)); } - private boolean checkDirectoryPermissions(FileSystem fs, String targetBase, - FsPermission sourcePerm) throws IOException { + private void checkDirectoryPermissions(FileSystem fs, String targetBase, + FsPermission sourcePerm) throws IOException { Path base = new Path(targetBase); - Stack stack = new Stack(); + Stack stack = new Stack<>(); stack.push(base); while (!stack.isEmpty()) { Path file = stack.pop(); @@ -404,11 +362,10 @@ private boolean checkDirectoryPermissions(FileSystem fs, String targetBase, for (FileStatus status : fStatus) { if (status.isDirectory()) { stack.push(status.getPath()); - Assert.assertEquals(status.getPermission(), sourcePerm); + Assert.assertEquals(sourcePerm, status.getPermission()); } } } - return true; } private static class NullInputFormat extends InputFormat { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java new file mode 100644 index 0000000000..77b08ade6d --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools.mapred; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.tools.CopyListingFileStatus; + +/** + * Unit tests of the deleted directory tracker. + */ +@SuppressWarnings("RedundantThrows") +public class TestDeletedDirTracker extends Assert { + + private static final Logger LOG = + LoggerFactory.getLogger(TestDeletedDirTracker.class); + + public static final Path ROOT = new Path("hdfs://namenode/"); + + public static final Path DIR1 = new Path(ROOT, "dir1"); + + public static final Path FILE0 = new Path(ROOT, "file0"); + + public static final Path DIR1_FILE1 = new Path(DIR1, "file1"); + + public static final Path DIR1_FILE2 = new Path(DIR1, "file2"); + + public static final Path DIR1_DIR3 = new Path(DIR1, "dir3"); + + public static final Path DIR1_DIR3_DIR4 = new Path(DIR1_DIR3, "dir4"); + + public static final Path DIR1_DIR3_DIR4_FILE_3 = + new Path(DIR1_DIR3_DIR4, "file1"); + + + private DeletedDirTracker tracker; + + @Before + public void setup() { + tracker = new DeletedDirTracker(1000); + } + + @After + public void teardown() { + LOG.info(tracker.toString()); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoRootDir() throws Throwable { + shouldDelete(ROOT, true); + } + + @Test(expected = IllegalArgumentException.class) + public void testNoRootFile() throws Throwable { + shouldDelete(dirStatus(ROOT)); + } + + @Test + public void testFileInRootDir() throws Throwable { + expectShouldDelete(FILE0, false); + expectShouldDelete(FILE0, false); + } + + @Test + public void testDeleteDir1() throws Throwable { + expectShouldDelete(DIR1, true); + expectShouldNotDelete(DIR1, true); + expectShouldNotDelete(DIR1_FILE1, false); + expectNotCached(DIR1_FILE1); + expectShouldNotDelete(DIR1_DIR3, true); + expectCached(DIR1_DIR3); + expectShouldNotDelete(DIR1_FILE2, false); + expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false); + expectShouldNotDelete(DIR1_DIR3_DIR4, true); + expectShouldNotDelete(DIR1_DIR3_DIR4, true); + } + + @Test + public void testDeleteDirDeep() throws Throwable { + expectShouldDelete(DIR1, true); + expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false); + } + + @Test + public void testDeletePerfectCache() throws Throwable { + // run a larger scale test. 
Also use the ordering we'd expect for a sorted
+    // listing, which we implement by sorting the paths
+    List<CopyListingFileStatus> statusList = buildStatusList();
+    // cache is bigger than the status list
+    tracker = new DeletedDirTracker(statusList.size());
+
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(statusList, deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteFullCache() throws Throwable {
+    // run a larger scale test. Also use the ordering we'd expect for a sorted
+    // listing, which we implement by sorting the paths
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteMediumCache() throws Throwable {
+    tracker = new DeletedDirTracker(100);
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  @Test
+  public void testDeleteFullSmallCache() throws Throwable {
+    tracker = new DeletedDirTracker(10);
+    AtomicInteger deletedFiles = new AtomicInteger(0);
+    AtomicInteger deletedDirs = new AtomicInteger(0);
+    deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+    assertEquals(0, deletedFiles.get());
+  }
+
+  protected void deletePaths(final List<CopyListingFileStatus> statusList,
+      final AtomicInteger deletedFiles, final AtomicInteger deletedDirs) {
+    for (CopyListingFileStatus status : statusList) {
+      if (shouldDelete(status)) {
+        AtomicInteger r = status.isDirectory() ? deletedDirs : deletedFiles;
+        r.incrementAndGet();
+        LOG.info("Delete {}", status.getPath());
+      }
+    }
+
+    LOG.info("After proposing to delete {} paths, {} directories and {} files"
+        + " were explicitly deleted from a cache {}",
+        statusList.size(), deletedDirs, deletedFiles, tracker);
+  }
+
+  /**
+   * Build a large YMD status list; 21 * 12 * 29 day directories,
+   * each with 24 files.
+   * @return a sorted list.
+   */
+  protected List<CopyListingFileStatus> buildStatusList() {
+    List<CopyListingFileStatus> statusList = new ArrayList<>();
+    // recursive create of many files
+    for (int y = 0; y <= 20; y++) {
+      Path yp = new Path(String.format("YEAR=%d", y));
+      statusList.add(dirStatus(yp));
+      for (int m = 1; m <= 12; m++) {
+        Path ymp = new Path(yp, String.format("MONTH=%d", m));
+        statusList.add(dirStatus(ymp));
+        for (int d = 1; d < 30; d++) {
+          Path dir = new Path(ymp, String.format("DAY=%02d", d));
+          statusList.add(dirStatus(dir));
+          for (int h = 0; h < 24; h++) {
+            statusList.add(fileStatus(new Path(dir,
+                String.format("%02d00.avro", h))));
+          }
+        }
+      }
+      // sort on paths.
+ Collections.sort(statusList, + (l, r) -> l.getPath().compareTo(r.getPath())); + } + return statusList; + } + + + private void expectShouldDelete(final Path path, boolean isDir) { + expectShouldDelete(newStatus(path, isDir)); + } + + private void expectShouldDelete(CopyListingFileStatus status) { + assertTrue("Expected shouldDelete of " + status.getPath(), + shouldDelete(status)); + } + + private boolean shouldDelete(final Path path, final boolean isDir) { + return shouldDelete(newStatus(path, isDir)); + } + + private boolean shouldDelete(final CopyListingFileStatus status) { + return tracker.shouldDelete(status); + } + + private void expectShouldNotDelete(final Path path, boolean isDir) { + expectShouldNotDelete(newStatus(path, isDir)); + } + + private void expectShouldNotDelete(CopyListingFileStatus status) { + assertFalse("Expected !shouldDelete of " + status.getPath() + + " but got true", + shouldDelete(status)); + } + + private CopyListingFileStatus newStatus(final Path path, + final boolean isDir) { + return new CopyListingFileStatus(new FileStatus(0, isDir, 0, 0, 0, path)); + } + + private CopyListingFileStatus dirStatus(final Path path) { + return newStatus(path, true); + } + + private CopyListingFileStatus fileStatus(final Path path) { + return newStatus(path, false); + } + + private void expectCached(final Path path) { + assertTrue("Path " + path + " is not in the cache of " + tracker, + tracker.isContained(path)); + } + + private void expectNotCached(final Path path) { + assertFalse("Path " + path + " is in the cache of " + tracker, + tracker.isContained(path)); + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java index 9ccddd122e..311c1b3a72 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.namenode.INodeFile; @@ -1187,26 +1188,33 @@ public static void createDirectory(FileSystem fs, Path dirPath) throws IOExcepti } } - public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase) - throws IOException { + public static void verifyFoldersAreInSync(FileSystem fs, String targetBase, + String sourceBase) throws IOException { Path base = new Path(targetBase); - Stack stack = new Stack(); - stack.push(base); - while (!stack.isEmpty()) { - Path file = stack.pop(); - if (!fs.exists(file)) continue; - FileStatus[] fStatus = fs.listStatus(file); - if (fStatus == null || fStatus.length == 0) continue; + Stack stack = new Stack<>(); + stack.push(base); + while (!stack.isEmpty()) { + Path file = stack.pop(); + if (!fs.exists(file)) { + continue; + } + FileStatus[] fStatus = fs.listStatus(file); + if (fStatus == null || fStatus.length == 0) { + continue; + } - for (FileStatus status : fStatus) { - if (status.isDirectory()) { - stack.push(status.getPath()); - } - Assert.assertTrue(fs.exists(new Path(sourceBase + "/" + - DistCpUtils.getRelativePath(new Path(targetBase), status.getPath())))); - } - } - return true; + for (FileStatus status : 
fStatus) {
+        if (status.isDirectory()) {
+          stack.push(status.getPath());
+        }
+        Path p = new Path(sourceBase + "/" +
+            DistCpUtils.getRelativePath(new Path(targetBase),
+                status.getPath()));
+        ContractTestUtils.assertPathExists(fs,
+            "path in sync with " + status.getPath(), p);
+      }
+    }
   }
+
 }
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml b/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml
new file mode 100644
index 0000000000..42bc6f427a
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<configuration>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-creates-dest-dirs</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+</configuration>
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties b/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
index 22990ca89a..46fda96d97 100644
--- a/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
@@ -16,7 +16,18 @@
 #
 # log4j configuration used during build and unit tests
 
-log4j.rootLogger=debug,stdout
+log4j.rootLogger=info,stdout
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
+log4j.logger.org.apache.hadoop.metrics2=ERROR
+log4j.logger.org.apache.hadoop.mapreduce.JobResourceUploader=ERROR
+log4j.logger.org.apache.hadoop.yarn.util.ProcfsBasedProcessTree=ERROR
+log4j.logger.org.apache.commons.beanutils.FluentPropertyBeanIntrospector=ERROR
+log4j.logger.org.apache.commons.configuration2.AbstractConfiguration=ERROR
+
+# Debug level logging of distcp in test runs.
+log4j.logger.org.apache.hadoop.tools.mapred=DEBUG
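
For reviewers, a minimal sketch of the behaviour the TestDeletedDirTracker cases above exercise: a -delete pass consults the tracker so that, once a directory has been deleted, nothing underneath it triggers a further delete call. This sketch is illustrative only and is not the patch's CopyCommitter logic; the DeleteMissingSketch class and its deleteMissing() method are hypothetical, and only the DeletedDirTracker(int) constructor and shouldDelete(CopyListingFileStatus) are taken from the tests above. It assumes the destination-only entries are sorted so that parent directories precede their children, as the tests arrange.

package org.apache.hadoop.tools.mapred;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.tools.CopyListingFileStatus;

/** Hypothetical helper, not part of this patch. */
public final class DeleteMissingSketch {

  private DeleteMissingSketch() {
  }

  /**
   * Delete paths which exist only in the destination, issuing at most one
   * FileSystem.delete() per deleted directory tree.
   * @param fs destination filesystem
   * @param sortedDestOnly destination-only entries, parents before children
   * @param cacheSize size of the tracker's directory cache
   */
  public static void deleteMissing(FileSystem fs,
      List<CopyListingFileStatus> sortedDestOnly,
      int cacheSize) throws IOException {
    DeletedDirTracker tracker = new DeletedDirTracker(cacheSize);
    for (CopyListingFileStatus status : sortedDestOnly) {
      if (tracker.shouldDelete(status)) {
        // recursive delete; later entries under this directory are skipped
        // because the tracker reports their parent as already deleted
        fs.delete(status.getPath(), true);
      }
    }
  }
}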