diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
index d2cbca0198..54d015a955 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
@@ -228,9 +228,9 @@ public static byte[] readDataset(FileSystem fs, Path path, int len)
public static void verifyFileContents(FileSystem fs,
Path path,
byte[] original) throws IOException {
+ assertIsFile(fs, path);
FileStatus stat = fs.getFileStatus(path);
String statText = stat.toString();
- assertTrue("not a file " + statText, stat.isFile());
assertEquals("wrong length " + statText, original.length, stat.getLen());
byte[] bytes = readDataset(fs, path, original.length);
compareByteArrays(original, bytes, original.length);
@@ -853,6 +853,36 @@ public static void assertIsFile(Path filename, FileStatus status) {
status.isSymlink());
}
+ /**
+ * Assert that a varargs list of paths exist.
+ * @param fs filesystem
+ * @param message message for exceptions
+ * @param paths paths
+ * @throws IOException IO failure
+ */
+ public static void assertPathsExist(FileSystem fs,
+ String message,
+ Path... paths) throws IOException {
+ for (Path path : paths) {
+ assertPathExists(fs, message, path);
+ }
+ }
+
+ /**
+ * Assert that a varargs list of paths do not exist.
+ * @param fs filesystem
+ * @param message message for exceptions
+ * @param paths paths
+ * @throws IOException IO failure
+ */
+ public static void assertPathsDoNotExist(FileSystem fs,
+ String message,
+ Path... paths) throws IOException {
+ for (Path path : paths) {
+ assertPathDoesNotExist(fs, message, path);
+ }
+ }
+
/**
* Create a dataset for use in the tests; all data is in the range
* base to (base+modulo-1) inclusive.
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
index 8da8b6ad5b..b3d511ed6a 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractDistCp.java
@@ -18,11 +18,15 @@
package org.apache.hadoop.fs.contract.s3a;
+import java.io.IOException;
+
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3ATestConstants.SCALE_TEST_TIMEOUT_MILLIS;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.FailureInjectionPolicy;
import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
/**
@@ -57,4 +61,17 @@ protected Configuration createConfiguration() {
protected S3AContract createContract(Configuration conf) {
return new S3AContract(conf);
}
+
+ /**
+ * Always inject the delay key into the path, so that if the destination
+ * store is inconsistent, the inconsistency is triggered for these paths.
+ * @param filepath path string in
+ * @return path on the remote FS for distcp
+ * @throws IOException IO failure
+ */
+ @Override
+ protected Path path(final String filepath) throws IOException {
+ Path path = super.path(filepath);
+ return new Path(path, FailureInjectionPolicy.DEFAULT_DELAY_KEY_SUBSTRING);
+ }
}
diff --git a/hadoop-tools/hadoop-azure-datalake/pom.xml b/hadoop-tools/hadoop-azure-datalake/pom.xml
index 95eaef5f89..80203812de 100644
--- a/hadoop-tools/hadoop-azure-datalake/pom.xml
+++ b/hadoop-tools/hadoop-azure-datalake/pom.xml
@@ -147,5 +147,24 @@
      <version>${okhttp.version}</version>
      <scope>test</scope>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-distcp</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
diff --git a/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java
new file mode 100644
index 0000000000..dae6e8087c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure-datalake/src/test/java/org/apache/hadoop/fs/adl/live/TestAdlContractDistCpLive.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.fs.adl.live;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.tools.contract.AbstractContractDistCpTest;
+
+/**
+ * Test DistCP operations.
+ */
+public class TestAdlContractDistCpLive extends AbstractContractDistCpTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration configuration) {
+ return new AdlStorageContract(configuration);
+ }
+
+}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
index 908b558047..9a40a4933a 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListing.java
@@ -143,7 +143,6 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context)
throws DuplicateFileException, IOException {
Configuration config = getConf();
- FileSystem fs = pathToListFile.getFileSystem(config);
final boolean splitLargeFile = context.splitLargeFile();
@@ -153,7 +152,7 @@ private void validateFinalListing(Path pathToListFile, DistCpContext context)
// is continuous.
//
Path checkPath = splitLargeFile?
- pathToListFile : DistCpUtils.sortListing(fs, config, pathToListFile);
+ pathToListFile : DistCpUtils.sortListing(config, pathToListFile);
SequenceFile.Reader reader = new SequenceFile.Reader(
config, SequenceFile.Reader.file(checkPath));
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
index 138b491189..e01a6b15bd 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyListingFileStatus.java
@@ -27,6 +27,7 @@
import java.util.Map.Entry;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -46,8 +47,18 @@
/**
* CopyListingFileStatus is a view of {@link FileStatus}, recording additional
* data members useful to distcp.
+ *
+ * This is the data structure persisted in the sequence files generated
+ * in the CopyCommitter when deleting files.
+ * Any tool working with these generated files needs to be aware of an
+ * important stability guarantee: there is none; expect it to change
+ * across minor Hadoop releases without any support for reading the files of
+ * different versions.
+ * Tools parsing the listings must be built and tested against the point
+ * release of Hadoop which they intend to support.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate("Distcp support tools")
+@InterfaceStability.Unstable
public final class CopyListingFileStatus implements Writable {
private static final byte NO_ACL_ENTRIES = -1;
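As a sketch of what a downstream consumer of these listings looks like (illustrative only: conf, trackDir and the choice of TARGET_SORTED_FILE are assumptions, not part of this patch), each listing is a SequenceFile of Text keys (relative paths) and CopyListingFileStatus values:

    // Hypothetical reader of a tracking listing; version-specific, as warned above.
    try (SequenceFile.Reader reader = new SequenceFile.Reader(conf,
        SequenceFile.Reader.file(new Path(trackDir, "target_sorted.seq")))) {
      Text relPath = new Text();
      CopyListingFileStatus entry = new CopyListingFileStatus();
      while (reader.next(relPath, entry)) {
        System.out.println(relPath + " -> " + entry.getPath());
      }
    }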
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
index 0bae5d5034..212256ccfd 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java
@@ -18,12 +18,19 @@
package org.apache.hadoop.tools;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
/**
* Utility class to hold commonly used constants.
*/
-public class DistCpConstants {
+@InterfaceAudience.LimitedPrivate("Distcp support tools")
+@InterfaceStability.Evolving
+public final class DistCpConstants {
+
+ private DistCpConstants() {
+ }
/* Default number of threads to use for building file listing */
public static final int DEFAULT_LISTSTATUS_THREADS = 1;
@@ -52,6 +59,8 @@ public class DistCpConstants {
"distcp.preserve.rawxattrs";
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
+ public static final String CONF_LABEL_TRACK_MISSING =
+ "distcp.track.missing.source";
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
@@ -148,4 +157,13 @@ public class DistCpConstants {
static final String HDFS_DISTCP_DIFF_DIRECTORY_NAME = ".distcp.diff.tmp";
public static final int COPY_BUFFER_SIZE_DEFAULT = 8 * 1024;
+
+ /** Filename of the sorted source listing saved by the tracking option. */
+ public static final String SOURCE_SORTED_FILE = "source_sorted.seq";
+
+ /** Filename of unsorted target listing. */
+ public static final String TARGET_LISTING_FILE = "target_listing.seq";
+
+ /** Filename of sorted target listing. */
+ public static final String TARGET_SORTED_FILE = "target_sorted.seq";
}
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
index faef7e6eda..3ce12b264d 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.tools;
import org.apache.commons.cli.Option;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
@@ -63,10 +64,10 @@ public enum DistCpOptionSwitch {
*/
SYNC_FOLDERS(DistCpConstants.CONF_LABEL_SYNC_FOLDERS,
new Option("update", false, "Update target, copying only missing" +
- "files or directories")),
+ " files or directories")),
/**
- * Deletes missing files in target that are missing from source
+ * Deletes files in the target that are missing from the source.
* This allows the target to be in sync with the source contents
* Typically used in conjunction with SYNC_FOLDERS
* Incompatible with ATOMIC_COMMIT
@@ -74,6 +75,21 @@ public enum DistCpOptionSwitch {
DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
new Option("delete", false, "Delete from target, " +
"files missing in source. Delete is applicable only with update or overwrite options")),
+
+ /**
+ * Track files in the target that are missing from the source.
+ * This allows for other applications to complete the synchronization,
+ * possibly with object-store-specific delete algorithms.
+ * Typically used in conjunction with SYNC_FOLDERS.
+ * Incompatible with ATOMIC_COMMIT.
+ */
+ @InterfaceStability.Unstable
+ TRACK_MISSING(DistCpConstants.CONF_LABEL_TRACK_MISSING,
+ new Option("xtrack", true,
+ "Save information about missing source files to the"
+ + " specified directory")),
+
+
/**
* Number of threads for building source file listing (before map-reduce
* phase, max one listStatus per thread at a time).
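A minimal usage sketch (illustrative, not part of the patch; paths and the conf object are placeholders): on the command line the new switch pairs with -update, e.g. "hadoop distcp -update -xtrack <trackDir> <source> <dest>"; programmatically it maps onto the options builder:

    // Hypothetical tracked update; no deletions are performed, only recorded.
    DistCpOptions options = new DistCpOptions.Builder(
        Collections.singletonList(new Path("hdfs://nn1/datasets/in")),
        new Path("s3a://bucket/datasets/in"))
        .withSyncFolder(true)                                // -update
        .withTrackMissing(new Path("hdfs://nn1/tmp/track"))  // -xtrack
        .build();
    Job job = new DistCp(conf, options).execute();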
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
index f33f7fdcb9..ea99016b2c 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java
@@ -24,6 +24,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.util.DistCpUtils;
@@ -43,6 +45,8 @@
*
* This class is immutable.
*/
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
public final class DistCpOptions {
private static final Logger LOG = LoggerFactory.getLogger(Builder.class);
public static final int MAX_NUM_LISTSTATUS_THREADS = 40;
@@ -68,6 +72,9 @@ public final class DistCpOptions {
/** Whether source and target folder contents be sync'ed up. */
private final boolean syncFolder;
+ /** Path to save source/dest sequence files to, if non-null. */
+ private final Path trackPath;
+
/** Whether files only present in target should be deleted. */
private boolean deleteMissing;
@@ -208,6 +215,7 @@ private DistCpOptions(Builder builder) {
this.copyBufferSize = builder.copyBufferSize;
this.verboseLog = builder.verboseLog;
+ this.trackPath = builder.trackPath;
}
public Path getSourceFileListing() {
@@ -331,6 +339,10 @@ public boolean shouldVerboseLog() {
return verboseLog;
}
+ public Path getTrackPath() {
+ return trackPath;
+ }
+
/**
* Add options to configuration. These will be used in the Mapper/committer
*
@@ -371,6 +383,11 @@ public void appendToConf(Configuration conf) {
String.valueOf(copyBufferSize));
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.VERBOSE_LOG,
String.valueOf(verboseLog));
+ if (trackPath != null) {
+ DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.TRACK_MISSING,
+ String.valueOf(trackPath));
+ }
+
}
/**
@@ -441,6 +458,7 @@ public static class Builder {
private String filtersFile;
private Path logPath;
+ private Path trackPath;
private String copyStrategy = DistCpConstants.UNIFORMSIZE;
private int numListstatusThreads = 0; // 0 indicates that flag is not set.
@@ -641,6 +659,11 @@ public Builder withLogPath(Path newLogPath) {
return this;
}
+ public Builder withTrackMissing(Path path) {
+ this.trackPath = path;
+ return this;
+ }
+
public Builder withCopyStrategy(String newCopyStrategy) {
this.copyStrategy = newCopyStrategy;
return this;
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
index 606ed3254f..668b594be6 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java
@@ -145,6 +145,12 @@ public static DistCpOptions parse(String[] args)
builder.withAtomicWorkPath(new Path(workPath));
}
}
+ if (command.hasOption(DistCpOptionSwitch.TRACK_MISSING.getSwitch())) {
+ builder.withTrackMissing(
+ new Path(getVal(
+ command,
+ DistCpOptionSwitch.TRACK_MISSING.getSwitch())));
+ }
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
try {
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
index 81c2be7e05..07eacb0483 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java
@@ -18,8 +18,9 @@
package org.apache.hadoop.tools.mapred;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -49,6 +50,8 @@
import java.util.LinkedList;
import java.util.List;
+import static org.apache.hadoop.tools.DistCpConstants.*;
+
/**
* The CopyCommitter class is DistCp's OutputCommitter implementation. It is
* responsible for handling the completion/cleanup of the DistCp run.
@@ -62,7 +65,8 @@
* 5. Cleanup of any partially copied files, from previous, failed attempts.
*/
public class CopyCommitter extends FileOutputCommitter {
- private static final Log LOG = LogFactory.getLog(CopyCommitter.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(CopyCommitter.class);
private final TaskAttemptContext taskAttemptContext;
private boolean syncFolder = false;
@@ -111,6 +115,9 @@ public void commitJob(JobContext jobContext) throws IOException {
deleteMissing(conf);
} else if (conf.getBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, false)) {
commitData(conf);
+ } else if (conf.get(CONF_LABEL_TRACK_MISSING) != null) {
+ // save missing information to a directory
+ trackMissing(conf);
}
taskAttemptContext.setStatus("Commit Successful");
}
@@ -334,40 +341,64 @@ private void preserveFileAttributesForDirectories(Configuration conf)
LOG.info("Preserved status on " + preservedEntries + " dir entries on target");
}
- // This method deletes "extra" files from the target, if they're not
- // available at the source.
+ /**
+ * Track all the missing files by saving the listings to the tracking
+ * directory.
+ * This is the same as the listing phase of the
+ * {@link #deleteMissing(Configuration)} operation.
+ * @param conf configuration to read options from, and for FS instantiation.
+ * @throws IOException IO failure
+ */
+ private void trackMissing(Configuration conf) throws IOException {
+ // destination directory for all output files
+ Path trackDir = new Path(
+ conf.get(DistCpConstants.CONF_LABEL_TRACK_MISSING));
+
+ // where is the existing source listing?
+ Path sourceListing = new Path(
+ conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
+ LOG.info("Tracking file changes to directory {}", trackDir);
+
+ // the destination path is under the track directory
+ Path sourceSortedListing = new Path(trackDir,
+ DistCpConstants.SOURCE_SORTED_FILE);
+ LOG.info("Source listing {}", sourceSortedListing);
+
+ DistCpUtils.sortListing(conf, sourceListing, sourceSortedListing);
+
+ // Similarly, create the listing of target-files. Sort alphabetically.
+ // target listing will be deleted after the sort
+ Path targetListing = new Path(trackDir, TARGET_LISTING_FILE);
+ Path sortedTargetListing = new Path(trackDir, TARGET_SORTED_FILE);
+ // list the target
+ listTargetFiles(conf, targetListing, sortedTargetListing);
+ LOG.info("Target listing {}", sortedTargetListing);
+
+ targetListing.getFileSystem(conf).delete(targetListing, false);
+ }
+
+ /**
+ * Deletes "extra" files and directories from the target, if they're not
+ * available at the source.
+ * @param conf configuration to read options from, and for FS instantiation.
+ * @throws IOException IO failure
+ */
private void deleteMissing(Configuration conf) throws IOException {
LOG.info("-delete option is enabled. About to remove entries from " +
"target that are missing in source");
+ long listingStart = System.currentTimeMillis();
// Sort the source-file listing alphabetically.
Path sourceListing = new Path(conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH));
FileSystem clusterFS = sourceListing.getFileSystem(conf);
- Path sortedSourceListing = DistCpUtils.sortListing(clusterFS, conf, sourceListing);
+ Path sortedSourceListing = DistCpUtils.sortListing(conf, sourceListing);
// Similarly, create the listing of target-files. Sort alphabetically.
Path targetListing = new Path(sourceListing.getParent(), "targetListing.seq");
- CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+ Path sortedTargetListing = new Path(targetListing.toString() + "_sorted");
- List targets = new ArrayList(1);
- Path targetFinalPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
- targets.add(targetFinalPath);
- Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
- .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
- ? DistCpConstants.RAW_NONE_PATH : DistCpConstants.NONE_PATH;
- //
- // Set up options to be the same from the CopyListing.buildListing's perspective,
- // so to collect similar listings as when doing the copy
- //
- DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
- .withOverwrite(overwrite)
- .withSyncFolder(syncFolder)
- .build();
- DistCpContext distCpContext = new DistCpContext(options);
- distCpContext.setTargetPathExists(targetPathExists);
-
- target.buildListing(targetListing, distCpContext);
- Path sortedTargetListing = DistCpUtils.sortListing(clusterFS, conf, targetListing);
+ Path targetFinalPath = listTargetFiles(conf,
+ targetListing, sortedTargetListing);
long totalLen = clusterFS.getFileStatus(sortedTargetListing).getLen();
SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
@@ -377,41 +408,153 @@ private void deleteMissing(Configuration conf) throws IOException {
// Walk both source and target file listings.
// Delete all from target that doesn't also exist on source.
+ long deletionStart = System.currentTimeMillis();
+ LOG.info("Listing completed in {}",
+ formatDuration(deletionStart - listingStart));
+
long deletedEntries = 0;
+ long filesDeleted = 0;
+ long missingDeletes = 0;
+ long failedDeletes = 0;
+ long skippedDeletes = 0;
+ long deletedDirectories = 0;
+ // this is an arbitrary constant.
+ final DeletedDirTracker tracker = new DeletedDirTracker(1000);
try {
CopyListingFileStatus srcFileStatus = new CopyListingFileStatus();
Text srcRelPath = new Text();
CopyListingFileStatus trgtFileStatus = new CopyListingFileStatus();
Text trgtRelPath = new Text();
- FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+ final FileSystem targetFS = targetFinalPath.getFileSystem(conf);
+ boolean showProgress;
boolean srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
while (targetReader.next(trgtRelPath, trgtFileStatus)) {
// Skip sources that don't exist on target.
while (srcAvailable && trgtRelPath.compareTo(srcRelPath) > 0) {
srcAvailable = sourceReader.next(srcRelPath, srcFileStatus);
}
+ Path targetEntry = trgtFileStatus.getPath();
+ LOG.debug("Comparing {} and {}",
+ srcFileStatus.getPath(), targetEntry);
if (srcAvailable && trgtRelPath.equals(srcRelPath)) continue;
- // Target doesn't exist at source. Delete.
- boolean result = targetFS.delete(trgtFileStatus.getPath(), true)
- || !targetFS.exists(trgtFileStatus.getPath());
- if (result) {
- LOG.info("Deleted " + trgtFileStatus.getPath() + " - Missing at source");
- deletedEntries++;
+ // Target doesn't exist at source. Try to delete it.
+ if (tracker.shouldDelete(trgtFileStatus)) {
+ showProgress = true;
+ try {
+ if (targetFS.delete(targetEntry, true)) {
+ // the delete returned true: the entry was present and has been removed.
+ LOG.info("Deleted " + targetEntry + " - missing at source");
+ deletedEntries++;
+ if (trgtFileStatus.isDirectory()) {
+ deletedDirectories++;
+ } else {
+ filesDeleted++;
+ }
+ } else {
+ // delete returned false.
+ // For all the filestores which implement the FS spec properly,
+ // this means "the file wasn't there".
+ // so track but don't worry about it.
+ LOG.info("delete({}) returned false ({})",
+ targetEntry, trgtFileStatus);
+ missingDeletes++;
+ }
+ } catch (IOException e) {
+ if (!ignoreFailures) {
+ throw e;
+ } else {
+ // failed to delete, but ignoring errors. So continue
+ LOG.info("Failed to delete {}, ignoring exception {}",
+ targetEntry, e.toString());
+ LOG.debug("Failed to delete {}", targetEntry, e);
+ // count the failure and continue with the next entry
+ failedDeletes++;
+ }
+ }
} else {
- throw new IOException("Unable to delete " + trgtFileStatus.getPath());
+ LOG.debug("Skipping deletion of {}", targetEntry);
+ skippedDeletes++;
+ showProgress = false;
+ }
+ if (showProgress) {
+ // update progress if there's been any FS IO/files deleted.
+ taskAttemptContext.progress();
+ taskAttemptContext.setStatus("Deleting removed files from target. [" +
+ targetReader.getPosition() * 100 / totalLen + "%]");
}
- taskAttemptContext.progress();
- taskAttemptContext.setStatus("Deleting missing files from target. [" +
- targetReader.getPosition() * 100 / totalLen + "%]");
}
+ // if the FS toString() call prints statistics, they get logged here
+ LOG.info("Completed deletion of files from {}", targetFS);
} finally {
IOUtils.closeStream(sourceReader);
IOUtils.closeStream(targetReader);
}
- LOG.info("Deleted " + deletedEntries + " from target: " + targets.get(0));
+ long deletionEnd = System.currentTimeMillis();
+ long deletedFileCount = deletedEntries - deletedDirectories;
+ LOG.info("Deleted from target: files: {} directories: {};"
+ + " skipped deletions {}; deletions already missing {};"
+ + " failed deletes {}",
+ deletedFileCount, deletedDirectories, skippedDeletes,
+ missingDeletes, failedDeletes);
+ LOG.info("Number of tracked deleted directories {}", tracker.size());
+ LOG.info("Duration of deletions: {}",
+ formatDuration(deletionEnd - deletionStart));
+ LOG.info("Total duration of deletion operation: {}",
+ formatDuration(deletionEnd - listingStart));
+ }
+
+ /**
+ * Take a duration and return a human-readable duration of
+ * hours:minutes:seconds.millis.
+ * @param duration to process
+ * @return a string for logging.
+ */
+ private String formatDuration(long duration) {
+
+ long seconds = duration > 0 ? (duration / 1000) : 0;
+ long minutes = (seconds / 60);
+ long hours = (minutes / 60);
+ return String.format("%d:%02d:%02d.%03d",
+ hours, minutes % 60, seconds % 60, duration % 1000);
+ }
+
+ /**
+ * Build a listing of the target files, sorted and unsorted.
+ * @param conf configuration to work with
+ * @param targetListing target listing
+ * @param sortedTargetListing sorted version of the listing
+ * @return the target path of the operation
+ * @throws IOException IO failure.
+ */
+ private Path listTargetFiles(final Configuration conf,
+ final Path targetListing,
+ final Path sortedTargetListing) throws IOException {
+ CopyListing target = new GlobbedCopyListing(new Configuration(conf), null);
+ Path targetFinalPath = new Path(
+ conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH));
+ List<Path> targets = new ArrayList<>(1);
+ targets.add(targetFinalPath);
+ Path resultNonePath = Path.getPathWithoutSchemeAndAuthority(targetFinalPath)
+ .toString().startsWith(DistCpConstants.HDFS_RESERVED_RAW_DIRECTORY_NAME)
+ ? DistCpConstants.RAW_NONE_PATH
+ : DistCpConstants.NONE_PATH;
+ //
+ // Set up options to be the same from the CopyListing.buildListing's
+ // perspective, so to collect similar listings as when doing the copy
+ //
+ DistCpOptions options = new DistCpOptions.Builder(targets, resultNonePath)
+ .withOverwrite(overwrite)
+ .withSyncFolder(syncFolder)
+ .build();
+ DistCpContext distCpContext = new DistCpContext(options);
+ distCpContext.setTargetPathExists(targetPathExists);
+
+ target.buildListing(targetListing, distCpContext);
+ DistCpUtils.sortListing(conf, targetListing, sortedTargetListing);
+ return targetFinalPath;
}
private void commitData(Configuration conf) throws IOException {
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java
new file mode 100644
index 0000000000..64431f7e54
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/DeletedDirTracker.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+
+/**
+ * Track deleted directories and support queries to check whether a path,
+ * or one of its ancestors, has already been deleted.
+ * <p>
+ * Assumptions:
+ * <ol>
+ *   <li>Deep directory trees are being deleted.</li>
+ *   <li>The total number of directories deleted is very much
+ *   less than the number of files.</li>
+ *   <li>Most deleted files are in directories which have
+ *   been deleted.</li>
+ *   <li>The cost of issuing a delete() call is less than that
+ *   of creating Path entries for parent directories and looking them
+ *   up in a hash table.</li>
+ *   <li>A modest cache is sufficient to identify whether or not
+ *   a parent directory has been deleted.</li>
+ *   <li>If a path has been evicted from the cache, the cost of
+ *   the extra deletions incurred is not significant.</li>
+ * </ol>
+ *
+ * The directory structure this algorithm is intended to optimize for is
+ * the deletion of datasets partitioned/bucketed into a directory tree,
+ * and deleted in bulk.
+ *
+ * The ordering of deletions comes from the merge sort of the copy listings;
+ * we rely on this placing a path "/dir1" ahead of "/dir1/file1",
+ * "/dir1/dir2/file2", and other descendants.
+ * We do not rely on parent entries being added immediately before children,
+ * as sorting may place "/dir12" between "/dir1" and its descendants.
+ * <p>
+ * Algorithm:
+ * <ol>
+ *   <li>Before deleting a directory or file, a check is made to see if an
+ *   ancestor is in the cache of deleted directories.</li>
+ *   <li>If an ancestor is found: skip the delete.</li>
+ *   <li>If an ancestor is not found: delete the file/dir.</li>
+ *   <li>When the entry probed is a directory, it is always added to the
+ *   cache of deleted directories, irrespective of the outcome of the
+ *   ancestor search. This is to speed up scans of files directly
+ *   underneath the path.</li>
+ * </ol>
+ */
+final class DeletedDirTracker {
+
+ /**
+ * An LRU cache of directories.
+ */
+ private final Cache<Path, Path> directories;
+
+ /**
+ * Maximum size of the cache.
+ */
+ private final int cacheSize;
+
+ /**
+ * Create an instance.
+ * @param cacheSize maximum cache size.
+ */
+ DeletedDirTracker(int cacheSize) {
+ this.cacheSize = cacheSize;
+ directories = CacheBuilder.newBuilder()
+ .maximumSize(this.cacheSize)
+ .build();
+ }
+
+ /**
+ * Recursive scan for a directory being in the cache of deleted paths.
+ * @param dir directory to look for.
+ * @return true iff the path or a parent is in the cache.
+ */
+ boolean isDirectoryOrAncestorDeleted(Path dir) {
+ if (dir == null) {
+ // at root
+ return false;
+ } else if (isContained(dir)) {
+ // cache hit
+ return true;
+ } else {
+ // cache miss, check parent
+ return isDirectoryOrAncestorDeleted(dir.getParent());
+ }
+ }
+
+ /**
+ * Probe for a path being deleted by virtue of the fact that an
+ * ancestor dir has already been deleted.
+ * @param path path to check
+ * @return true if the parent dir is deleted.
+ */
+ private boolean isInDeletedDirectory(Path path) {
+ Preconditions.checkArgument(!path.isRoot(), "Root Dir");
+ return isDirectoryOrAncestorDeleted(path.getParent());
+ }
+
+ /**
+ * Should a file or directory be deleted?
+ * The cache of deleted directories will be updated with the path
+ * of the status if it references a directory.
+ * @param status file/path to check
+ * @return true if the path should be deleted.
+ */
+ boolean shouldDelete(CopyListingFileStatus status) {
+ Path path = status.getPath();
+ Preconditions.checkArgument(!path.isRoot(), "Root Dir");
+ if (status.isDirectory()) {
+ boolean deleted = isDirectoryOrAncestorDeleted(path);
+ // even if an ancestor has been deleted, add this entry as
+ // a deleted directory.
+ directories.put(path, path);
+ return !deleted;
+ } else {
+ return !isInDeletedDirectory(path);
+ }
+ }
+
+ /**
+ * Is a path directly contained in the set of deleted directories.
+ * @param dir directory to probe
+ * @return true if this directory is recorded as being deleted.
+ */
+ boolean isContained(Path dir) {
+ return directories.getIfPresent(dir) != null;
+ }
+
+ @Override
+ public String toString() {
+ final StringBuilder sb = new StringBuilder(
+ "DeletedDirTracker{");
+ sb.append("maximum size=").append(cacheSize);
+ sb.append("; current size=").append(directories.size());
+ sb.append('}');
+ return sb.toString();
+ }
+
+ /**
+ * Return the current size of the tracker, as in #of entries in the cache.
+ * @return tracker size.
+ */
+ long size() {
+ return directories.size();
+ }
+}
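An illustrative sketch of how the tracker is driven (mirroring the CopyCommitter change above; targetReader, relPath, status and targetFS are assumed to be the sorted target-listing reader, its key/value buffers and the target filesystem):

    // Consult the tracker before each delete so that entries under an
    // already-deleted directory are skipped rather than deleted again.
    DeletedDirTracker tracker = new DeletedDirTracker(1000);
    while (targetReader.next(relPath, status)) {
      if (tracker.shouldDelete(status)) {
        targetFS.delete(status.getPath(), true);
      }
    }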
diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
index eba4beeba4..2a60e80112 100644
--- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/DistCpUtils.java
@@ -433,24 +433,45 @@ public static CopyListingFileStatus toCopyListingFileStatusHelper(
}
/**
- * Sort sequence file containing FileStatus and Text as key and value respecitvely
+ * Sort a sequence file containing Text keys and
+ * CopyListingFileStatus values.
*
- * @param fs - File System
* @param conf - Configuration
* @param sourceListing - Source listing file
* @return Path of the sorted file. Is source file with _sorted appended to the name
* @throws IOException - Any exception during sort.
*/
- public static Path sortListing(FileSystem fs, Configuration conf, Path sourceListing)
+ public static Path sortListing(Configuration conf,
+ Path sourceListing)
throws IOException {
+ Path output = new Path(sourceListing.toString() + "_sorted");
+ sortListing(conf, sourceListing, output);
+ return output;
+ }
+
+ /**
+ * Sort a sequence file containing Text keys and CopyListingFileStatus
+ * values, saving the result to the {@code output} path, which
+ * will be deleted first.
+ *
+ * @param conf - Configuration
+ * @param sourceListing - Source listing file
+ * @param output output path
+ * @throws IOException - Any exception during sort.
+ */
+
+ public static void sortListing(final Configuration conf,
+ final Path sourceListing,
+ final Path output) throws IOException {
+ FileSystem fs = sourceListing.getFileSystem(conf);
+ // force verify that the destination FS matches the input
+ fs.makeQualified(output);
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs, Text.class,
CopyListingFileStatus.class, conf);
- Path output = new Path(sourceListing.toString() + "_sorted");
fs.delete(output, false);
sorter.sort(sourceListing, output);
- return output;
}
/**
@@ -547,9 +568,13 @@ public static boolean checksumsAreEqual(FileSystem sourceFS, Path source,
throws IOException {
FileChecksum targetChecksum = null;
try {
- sourceChecksum = sourceChecksum != null ? sourceChecksum : sourceFS
- .getFileChecksum(source);
- targetChecksum = targetFS.getFileChecksum(target);
+ sourceChecksum = sourceChecksum != null
+ ? sourceChecksum
+ : sourceFS.getFileChecksum(source);
+ if (sourceChecksum != null) {
+ // iff there's a source checksum, look for one at the destination.
+ targetChecksum = targetFS.getFileChecksum(target);
+ }
} catch (IOException e) {
LOG.error("Unable to retrieve checksum for " + source + " or " + target, e);
}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
index fb1a64db18..a5e0a03357 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/AbstractContractDistCpTest.java
@@ -20,22 +20,35 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import java.io.IOException;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCp;
+import org.apache.hadoop.tools.DistCpConstants;
import org.apache.hadoop.tools.DistCpOptions;
+import org.apache.hadoop.tools.mapred.CopyMapper;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Contract test suite covering a file system's integration with DistCp. The
@@ -48,13 +61,70 @@
public abstract class AbstractContractDistCpTest
extends AbstractFSContractTestBase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AbstractContractDistCpTest.class);
+
+ public static final String SCALE_TEST_DISTCP_FILE_SIZE_KB
+ = "scale.test.distcp.file.size.kb";
+
+ public static final int DEFAULT_DISTCP_SIZE_KB = 1024;
+
+ protected static final int MB = 1024 * 1024;
+
@Rule
public TestName testName = new TestName();
+ /**
+ * The timeout value is extended over the default so that large updates
+ * are allowed to take time, especially to remote stores.
+ * @return the current test timeout
+ */
+ protected int getTestTimeoutMillis() {
+ return 15 * 60 * 1000;
+ }
+
private Configuration conf;
private FileSystem localFS, remoteFS;
private Path localDir, remoteDir;
+ private Path inputDir;
+
+ private Path inputSubDir1;
+
+ private Path inputSubDir2;
+
+ private Path inputSubDir4;
+
+ private Path inputFile1;
+
+ private Path inputFile2;
+
+ private Path inputFile3;
+
+ private Path inputFile4;
+
+ private Path inputFile5;
+
+ private Path outputDir;
+
+ private Path outputSubDir1;
+
+ private Path outputSubDir2;
+
+ private Path outputSubDir4;
+
+ private Path outputFile1;
+
+ private Path outputFile2;
+
+ private Path outputFile3;
+
+ private Path outputFile4;
+
+ private Path outputFile5;
+
+ private Path inputDirUnderOutputDir;
+
@Override
protected Configuration createConfiguration() {
Configuration newConf = new Configuration();
@@ -73,20 +143,307 @@ public void setup() throws Exception {
// All paths are fully qualified including scheme (not taking advantage of
// default file system), so if something fails, the messages will make it
// clear which paths are local and which paths are remote.
- Path testSubDir = new Path(getClass().getSimpleName(),
- testName.getMethodName());
- localDir = localFS.makeQualified(new Path(new Path(
- GenericTestUtils.getTestDir().toURI()), testSubDir));
+ String className = getClass().getSimpleName();
+ String testSubDir = className + "/" + testName.getMethodName();
+ localDir =
+ localFS.makeQualified(new Path(new Path(
+ GenericTestUtils.getTestDir().toURI()), testSubDir + "/local"));
mkdirs(localFS, localDir);
- remoteDir = remoteFS.makeQualified(
- new Path(getContract().getTestPath(), testSubDir));
+ remoteDir = path(testSubDir + "/remote");
mkdirs(remoteFS, remoteDir);
+ // test teardown does this, but IDE-based test debugging can skip
+ // that teardown; this guarantees the initial state is clean
+ remoteFS.delete(remoteDir, true);
+ localFS.delete(localDir, true);
+ }
+
+ /**
+ * Set up both input and output fields.
+ * @param src source tree
+ * @param dest dest tree
+ */
+ protected void initPathFields(final Path src, final Path dest) {
+ initInputFields(src);
+ initOutputFields(dest);
+ }
+
+ /**
+ * Output field setup.
+ * @param path path to set up
+ */
+ protected void initOutputFields(final Path path) {
+ outputDir = new Path(path, "outputDir");
+ inputDirUnderOutputDir = new Path(outputDir, "inputDir");
+ outputFile1 = new Path(inputDirUnderOutputDir, "file1");
+ outputSubDir1 = new Path(inputDirUnderOutputDir, "subDir1");
+ outputFile2 = new Path(outputSubDir1, "file2");
+ outputSubDir2 = new Path(inputDirUnderOutputDir, "subDir2/subDir2");
+ outputFile3 = new Path(outputSubDir2, "file3");
+ outputSubDir4 = new Path(inputDirUnderOutputDir, "subDir4/subDir4");
+ outputFile4 = new Path(outputSubDir4, "file4");
+ outputFile5 = new Path(outputSubDir4, "file5");
+ }
+
+ /**
+ * These paths are used across different methods (copy, update, track),
+ * so they are set up as fields.
+ * @param srcDir source directory for these to go under.
+ */
+ protected void initInputFields(final Path srcDir) {
+ inputDir = new Path(srcDir, "inputDir");
+ inputFile1 = new Path(inputDir, "file1");
+ inputSubDir1 = new Path(inputDir, "subDir1");
+ inputFile2 = new Path(inputSubDir1, "file2");
+ inputSubDir2 = new Path(inputDir, "subDir2/subDir2");
+ inputFile3 = new Path(inputSubDir2, "file3");
+ inputSubDir4 = new Path(inputDir, "subDir4/subDir4");
+ inputFile4 = new Path(inputSubDir4, "file4");
+ inputFile5 = new Path(inputSubDir4, "file5");
+ }
+
+ protected FileSystem getLocalFS() {
+ return localFS;
+ }
+
+ protected FileSystem getRemoteFS() {
+ return remoteFS;
+ }
+
+ protected Path getLocalDir() {
+ return localDir;
+ }
+
+ protected Path getRemoteDir() {
+ return remoteDir;
}
@Test
- public void deepDirectoryStructureToRemote() throws Exception {
+ public void testUpdateDeepDirectoryStructureToRemote() throws Exception {
+ describe("update a deep directory structure from local to remote");
+ distCpDeepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
+ distCpUpdateDeepDirectoryStructure(inputDirUnderOutputDir);
+ }
+
+ @Test
+ public void testUpdateDeepDirectoryStructureNoChange() throws Exception {
+ describe("update an unchanged directory structure"
+ + " from local to remote; expect no copy");
+ Path target = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
+ remoteDir);
+ describe("\nExecuting Update\n");
+ Job job = distCpUpdate(localDir, target);
+ assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
+ assertCounterInRange(job, CopyMapper.Counter.BYTESCOPIED, 0, 0);
+ }
+
+ /**
+ * Assert that a counter is in a range; min and max values are inclusive.
+ * @param job job to query
+ * @param counter counter to examine
+ * @param min min value, if negative "no minimum"
+ * @param max max value, if negative "no maximum"
+ * @throws IOException IO problem
+ */
+ void assertCounterInRange(Job job, Enum<?> counter, long min, long max)
+ throws IOException {
+ Counter c = job.getCounters().findCounter(counter);
+ long value = c.getValue();
+ String description =
+ String.format("%s value %s", c.getDisplayName(), value);
+
+ if (min >= 0) {
+ assertTrue(description + " below minimum " + min,
+ value >= min);
+ }
+ if (max >= 0) {
+ assertTrue(description + " above maximum " + max,
+ value <= max);
+ }
+ }
+
+ /**
+ * Do a distcp from the local source to the destination filesystem.
+ * This is executed as part of
+ * {@link #testUpdateDeepDirectoryStructureToRemote()}; it's designed to be
+ * overridden or wrapped by subclasses which wish to add more assertions.
+ *
+ * Life is complicated here by the way that the src/dest paths
+ * of a distcp differ when -update is used.
+ * @param destDir output directory used by the initial distcp
+ * @return the distcp job
+ */
+ protected Job distCpUpdateDeepDirectoryStructure(final Path destDir)
+ throws Exception {
+ describe("Now do an incremental update with deletion of missing files");
+ Path srcDir = inputDir;
+ LOG.info("Source directory = {}, dest={}", srcDir, destDir);
+
+ ContractTestUtils.assertPathsExist(localFS,
+ "Paths for test are wrong",
+ inputFile1, inputFile2, inputFile3, inputFile4, inputFile5);
+
+ modifySourceDirectories();
+
+ Job job = distCpUpdate(srcDir, destDir);
+
+ Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
+
+ lsR("Updated Remote", remoteFS, destDir);
+
+ ContractTestUtils.assertPathDoesNotExist(remoteFS,
+ " deleted from " + inputFile1, outputFile1);
+ ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
+ ContractTestUtils.assertPathsDoNotExist(remoteFS,
+ "DistCP should have deleted",
+ outputFile3, outputFile4, outputSubDir4);
+ assertCounterInRange(job, CopyMapper.Counter.COPY, 1, 1);
+ assertCounterInRange(job, CopyMapper.Counter.SKIP, 1, -1);
+ return job;
+ }
+
+ /**
+ * Run distcp -update srcDir destDir.
+ * @param srcDir local source directory
+ * @param destDir remote destination directory.
+ * @return the completed job
+ * @throws Exception any failure.
+ */
+ private Job distCpUpdate(final Path srcDir, final Path destDir)
+ throws Exception {
+ describe("\nDistcp -update from " + srcDir + " to " + destDir);
+ lsR("Local to update", localFS, srcDir);
+ lsR("Remote before update", remoteFS, destDir);
+ return runDistCp(buildWithStandardOptions(
+ new DistCpOptions.Builder(
+ Collections.singletonList(srcDir), destDir)
+ .withDeleteMissing(true)
+ .withSyncFolder(true)
+ .withCRC(true)
+ .withOverwrite(false)));
+ }
+
+ /**
+ * Update the source directories as various tests expect,
+ * including adding a new file.
+ * @return the path to the newly created file
+ * @throws IOException IO failure
+ */
+ private Path modifySourceDirectories() throws IOException {
+ localFS.delete(inputFile1, false);
+ localFS.delete(inputFile3, false);
+ // delete all of subdir4, so input/output file 4 & 5 will go
+ localFS.delete(inputSubDir4, true);
+ // add one new file
+ Path inputFileNew1 = new Path(inputSubDir2, "newfile1");
+ ContractTestUtils.touch(localFS, inputFileNew1);
+ return inputFileNew1;
+ }
+
+
+ @Test
+ public void testTrackDeepDirectoryStructureToRemote() throws Exception {
describe("copy a deep directory structure from local to remote");
- deepDirectoryStructure(localFS, localDir, remoteFS, remoteDir);
+
+ Path destDir = distCpDeepDirectoryStructure(localFS, localDir, remoteFS,
+ remoteDir);
+ ContractTestUtils.assertIsDirectory(remoteFS, destDir);
+
+ describe("Now do an incremental update and save of missing files");
+ Path srcDir = inputDir;
+ // same path setup as in deepDirectoryStructure()
+ Path trackDir = new Path(localDir, "trackDir");
+
+
+ describe("\nDirectories\n");
+ lsR("Local to update", localFS, srcDir);
+ lsR("Remote before update", remoteFS, destDir);
+
+
+ ContractTestUtils.assertPathsExist(localFS,
+ "Paths for test are wrong",
+ inputFile2, inputFile3, inputFile4, inputFile5);
+
+ Path inputFileNew1 = modifySourceDirectories();
+
+ // Distcp set to track but not delete
+ runDistCp(buildWithStandardOptions(
+ new DistCpOptions.Builder(
+ Collections.singletonList(srcDir),
+ inputDirUnderOutputDir)
+ .withTrackMissing(trackDir)
+ .withSyncFolder(true)
+ .withOverwrite(false)));
+
+ lsR("tracked update", remoteFS, destDir);
+ // new file went over
+ Path outputFileNew1 = new Path(outputSubDir2, "newfile1");
+ ContractTestUtils.assertIsFile(remoteFS, outputFileNew1);
+
+ ContractTestUtils.assertPathExists(localFS, "tracking directory",
+ trackDir);
+
+ // now read in the listings
+ Path sortedSourceListing = new Path(trackDir,
+ DistCpConstants.SOURCE_SORTED_FILE);
+ ContractTestUtils.assertIsFile(localFS, sortedSourceListing);
+ Path sortedTargetListing = new Path(trackDir,
+ DistCpConstants.TARGET_SORTED_FILE);
+ ContractTestUtils.assertIsFile(localFS, sortedTargetListing);
+ // deletion didn't happen
+ ContractTestUtils.assertPathsExist(remoteFS,
+ "DistCP should have retained",
+ outputFile2, outputFile3, outputFile4, outputSubDir4);
+
+ // now scan the table and see that things are there.
+ Map<String, Path> sourceFiles = new HashMap<>(10);
+ Map<String, Path> targetFiles = new HashMap<>(10);
+
+ try (SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(sortedSourceListing));
+ SequenceFile.Reader targetReader = new SequenceFile.Reader(conf,
+ SequenceFile.Reader.file(sortedTargetListing))) {
+ CopyListingFileStatus copyStatus = new CopyListingFileStatus();
+ Text name = new Text();
+ while(sourceReader.next(name, copyStatus)) {
+ String key = name.toString();
+ Path path = copyStatus.getPath();
+ LOG.info("{}: {}", key, path);
+ sourceFiles.put(key, path);
+ }
+ while(targetReader.next(name, copyStatus)) {
+ String key = name.toString();
+ Path path = copyStatus.getPath();
+ LOG.info("{}: {}", key, path);
+ targetFiles.put(name.toString(), copyStatus.getPath());
+ }
+ }
+
+ // look for the new file in both lists
+ assertTrue("No " + outputFileNew1 + " in source listing",
+ sourceFiles.containsValue(inputFileNew1));
+ assertTrue("No " + outputFileNew1 + " in target listing",
+ targetFiles.containsValue(outputFileNew1));
+ assertTrue("No " + outputSubDir4 + " in target listing",
+ targetFiles.containsValue(outputSubDir4));
+ assertFalse("Found " + inputSubDir4 + " in source listing",
+ sourceFiles.containsValue(inputSubDir4));
+
+ }
+
+ public void lsR(final String description,
+ final FileSystem fs,
+ final Path dir) throws IOException {
+ RemoteIterator<LocatedFileStatus> files = fs.listFiles(dir, true);
+ LOG.info("{}: {}:", description, dir);
+ StringBuilder sb = new StringBuilder();
+ while(files.hasNext()) {
+ LocatedFileStatus status = files.next();
+ sb.append(String.format(" %s; type=%s; length=%d",
+ status.getPath(),
+ status.isDirectory()? "dir" : "file",
+ status.getLen()));
+ }
+ LOG.info("{}", sb);
}
@Test
@@ -96,34 +453,35 @@ public void largeFilesToRemote() throws Exception {
}
@Test
- public void deepDirectoryStructureFromRemote() throws Exception {
+ public void testDeepDirectoryStructureFromRemote() throws Exception {
describe("copy a deep directory structure from remote to local");
- deepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
+ distCpDeepDirectoryStructure(remoteFS, remoteDir, localFS, localDir);
}
@Test
- public void largeFilesFromRemote() throws Exception {
+ public void testLargeFilesFromRemote() throws Exception {
describe("copy multiple large files from remote to local");
largeFiles(remoteFS, remoteDir, localFS, localDir);
}
/**
- * Executes a test using a file system sub-tree with multiple nesting levels.
+ * Executes a DistCp using a file system sub-tree with multiple nesting
+ * levels.
+ * The filenames are those of the fields initialized in setup.
*
* @param srcFS source FileSystem
* @param srcDir source directory
* @param dstFS destination FileSystem
* @param dstDir destination directory
+ * @return the target directory of the copy
* @throws Exception if there is a failure
*/
- private void deepDirectoryStructure(FileSystem srcFS, Path srcDir,
- FileSystem dstFS, Path dstDir) throws Exception {
- Path inputDir = new Path(srcDir, "inputDir");
- Path inputSubDir1 = new Path(inputDir, "subDir1");
- Path inputSubDir2 = new Path(inputDir, "subDir2/subDir3");
- Path inputFile1 = new Path(inputDir, "file1");
- Path inputFile2 = new Path(inputSubDir1, "file2");
- Path inputFile3 = new Path(inputSubDir2, "file3");
+ private Path distCpDeepDirectoryStructure(FileSystem srcFS,
+ Path srcDir,
+ FileSystem dstFS,
+ Path dstDir) throws Exception {
+ initPathFields(srcDir, dstDir);
+
mkdirs(srcFS, inputSubDir1);
mkdirs(srcFS, inputSubDir2);
byte[] data1 = dataset(100, 33, 43);
@@ -132,14 +490,18 @@ private void deepDirectoryStructure(FileSystem srcFS, Path srcDir,
createFile(srcFS, inputFile2, true, data2);
byte[] data3 = dataset(300, 53, 63);
createFile(srcFS, inputFile3, true, data3);
+ createFile(srcFS, inputFile4, true, dataset(400, 53, 63));
+ createFile(srcFS, inputFile5, true, dataset(500, 53, 63));
Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target);
ContractTestUtils.assertIsDirectory(dstFS, target);
+ lsR("Destination tree after distcp", dstFS, target);
verifyFileContents(dstFS, new Path(target, "inputDir/file1"), data1);
verifyFileContents(dstFS,
new Path(target, "inputDir/subDir1/file2"), data2);
verifyFileContents(dstFS,
- new Path(target, "inputDir/subDir2/subDir3/file3"), data3);
+ new Path(target, "inputDir/subDir2/subDir2/file3"), data3);
+ return target;
}
/**
@@ -153,20 +515,21 @@ private void deepDirectoryStructure(FileSystem srcFS, Path srcDir,
*/
private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
Path dstDir) throws Exception {
- Path inputDir = new Path(srcDir, "inputDir");
- Path inputFile1 = new Path(inputDir, "file1");
- Path inputFile2 = new Path(inputDir, "file2");
- Path inputFile3 = new Path(inputDir, "file3");
+ initPathFields(srcDir, dstDir);
+ Path largeFile1 = new Path(inputDir, "file1");
+ Path largeFile2 = new Path(inputDir, "file2");
+ Path largeFile3 = new Path(inputDir, "file3");
mkdirs(srcFS, inputDir);
- int fileSizeKb = conf.getInt("scale.test.distcp.file.size.kb", 10 * 1024);
+ int fileSizeKb = conf.getInt(SCALE_TEST_DISTCP_FILE_SIZE_KB,
+ DEFAULT_DISTCP_SIZE_KB);
int fileSizeMb = fileSizeKb / 1024;
getLog().info("{} with file size {}", testName.getMethodName(), fileSizeMb);
- byte[] data1 = dataset((fileSizeMb + 1) * 1024 * 1024, 33, 43);
- createFile(srcFS, inputFile1, true, data1);
- byte[] data2 = dataset((fileSizeMb + 2) * 1024 * 1024, 43, 53);
- createFile(srcFS, inputFile2, true, data2);
- byte[] data3 = dataset((fileSizeMb + 3) * 1024 * 1024, 53, 63);
- createFile(srcFS, inputFile3, true, data3);
+ byte[] data1 = dataset((fileSizeMb + 1) * MB, 33, 43);
+ createFile(srcFS, largeFile1, true, data1);
+ byte[] data2 = dataset((fileSizeMb + 2) * MB, 43, 53);
+ createFile(srcFS, largeFile2, true, data2);
+ byte[] data3 = dataset((fileSizeMb + 3) * MB, 53, 63);
+ createFile(srcFS, largeFile3, true, data3);
Path target = new Path(dstDir, "outputDir");
runDistCp(inputDir, target);
ContractTestUtils.assertIsDirectory(dstFS, target);
@@ -183,12 +546,34 @@ private void largeFiles(FileSystem srcFS, Path srcDir, FileSystem dstFS,
* @throws Exception if there is a failure
*/
private void runDistCp(Path src, Path dst) throws Exception {
- DistCpOptions options = new DistCpOptions.Builder(
- Collections.singletonList(src), dst).build();
+ runDistCp(buildWithStandardOptions(
+ new DistCpOptions.Builder(Collections.singletonList(src), dst)));
+ }
+
+ /**
+ * Run the distcp job.
+ * @param options distcp options
+ * @return the job. It will have already completed.
+ * @throws Exception failure
+ */
+ private Job runDistCp(final DistCpOptions options) throws Exception {
Job job = new DistCp(conf, options).execute();
assertNotNull("Unexpected null job returned from DistCp execution.", job);
assertTrue("DistCp job did not complete.", job.isComplete());
assertTrue("DistCp job did not complete successfully.", job.isSuccessful());
+ return job;
+ }
+
+ /**
+ * Add any standard options and then build.
+ * @param builder DistCp option builder
+ * @return the build options
+ */
+ private DistCpOptions buildWithStandardOptions(
+ DistCpOptions.Builder builder) {
+ return builder
+ .withNumListstatusThreads(8)
+ .build();
}
/**
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java
new file mode 100644
index 0000000000..1a054c5716
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/contract/TestLocalContractDistCp.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.contract;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
+
+/**
+ * Verifies that the local FS passes all the tests in
+ * {@link AbstractContractDistCpTest}.
+ * As such, it acts as an in-module validation of this contract test itself.
+ */
+public class TestLocalContractDistCp extends AbstractContractDistCpTest {
+
+ @Override
+ protected AbstractFSContract createContract(Configuration conf) {
+ return new LocalFSContract(conf);
+ }
+}
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
index bf151cd173..e00241bafb 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java
@@ -43,6 +43,9 @@
import java.io.IOException;
import java.util.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.tools.util.TestDistCpUtils.*;
+
public class TestCopyCommitter {
private static final Log LOG = LogFactory.getLog(TestCopyCommitter.class);
@@ -80,56 +83,42 @@ public static void destroy() {
}
@Before
- public void createMetaFolder() {
+ public void createMetaFolder() throws IOException {
config.set(DistCpConstants.CONF_LABEL_META_FOLDER, "/meta");
// Unset listing file path since the config is shared by
// multiple tests, and some test doesn't set it, such as
// testNoCommitAction, but the distcp code will check it.
config.set(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH, "");
Path meta = new Path("/meta");
- try {
- cluster.getFileSystem().mkdirs(meta);
- } catch (IOException e) {
- LOG.error("Exception encountered while creating meta folder", e);
- Assert.fail("Unable to create meta folder");
- }
+ cluster.getFileSystem().mkdirs(meta);
}
@After
- public void cleanupMetaFolder() {
+ public void cleanupMetaFolder() throws IOException {
Path meta = new Path("/meta");
- try {
- if (cluster.getFileSystem().exists(meta)) {
- cluster.getFileSystem().delete(meta, true);
- Assert.fail("Expected meta folder to be deleted");
- }
- } catch (IOException e) {
- LOG.error("Exception encountered while cleaning up folder", e);
- Assert.fail("Unable to clean up meta folder");
+ if (cluster.getFileSystem().exists(meta)) {
+ cluster.getFileSystem().delete(meta, true);
+ Assert.fail("Expected meta folder to be deleted");
}
}
@Test
- public void testNoCommitAction() {
+ public void testNoCommitAction() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
- JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
+ JobContext jobContext = new JobContextImpl(
+ taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
- try {
- OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
- committer.commitJob(jobContext);
- Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
+ OutputCommitter committer = new CopyCommitter(null, taskAttemptContext);
+ committer.commitJob(jobContext);
+ Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
- //Test for idempotent commit
- committer.commitJob(jobContext);
- Assert.assertEquals(taskAttemptContext.getStatus(), "Commit Successful");
- } catch (IOException e) {
- LOG.error("Exception encountered ", e);
- Assert.fail("Commit failed");
- }
+ //Test for idempotent commit
+ committer.commitJob(jobContext);
+ Assert.assertEquals("Commit Successful", taskAttemptContext.getStatus());
}
@Test
- public void testPreserveStatus() {
+ public void testPreserveStatus() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
@@ -161,19 +150,12 @@ public void testPreserveStatus() {
conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase);
committer.commitJob(jobContext);
- if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
- Assert.fail("Permission don't match");
- }
+ checkDirectoryPermissions(fs, targetBase, sourcePerm);
//Test for idempotent commit
committer.commitJob(jobContext);
- if (!checkDirectoryPermissions(fs, targetBase, sourcePerm)) {
- Assert.fail("Permission don't match");
- }
+ checkDirectoryPermissions(fs, targetBase, sourcePerm);
- } catch (IOException e) {
- LOG.error("Exception encountered while testing for preserve status", e);
- Assert.fail("Preserve status failure");
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS);
@@ -182,7 +164,7 @@ public void testPreserveStatus() {
}
@Test
- public void testDeleteMissing() {
+ public void testDeleteMissing() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
@@ -213,24 +195,13 @@ public void testDeleteMissing() {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
+ verifyFoldersAreInSync(fs, targetBase, sourceBase);
+ verifyFoldersAreInSync(fs, sourceBase, targetBase);
//Test for idempotent commit
committer.commitJob(jobContext);
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, sourceBase, targetBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
- } catch (Throwable e) {
- LOG.error("Exception encountered while testing for delete missing", e);
- Assert.fail("Delete missing failure");
+ verifyFoldersAreInSync(fs, targetBase, sourceBase);
+ verifyFoldersAreInSync(fs, sourceBase, targetBase);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@@ -238,7 +209,7 @@ public void testDeleteMissing() {
}
@Test
- public void testDeleteMissingFlatInterleavedFiles() {
+ public void testDeleteMissingFlatInterleavedFiles() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
@@ -253,20 +224,20 @@ public void testDeleteMissingFlatInterleavedFiles() {
fs = FileSystem.get(conf);
sourceBase = "/tmp1/" + String.valueOf(rand.nextLong());
targetBase = "/tmp1/" + String.valueOf(rand.nextLong());
- TestDistCpUtils.createFile(fs, sourceBase + "/1");
- TestDistCpUtils.createFile(fs, sourceBase + "/3");
- TestDistCpUtils.createFile(fs, sourceBase + "/4");
- TestDistCpUtils.createFile(fs, sourceBase + "/5");
- TestDistCpUtils.createFile(fs, sourceBase + "/7");
- TestDistCpUtils.createFile(fs, sourceBase + "/8");
- TestDistCpUtils.createFile(fs, sourceBase + "/9");
+ createFile(fs, sourceBase + "/1");
+ createFile(fs, sourceBase + "/3");
+ createFile(fs, sourceBase + "/4");
+ createFile(fs, sourceBase + "/5");
+ createFile(fs, sourceBase + "/7");
+ createFile(fs, sourceBase + "/8");
+ createFile(fs, sourceBase + "/9");
- TestDistCpUtils.createFile(fs, targetBase + "/2");
- TestDistCpUtils.createFile(fs, targetBase + "/4");
- TestDistCpUtils.createFile(fs, targetBase + "/5");
- TestDistCpUtils.createFile(fs, targetBase + "/7");
- TestDistCpUtils.createFile(fs, targetBase + "/9");
- TestDistCpUtils.createFile(fs, targetBase + "/A");
+ createFile(fs, targetBase + "/2");
+ createFile(fs, targetBase + "/4");
+ createFile(fs, targetBase + "/5");
+ createFile(fs, targetBase + "/7");
+ createFile(fs, targetBase + "/9");
+ createFile(fs, targetBase + "/A");
final DistCpOptions options = new DistCpOptions.Builder(
Collections.singletonList(new Path(sourceBase)), new Path("/out"))
@@ -282,20 +253,13 @@ public void testDeleteMissingFlatInterleavedFiles() {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase);
committer.commitJob(jobContext);
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
- Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
+ verifyFoldersAreInSync(fs, targetBase, sourceBase);
+ Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
//Test for idempotent commit
committer.commitJob(jobContext);
- if (!TestDistCpUtils.checkIfFoldersAreInSync(fs, targetBase, sourceBase)) {
- Assert.fail("Source and target folders are not in sync");
- }
- Assert.assertEquals(fs.listStatus(new Path(targetBase)).length, 4);
- } catch (IOException e) {
- LOG.error("Exception encountered while testing for delete missing", e);
- Assert.fail("Delete missing failure");
+ verifyFoldersAreInSync(fs, targetBase, sourceBase);
+ Assert.assertEquals(4, fs.listStatus(new Path(targetBase)).length);
} finally {
TestDistCpUtils.delete(fs, "/tmp1");
conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false");
@@ -304,7 +268,7 @@ public void testDeleteMissingFlatInterleavedFiles() {
}
@Test
- public void testAtomicCommitMissingFinal() {
+ public void testAtomicCommitMissingFinal() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
@@ -322,19 +286,16 @@ public void testAtomicCommitMissingFinal() {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
- Assert.assertTrue(fs.exists(new Path(workPath)));
- Assert.assertFalse(fs.exists(new Path(finalPath)));
+ assertPathExists(fs, "Work path", new Path(workPath));
+ assertPathDoesNotExist(fs, "Final path", new Path(finalPath));
committer.commitJob(jobContext);
- Assert.assertFalse(fs.exists(new Path(workPath)));
- Assert.assertTrue(fs.exists(new Path(finalPath)));
+ assertPathDoesNotExist(fs, "Work path", new Path(workPath));
+ assertPathExists(fs, "Final path", new Path(finalPath));
//Test for idempotent commit
committer.commitJob(jobContext);
- Assert.assertFalse(fs.exists(new Path(workPath)));
- Assert.assertTrue(fs.exists(new Path(finalPath)));
- } catch (IOException e) {
- LOG.error("Exception encountered while testing for preserve status", e);
- Assert.fail("Atomic commit failure");
+ assertPathDoesNotExist(fs, "Work path", new Path(workPath));
+ assertPathExists(fs, "Final path", new Path(finalPath));
} finally {
TestDistCpUtils.delete(fs, workPath);
TestDistCpUtils.delete(fs, finalPath);
@@ -343,7 +304,7 @@ public void testAtomicCommitMissingFinal() {
}
@Test
- public void testAtomicCommitExistingFinal() {
+ public void testAtomicCommitExistingFinal() throws IOException {
TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config);
JobContext jobContext = new JobContextImpl(taskAttemptContext.getConfiguration(),
taskAttemptContext.getTaskAttemptID().getJobID());
@@ -363,20 +324,17 @@ public void testAtomicCommitExistingFinal() {
conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath);
conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true);
- Assert.assertTrue(fs.exists(new Path(workPath)));
- Assert.assertTrue(fs.exists(new Path(finalPath)));
+ assertPathExists(fs, "Work path", new Path(workPath));
+ assertPathExists(fs, "Final path", new Path(finalPath));
try {
committer.commitJob(jobContext);
Assert.fail("Should not be able to atomic-commit to pre-existing path.");
} catch(Exception exception) {
- Assert.assertTrue(fs.exists(new Path(workPath)));
- Assert.assertTrue(fs.exists(new Path(finalPath)));
+ assertPathExists(fs, "Work path", new Path(workPath));
+ assertPathExists(fs, "Final path", new Path(finalPath));
LOG.info("Atomic-commit Test pass.");
}
- } catch (IOException e) {
- LOG.error("Exception encountered while testing for atomic commit.", e);
- Assert.fail("Atomic commit failure");
} finally {
TestDistCpUtils.delete(fs, workPath);
TestDistCpUtils.delete(fs, finalPath);
@@ -389,11 +347,11 @@ private TaskAttemptContext getTaskAttemptContext(Configuration conf) {
new TaskAttemptID("200707121733", 1, TaskType.MAP, 1, 1));
}
- private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
- FsPermission sourcePerm) throws IOException {
+ private void checkDirectoryPermissions(FileSystem fs, String targetBase,
+ FsPermission sourcePerm) throws IOException {
Path base = new Path(targetBase);
- Stack stack = new Stack();
+ Stack<Path> stack = new Stack<>();
stack.push(base);
while (!stack.isEmpty()) {
Path file = stack.pop();
@@ -404,11 +362,10 @@ private boolean checkDirectoryPermissions(FileSystem fs, String targetBase,
for (FileStatus status : fStatus) {
if (status.isDirectory()) {
stack.push(status.getPath());
- Assert.assertEquals(status.getPermission(), sourcePerm);
+ Assert.assertEquals(sourcePerm, status.getPermission());
}
}
}
- return true;
}
private static class NullInputFormat extends InputFormat<Text, Text> {
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java
new file mode 100644
index 0000000000..77b08ade6d
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestDeletedDirTracker.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.tools.mapred;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.tools.CopyListingFileStatus;
+
+/**
+ * Unit tests of the deleted directory tracker.
+ */
+@SuppressWarnings("RedundantThrows")
+public class TestDeletedDirTracker extends Assert {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestDeletedDirTracker.class);
+
+ public static final Path ROOT = new Path("hdfs://namenode/");
+
+ public static final Path DIR1 = new Path(ROOT, "dir1");
+
+ public static final Path FILE0 = new Path(ROOT, "file0");
+
+ public static final Path DIR1_FILE1 = new Path(DIR1, "file1");
+
+ public static final Path DIR1_FILE2 = new Path(DIR1, "file2");
+
+ public static final Path DIR1_DIR3 = new Path(DIR1, "dir3");
+
+ public static final Path DIR1_DIR3_DIR4 = new Path(DIR1_DIR3, "dir4");
+
+ public static final Path DIR1_DIR3_DIR4_FILE_3 =
+ new Path(DIR1_DIR3_DIR4, "file1");
+
+
+ private DeletedDirTracker tracker;
+
+ @Before
+ public void setup() {
+ tracker = new DeletedDirTracker(1000);
+ }
+
+ @After
+ public void teardown() {
+ LOG.info(tracker.toString());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoRootDir() throws Throwable {
+ shouldDelete(ROOT, true);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testNoRootFile() throws Throwable {
+ shouldDelete(dirStatus(ROOT));
+ }
+
+ @Test
+ public void testFileInRootDir() throws Throwable {
+ expectShouldDelete(FILE0, false);
+ expectShouldDelete(FILE0, false);
+ }
+
+ @Test
+ public void testDeleteDir1() throws Throwable {
+ expectShouldDelete(DIR1, true);
+ expectShouldNotDelete(DIR1, true);
+ expectShouldNotDelete(DIR1_FILE1, false);
+ expectNotCached(DIR1_FILE1);
+ expectShouldNotDelete(DIR1_DIR3, true);
+ expectCached(DIR1_DIR3);
+ expectShouldNotDelete(DIR1_FILE2, false);
+ expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
+ expectShouldNotDelete(DIR1_DIR3_DIR4, true);
+ expectShouldNotDelete(DIR1_DIR3_DIR4, true);
+ }
+
+ @Test
+ public void testDeleteDirDeep() throws Throwable {
+ expectShouldDelete(DIR1, true);
+ expectShouldNotDelete(DIR1_DIR3_DIR4_FILE_3, false);
+ }
+
+ @Test
+ public void testDeletePerfectCache() throws Throwable {
+ // run a larger scale test. Also use the ordering we'd expect for a sorted
+ // listing, which we implement by sorting the paths
+ List<CopyListingFileStatus> statusList = buildStatusList();
+ // cache is bigger than the status list
+ tracker = new DeletedDirTracker(statusList.size());
+
+ AtomicInteger deletedFiles = new AtomicInteger(0);
+ AtomicInteger deletedDirs = new AtomicInteger(0);
+ deletePaths(statusList, deletedFiles, deletedDirs);
+ assertEquals(0, deletedFiles.get());
+ }
+
+ @Test
+ public void testDeleteFullCache() throws Throwable {
+ // run a larger scale test. Also use the ordering we'd expect for a sorted
+ // listing, which we implement by sorting the paths
+ AtomicInteger deletedFiles = new AtomicInteger(0);
+ AtomicInteger deletedDirs = new AtomicInteger(0);
+ deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+ assertEquals(0, deletedFiles.get());
+ }
+
+ @Test
+ public void testDeleteMediumCache() throws Throwable {
+ tracker = new DeletedDirTracker(100);
+ AtomicInteger deletedFiles = new AtomicInteger(0);
+ AtomicInteger deletedDirs = new AtomicInteger(0);
+ deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+ assertEquals(0, deletedFiles.get());
+ }
+
+ @Test
+ public void testDeleteFullSmallCache() throws Throwable {
+ tracker = new DeletedDirTracker(10);
+ AtomicInteger deletedFiles = new AtomicInteger(0);
+ AtomicInteger deletedDirs = new AtomicInteger(0);
+ deletePaths(buildStatusList(), deletedFiles, deletedDirs);
+ assertEquals(0, deletedFiles.get());
+ }
+
+ protected void deletePaths(final List<CopyListingFileStatus> statusList,
+ final AtomicInteger deletedFiles, final AtomicInteger deletedDirs) {
+ for (CopyListingFileStatus status : statusList) {
+ if (shouldDelete(status)) {
+ AtomicInteger r = status.isDirectory() ? deletedDirs : deletedFiles;
+ r.incrementAndGet();
+ LOG.info("Delete {}", status.getPath());
+ }
+ }
+
+ LOG.info("After proposing to delete {} paths, {} directories and {} files"
+ + " were explicitly deleted from a cache {}",
+ statusList.size(), deletedDirs, deletedFiles, tracker);
+ }
+
+ /**
+ * Build a large YMD status list; 30 * 12 * 10 directories,
+ * each with 24 files.
+ * @return a sorted list.
+ */
+ protected List<CopyListingFileStatus> buildStatusList() {
+ List<CopyListingFileStatus> statusList = new ArrayList<>();
+ // recursive create of many files
+ for (int y = 0; y <= 20; y++) {
+ Path yp = new Path(String.format("YEAR=%d", y));
+ statusList.add(dirStatus(yp));
+ for (int m = 1; m <= 12; m++) {
+ Path ymp = new Path(yp, String.format("MONTH=%d", m));
+ statusList.add(dirStatus(ymp));
+ for (int d = 1; d < 30; d++) {
+ Path dir = new Path(ymp, String.format("DAY=%02d", d));
+ statusList.add(dirStatus(dir));
+ for (int h = 0; h < 24; h++) {
+ statusList.add(fileStatus(new Path(dir,
+ String.format("%02d00.avro", h))));
+ }
+ }
+ }
+ // sort on paths.
+ Collections.sort(statusList,
+ (l, r) -> l.getPath().compareTo(r.getPath()));
+ }
+ return statusList;
+ }
+
+
+ private void expectShouldDelete(final Path path, boolean isDir) {
+ expectShouldDelete(newStatus(path, isDir));
+ }
+
+ private void expectShouldDelete(CopyListingFileStatus status) {
+ assertTrue("Expected shouldDelete of " + status.getPath(),
+ shouldDelete(status));
+ }
+
+ private boolean shouldDelete(final Path path, final boolean isDir) {
+ return shouldDelete(newStatus(path, isDir));
+ }
+
+ private boolean shouldDelete(final CopyListingFileStatus status) {
+ return tracker.shouldDelete(status);
+ }
+
+ private void expectShouldNotDelete(final Path path, boolean isDir) {
+ expectShouldNotDelete(newStatus(path, isDir));
+ }
+
+ private void expectShouldNotDelete(CopyListingFileStatus status) {
+ assertFalse("Expected !shouldDelete of " + status.getPath()
+ + " but got true",
+ shouldDelete(status));
+ }
+
+ private CopyListingFileStatus newStatus(final Path path,
+ final boolean isDir) {
+ return new CopyListingFileStatus(new FileStatus(0, isDir, 0, 0, 0, path));
+ }
+
+ private CopyListingFileStatus dirStatus(final Path path) {
+ return newStatus(path, true);
+ }
+
+ private CopyListingFileStatus fileStatus(final Path path) {
+ return newStatus(path, false);
+ }
+
+ private void expectCached(final Path path) {
+ assertTrue("Path " + path + " is not in the cache of " + tracker,
+ tracker.isContained(path));
+ }
+
+ private void expectNotCached(final Path path) {
+ assertFalse("Path " + path + " is in the cache of " + tracker,
+ tracker.isContained(path));
+ }
+
+}
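The new test exercises DeletedDirTracker through three calls: the constructor that takes a cache size, shouldDelete(CopyListingFileStatus), and isContained(Path). As a rough sketch of how a delete pass over a path-sorted copy listing might use the tracker (placed in the same package as the test, since the class may be package-private; the class, method and parameter names are hypothetical):

package org.apache.hadoop.tools.mapred;

import java.util.List;

import org.apache.hadoop.tools.CopyListingFileStatus;

/** Sketch only: suppress deletes already covered by a deleted parent dir. */
final class DeletedDirTrackerSketch {

  private DeletedDirTrackerSketch() {
  }

  /**
   * Count the paths that would actually be deleted.
   * @param sortedListing copy-listing entries, sorted by path
   * @param cacheSize directory cache size, as used in the tests above
   */
  static int countDeletes(List<CopyListingFileStatus> sortedListing,
      int cacheSize) {
    DeletedDirTracker tracker = new DeletedDirTracker(cacheSize);
    int deletes = 0;
    for (CopyListingFileStatus status : sortedListing) {
      if (tracker.shouldDelete(status)) {
        // a real job would delete status.getPath() here
        deletes++;
      }
    }
    return deletes;
  }
}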
diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
index 9ccddd122e..311c1b3a72 100644
--- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
+++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/util/TestDistCpUtils.java
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -1187,26 +1188,33 @@ public static void createDirectory(FileSystem fs, Path dirPath) throws IOExcepti
}
}
- public static boolean checkIfFoldersAreInSync(FileSystem fs, String targetBase, String sourceBase)
- throws IOException {
+ public static void verifyFoldersAreInSync(FileSystem fs, String targetBase,
+ String sourceBase) throws IOException {
Path base = new Path(targetBase);
- Stack stack = new Stack();
- stack.push(base);
- while (!stack.isEmpty()) {
- Path file = stack.pop();
- if (!fs.exists(file)) continue;
- FileStatus[] fStatus = fs.listStatus(file);
- if (fStatus == null || fStatus.length == 0) continue;
+ Stack<Path> stack = new Stack<>();
+ stack.push(base);
+ while (!stack.isEmpty()) {
+ Path file = stack.pop();
+ if (!fs.exists(file)) {
+ continue;
+ }
+ FileStatus[] fStatus = fs.listStatus(file);
+ if (fStatus == null || fStatus.length == 0) {
+ continue;
+ }
- for (FileStatus status : fStatus) {
- if (status.isDirectory()) {
- stack.push(status.getPath());
- }
- Assert.assertTrue(fs.exists(new Path(sourceBase + "/" +
- DistCpUtils.getRelativePath(new Path(targetBase), status.getPath()))));
- }
- }
- return true;
+ for (FileStatus status : fStatus) {
+ if (status.isDirectory()) {
+ stack.push(status.getPath());
+ }
+ Path p = new Path(sourceBase + "/" +
+ DistCpUtils.getRelativePath(new Path(targetBase),
+ status.getPath()));
+ ContractTestUtils.assertPathExists(fs,
+ "path in sync with " + status.getPath(), p);
+ }
+ }
}
+
}
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml b/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml
new file mode 100644
index 0000000000..42bc6f427a
--- /dev/null
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/contract/localfs.xml
@@ -0,0 +1,128 @@
+<?xml version="1.0"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements. See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License. You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+
+  <property>
+    <name>fs.contract.is-case-sensitive</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-unix-permissions</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.root-tests-enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.test.random-seek-count</name>
+    <value>1000</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-creates-dest-dirs</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-overwrites-dest</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rename-remove-dest-if-empty-dir</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-append</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-directory-delete</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-atomic-rename</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-block-locality</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-concat</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-seek-on-closed-file</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.rejects-seek-past-eof</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-strict-exceptions</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-settimes</name>
+    <value>true</value>
+  </property>
+
+  <property>
+    <name>fs.contract.supports-getfilestatus</name>
+    <value>true</value>
+  </property>
+
+</configuration>
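These options are consumed through the contract API at run time. A rough sketch of a test reading one of them (assuming the usual AbstractFSContractTestBase plumbing; the class and test names are hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
import org.apache.hadoop.fs.contract.ContractOptions;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.localfs.LocalFSContract;
import org.junit.Test;

/** Sketch only: skip a test when the contract file declares no support. */
public class ContractOptionSketch extends AbstractFSContractTestBase {

  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    return new LocalFSContract(conf);
  }

  @Test
  public void testAppendOnlyIfDeclared() throws Throwable {
    // SUPPORTS_APPEND maps to "fs.contract.supports-append", which the
    // local FS contract file above sets to false, so this test skips itself.
    if (!getContract().isSupported(ContractOptions.SUPPORTS_APPEND, false)) {
      ContractTestUtils.skip("Append not supported by this filesystem");
    }
    // ... append-specific assertions would go here ...
  }
}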
diff --git a/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties b/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
index 22990ca89a..46fda96d97 100644
--- a/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
+++ b/hadoop-tools/hadoop-distcp/src/test/resources/log4j.properties
@@ -16,7 +16,18 @@
#
# log4j configuration used during build and unit tests
-log4j.rootLogger=debug,stdout
+log4j.rootLogger=info,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration.deprecation=WARN
+log4j.logger.org.apache.hadoop.metrics2=ERROR
+log4j.logger.org.apache.hadoop.mapreduce.JobResourceUploader=ERROR
+log4j.logger.org.apache.hadoop.yarn.util.ProcfsBasedProcessTree=ERROR
+log4j.logger.org.apache.commons.beanutils.FluentPropertyBeanIntrospector=ERROR
+log4j.logger.org.apache.commons.configuration2.AbstractConfiguration=ERROR
+
+# Debug level logging of distcp in test runs.
+log4j.logger.org.apache.hadoop.tools.mapred=DEBUG