From 0790275f058b0cf41780ad337c9150a1e8ebebc6 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 18 May 2015 13:24:35 -0700 Subject: [PATCH] HADOOP-1540. Support file exclusion list in distcp. Contributed by Rich Haase. --- .../hadoop-common/CHANGES.txt | 2 + .../org/apache/hadoop/tools/CopyFilter.java | 60 ++++ .../apache/hadoop/tools/DistCpConstants.java | 3 +- .../hadoop/tools/DistCpOptionSwitch.java | 11 +- .../apache/hadoop/tools/DistCpOptions.java | 30 +- .../apache/hadoop/tools/OptionsParser.java | 271 ++++++++++++------ .../apache/hadoop/tools/RegexCopyFilter.java | 98 +++++++ .../hadoop/tools/SimpleCopyListing.java | 23 +- .../apache/hadoop/tools/TrueCopyFilter.java | 33 +++ .../org/apache/hadoop/tools/package-info.java | 26 ++ .../apache/hadoop/tools/TestCopyListing.java | 34 --- .../apache/hadoop/tools/TestIntegration.java | 49 ---- .../hadoop/tools/TestOptionsParser.java | 17 +- .../hadoop/tools/TestRegexCopyFilter.java | 113 ++++++++ .../hadoop/tools/TestTrueCopyFilter.java | 36 +++ 15 files changed, 615 insertions(+), 191 deletions(-) create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/TrueCopyFilter.java create mode 100644 hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/package-info.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestRegexCopyFilter.java create mode 100644 hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestTrueCopyFilter.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 324434b110..cf09c5fab0 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -583,6 +583,8 @@ Release 2.8.0 - UNRELEASED 
HADOOP-11944. add option to test-patch to avoid relocating patch process directory (Sean Busbey via aw) + HADOOP-1540. Support file exclusion list in distcp. (Rich Haase via jing9) + OPTIMIZATIONS HADOOP-11785. Reduce the number of listStatus operation in distcp diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java new file mode 100644 index 0000000000..3da364c600 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/CopyFilter.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; + +/** + * Interface for excluding files from DistCp. + * + */ +public abstract class CopyFilter { + + /** + * Default initialize method does nothing. + */ + public void initialize() {} + + /** + * Predicate to determine if a file can be excluded from copy. 
+ * + * @param path a Path to be considered for copying + * @return boolean, true to copy, false to exclude + */ + public abstract boolean shouldCopy(Path path); + + /** + * Public factory method which returns the appropriate implementation of + * CopyFilter. + * + * @param conf DistCp configuration + * @return An instance of the appropriate CopyFilter + */ + public static CopyFilter getCopyFilter(Configuration conf) { + String filtersFilename = conf.get(DistCpConstants.CONF_LABEL_FILTERS_FILE); + + if (filtersFilename == null) { + return new TrueCopyFilter(); + } else { + String filterFilename = conf.get( + DistCpConstants.CONF_LABEL_FILTERS_FILE); + return new RegexCopyFilter(filterFilename); + } + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java index 7ecb6ce415..21dca628f7 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpConstants.java @@ -59,7 +59,8 @@ public class DistCpConstants { public static final String CONF_LABEL_APPEND = "distcp.copy.append"; public static final String CONF_LABEL_DIFF = "distcp.copy.diff"; public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb"; - + public static final String CONF_LABEL_FILTERS_FILE = + "distcp.filters.file"; public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE = "distcp.dynamic.max.chunks.tolerable"; public static final String CONF_LABEL_MAX_CHUNKS_IDEAL = diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java index f90319dab8..ed4a0b2e88 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java +++ 
b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptionSwitch.java @@ -177,7 +177,16 @@ public enum DistCpOptionSwitch { * Specify bandwidth per map in MB */ BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB, - new Option("bandwidth", true, "Specify bandwidth per map in MB")); + new Option("bandwidth", true, "Specify bandwidth per map in MB")), + + /** + * Path containing a list of strings, which when found in the path of + * a file to be copied excludes that file from the copy job. + */ + FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE, + new Option("filters", true, "The path to a file containing a list of" + + " strings for paths to be excluded from the copy.")); + public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct"; private final String confLabel; diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java index d8f3ff7162..302b626b8a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/DistCpOptions.java @@ -69,7 +69,12 @@ public class DistCpOptions { private Path targetPath; - // targetPathExist is a derived field, it's initialized in the + /** + * The path to a file containing a list of paths to filter out of the copy. + */ + private String filtersFile; + + // targetPathExist is a derived field, it's initialized in the // beginning of distcp. 
private boolean targetPathExists = true; @@ -139,6 +144,7 @@ public DistCpOptions(DistCpOptions that) { this.sourcePaths = that.getSourcePaths(); this.targetPath = that.getTargetPath(); this.targetPathExists = that.getTargetPathExists(); + this.filtersFile = that.getFiltersFile(); } } @@ -549,6 +555,23 @@ public boolean setTargetPathExists(boolean targetPathExists) { return this.targetPathExists = targetPathExists; } + /** + * File path that contains the list of patterns + * for paths to be filtered from the file copy. + * @return - Filter file path. + */ + public final String getFiltersFile() { + return filtersFile; + } + + /** + * Set filtersFile. + * @param filtersFilename The path to a list of patterns to exclude from copy. + */ + public final void setFiltersFile(String filtersFilename) { + this.filtersFile = filtersFilename; + } + public void validate(DistCpOptionSwitch option, boolean value) { boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ? @@ -623,6 +646,10 @@ public void appendToConf(Configuration conf) { String.valueOf(mapBandwidth)); DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS, DistCpUtils.packAttributes(preserveStatus)); + if (filtersFile != null) { + DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS, + filtersFile); + } } /** @@ -645,6 +672,7 @@ public String toString() { ", targetPath=" + targetPath + ", targetPathExists=" + targetPathExists + ", preserveRawXattrs=" + preserveRawXattrs + + ", filtersFile='" + filtersFile + '\'' + '}'; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java index 1729479ef8..37add1edac 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/OptionsParser.java @@ -86,37 +86,7 @@ public static DistCpOptions parse(String args[]) 
throws IllegalArgumentException Arrays.toString(args), e); } - DistCpOptions option; - Path targetPath; - List sourcePaths = new ArrayList(); - - String leftOverArgs[] = command.getArgs(); - if (leftOverArgs == null || leftOverArgs.length < 1) { - throw new IllegalArgumentException("Target path not specified"); - } - - //Last Argument is the target path - targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim()); - - //Copy any source paths in the arguments to the list - for (int index = 0; index < leftOverArgs.length - 1; index++) { - sourcePaths.add(new Path(leftOverArgs[index].trim())); - } - - /* If command has source file listing, use it else, fall back on source paths in args - If both are present, throw exception and bail */ - if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) { - if (!sourcePaths.isEmpty()) { - throw new IllegalArgumentException("Both source file listing and source paths present"); - } - option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch. 
- SOURCE_FILE_LISTING.getSwitch())), targetPath); - } else { - if (sourcePaths.isEmpty()) { - throw new IllegalArgumentException("Neither source file listing nor source paths present"); - } - option = new DistCpOptions(sourcePaths, targetPath); - } + DistCpOptions option = parseSourceAndTargetPaths(command); //Process all the other option switches and set options appropriately if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) { @@ -165,54 +135,95 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException option.setBlocking(false); } - if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { - try { - Integer mapBandwidth = Integer.parseInt( - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); - if (mapBandwidth.intValue() <= 0) { - throw new IllegalArgumentException("Bandwidth specified is not positive: " + - mapBandwidth); - } - option.setMapBandwidth(mapBandwidth); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Bandwidth specified is invalid: " + - getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e); - } - } + parseBandwidth(command, option); if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) { option.setSslConfigurationFile(command. 
getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch())); } - if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { - try { - Integer numThreads = Integer.parseInt(getVal(command, - DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim()); - option.setNumListstatusThreads(numThreads); - } catch (NumberFormatException e) { - throw new IllegalArgumentException( - "Number of liststatus threads is invalid: " + getVal(command, - DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e); - } - } + parseNumListStatusThreads(command, option); - if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { - try { - Integer maps = Integer.parseInt( - getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim()); - option.setMaxMaps(maps); - } catch (NumberFormatException e) { - throw new IllegalArgumentException("Number of maps is invalid: " + - getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e); - } - } + parseMaxMaps(command, option); if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) { option.setCopyStrategy( getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch())); } + parsePreserveStatus(command, option); + + if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { + String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); + Preconditions.checkArgument(snapshots != null && snapshots.length == 2, + "Must provide both the starting and ending snapshot names"); + option.setUseDiff(true, snapshots[0], snapshots[1]); + } + + parseFileLimit(command); + + parseSizeLimit(command); + + if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) { + option.setFiltersFile(getVal(command, + DistCpOptionSwitch.FILTERS.getSwitch())); + } + + return option; + } + + /** + * parseSizeLimit is a helper method for parsing the deprecated + * argument SIZE_LIMIT. 
+ * + * @param command command line arguments + */ + private static void parseSizeLimit(CommandLine command) { + if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { + String sizeLimitString = getVal(command, + DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); + try { + Long.parseLong(sizeLimitString); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("Size-limit is invalid: " + + sizeLimitString, e); + } + LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + + " option. Ignoring."); + } + } + + /** + * parseFileLimit is a helper method for parsing the deprecated + * argument FILE_LIMIT. + * + * @param command command line arguments + */ + private static void parseFileLimit(CommandLine command) { + if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { + String fileLimitString = getVal(command, + DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); + try { + Integer.parseInt(fileLimitString); + } + catch (NumberFormatException e) { + throw new IllegalArgumentException("File-limit is invalid: " + + fileLimitString, e); + } + LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" + + " option. Ignoring."); + } + } + + /** + * parsePreserveStatus is a helper method for parsing PRESERVE_STATUS. 
+ * + * @param command command line arguments + * @param option parsed distcp options + */ + private static void parsePreserveStatus(CommandLine command, + DistCpOptions option) { if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) { String attributes = getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch()); @@ -227,42 +238,118 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException } } } + } - if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) { - String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch()); - Preconditions.checkArgument(snapshots != null && snapshots.length == 2, - "Must provide both the starting and ending snapshot names"); - option.setUseDiff(true, snapshots[0], snapshots[1]); - } - - if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) { - String fileLimitString = getVal(command, - DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim()); + /** + * parseMaxMaps is a helper method for parsing MAX_MAPS. + * + * @param command command line arguments + * @param option parsed distcp options + */ + private static void parseMaxMaps(CommandLine command, + DistCpOptions option) { + if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) { try { - Integer.parseInt(fileLimitString); + Integer maps = Integer.parseInt( + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim()); + option.setMaxMaps(maps); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Number of maps is invalid: " + + getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e); } - catch (NumberFormatException e) { - throw new IllegalArgumentException("File-limit is invalid: " - + fileLimitString, e); - } - LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" + - " option. 
Ignoring."); } + } - if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) { - String sizeLimitString = getVal(command, - DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim()); + /** + * parseNumListStatusThreads is a helper method for parsing + * NUM_LISTSTATUS_THREADS. + * + * @param command command line arguments + * @param option parsed distcp options + */ + private static void parseNumListStatusThreads(CommandLine command, + DistCpOptions option) { + if (command.hasOption( + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) { try { - Long.parseLong(sizeLimitString); + Integer numThreads = Integer.parseInt(getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim()); + option.setNumListstatusThreads(numThreads); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Number of liststatus threads is invalid: " + getVal(command, + DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e); } - catch (NumberFormatException e) { - throw new IllegalArgumentException("Size-limit is invalid: " - + sizeLimitString, e); + } + } + + /** + * parseBandwidth is a helper method for parsing BANDWIDTH. + * + * @param command command line arguments + * @param option parsed distcp options + */ + private static void parseBandwidth(CommandLine command, + DistCpOptions option) { + if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) { + try { + Integer mapBandwidth = Integer.parseInt( + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim()); + if (mapBandwidth <= 0) { + throw new IllegalArgumentException("Bandwidth specified is not " + + "positive: " + mapBandwidth); + } + option.setMapBandwidth(mapBandwidth); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("Bandwidth specified is invalid: " + + getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e); } - LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" + - " option. 
Ignoring."); + } + } + + /** + * parseSourceAndTargetPaths is a helper method for parsing the source + * and target paths. + * + * @param command command line arguments + * @return DistCpOptions + */ + private static DistCpOptions parseSourceAndTargetPaths( + CommandLine command) { + DistCpOptions option; + Path targetPath; + List sourcePaths = new ArrayList(); + + String[] leftOverArgs = command.getArgs(); + if (leftOverArgs == null || leftOverArgs.length < 1) { + throw new IllegalArgumentException("Target path not specified"); } + //Last Argument is the target path + targetPath = new Path(leftOverArgs[leftOverArgs.length - 1].trim()); + + //Copy any source paths in the arguments to the list + for (int index = 0; index < leftOverArgs.length - 1; index++) { + sourcePaths.add(new Path(leftOverArgs[index].trim())); + } + + /* If command has source file listing, use it else, fall back on source + paths in args. If both are present, throw exception and bail */ + if (command.hasOption( + DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) { + if (!sourcePaths.isEmpty()) { + throw new IllegalArgumentException("Both source file listing and " + + "source paths present"); + } + option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch. 
+ SOURCE_FILE_LISTING.getSwitch())), targetPath); + } else { + if (sourcePaths.isEmpty()) { + throw new IllegalArgumentException("Neither source file listing nor " + + "source paths present"); + } + option = new DistCpOptions(sourcePaths, targetPath); + } return option; } diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java new file mode 100644 index 0000000000..1c2b324dd3 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/RegexCopyFilter.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.IOUtils; + +import java.io.*; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A CopyFilter which compares Java Regex Patterns to each Path to determine + * whether a file should be copied. 
+ */ +public class RegexCopyFilter extends CopyFilter { + + private static final Log LOG = LogFactory.getLog(RegexCopyFilter.class); + private File filtersFile; + private List filters; + + /** + * Constructor, sets up a File object to read filter patterns from and + * the List to store the patterns. + */ + protected RegexCopyFilter(String filtersFilename) { + filtersFile = new File(filtersFilename); + filters = new ArrayList<>(); + } + + /** + * Loads a list of filter patterns for use in shouldCopy. + */ + @Override + public void initialize() { + BufferedReader reader = null; + try { + InputStream is = new FileInputStream(filtersFile); + reader = new BufferedReader(new InputStreamReader(is, + Charset.forName("UTF-8"))); + String line; + while ((line = reader.readLine()) != null) { + Pattern pattern = Pattern.compile(line); + filters.add(pattern); + } + } catch (FileNotFoundException notFound) { + LOG.error("Can't find filters file " + filtersFile); + } catch (IOException cantRead) { + LOG.error("An error occurred while attempting to read from " + + filtersFile); + } finally { + IOUtils.cleanup(LOG, reader); + } + } + + /** + * Sets the list of filters to exclude files from copy. + * Simplifies testing of the filters feature. 
+ * + * @param filtersList a list of Patterns to be excluded + */ + @VisibleForTesting + protected final void setFilters(List filtersList) { + this.filters = filtersList; + } + + @Override + public boolean shouldCopy(Path path) { + for (Pattern filter : filters) { + if (filter.matcher(path.toString()).matches()) { + return false; + } + } + return true; + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java index 4ea1dc9abc..8f50913349 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/SimpleCopyListing.java @@ -58,6 +58,7 @@ public class SimpleCopyListing extends CopyListing { private long totalBytesToCopy = 0; private int numListstatusThreads = 1; private final int maxRetries = 3; + private CopyFilter copyFilter; /** * Protected constructor, to initialize configuration. 
@@ -71,6 +72,8 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials numListstatusThreads = getConf().getInt( DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS, DistCpConstants.DEFAULT_LISTSTATUS_THREADS); + copyFilter = CopyFilter.getCopyFilter(getConf()); + copyFilter.initialize(); } @VisibleForTesting @@ -213,7 +216,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter, preserveXAttrs && sourceStatus.isDirectory(), preserveRawXAttrs && sourceStatus.isDirectory()); writeToFileListing(fileListWriter, sourceCopyListingStatus, - sourcePathRoot, options); + sourcePathRoot); if (sourceStatus.isDirectory()) { if (LOG.isDebugEnabled()) { @@ -264,11 +267,10 @@ private Path computeSourceRootPath(FileStatus sourceStatus, * Provide an option to skip copy of a path, Allows for exclusion * of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME} * @param path - Path being considered for copy while building the file listing - * @param options - Input options passed during DistCp invocation * @return - True if the path should be considered for copy, false otherwise */ - protected boolean shouldCopy(Path path, DistCpOptions options) { - return true; + protected boolean shouldCopy(Path path) { + return copyFilter.shouldCopy(path); } /** {@inheritDoc} */ @@ -409,7 +411,7 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter, preserveXAttrs && child.isDirectory(), preserveRawXattrs && child.isDirectory()); writeToFileListing(fileListWriter, childCopyListingStatus, - sourcePathRoot, options); + sourcePathRoot); } if (retry < maxRetries) { if (child.isDirectory()) { @@ -443,26 +445,23 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, } return; } - writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options); + writeToFileListing(fileListWriter, fileStatus, sourcePathRoot); } private void writeToFileListing(SequenceFile.Writer fileListWriter, 
CopyListingFileStatus fileStatus, - Path sourcePathRoot, - DistCpOptions options) throws IOException { + Path sourcePathRoot) throws IOException { if (LOG.isDebugEnabled()) { LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath()); } - FileStatus status = fileStatus; - - if (!shouldCopy(fileStatus.getPath(), options)) { + if (!shouldCopy(fileStatus.getPath())) { return; } fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot, - fileStatus.getPath())), status); + fileStatus.getPath())), fileStatus); fileListWriter.sync(); if (!fileStatus.isDirectory()) { diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/TrueCopyFilter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/TrueCopyFilter.java new file mode 100644 index 0000000000..b58dd9c20b --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/TrueCopyFilter.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.Path; + +/** + * A CopyFilter which always returns true. 
+ * + */ +public class TrueCopyFilter extends CopyFilter { + + @Override + public boolean shouldCopy(Path path) { + return true; + } +} diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/package-info.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/package-info.java new file mode 100644 index 0000000000..92278ed2e1 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/package-info.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +/** + * DistCp is a tool for replicating data using MapReduce jobs for concurrent + * copy operations. 
+ * + * @version 2 + */ +package org.apache.hadoop.tools; \ No newline at end of file diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java index 8381c1b703..896763d26e 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestCopyListing.java @@ -94,40 +94,6 @@ protected long getNumberOfPaths() { return 0; } - @Test(timeout=10000) - public void testSkipCopy() throws Exception { - SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) { - @Override - protected boolean shouldCopy(Path path, DistCpOptions options) { - return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME); - } - }; - FileSystem fs = FileSystem.get(getConf()); - List srcPaths = new ArrayList(); - srcPaths.add(new Path("/tmp/in4/1")); - srcPaths.add(new Path("/tmp/in4/2")); - Path target = new Path("/tmp/out4/1"); - TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS"); - TestDistCpUtils.createFile(fs, "/tmp/in4/1/file"); - TestDistCpUtils.createFile(fs, "/tmp/in4/2"); - fs.mkdirs(target); - DistCpOptions options = new DistCpOptions(srcPaths, target); - Path listingFile = new Path("/tmp/list4"); - listing.buildListing(listingFile, options); - Assert.assertEquals(listing.getNumberOfPaths(), 3); - SequenceFile.Reader reader = new SequenceFile.Reader(getConf(), - SequenceFile.Reader.file(listingFile)); - CopyListingFileStatus fileStatus = new CopyListingFileStatus(); - Text relativePath = new Text(); - Assert.assertTrue(reader.next(relativePath, fileStatus)); - Assert.assertEquals(relativePath.toString(), "/1"); - Assert.assertTrue(reader.next(relativePath, fileStatus)); - Assert.assertEquals(relativePath.toString(), "/1/file"); - Assert.assertTrue(reader.next(relativePath, fileStatus)); - 
Assert.assertEquals(relativePath.toString(), "/2"); - Assert.assertFalse(reader.next(relativePath, fileStatus)); - } - @Test(timeout=10000) public void testMultipleSrcToFile() { FileSystem fs = null; diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java index 572634272f..ee8e7cc4f1 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestIntegration.java @@ -241,55 +241,6 @@ private void caseMultiFileTargetPresent(boolean sync) { } } - @Test(timeout=100000) - public void testCustomCopyListing() { - - try { - addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5"); - createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5"); - mkdirs(target.toString()); - - Configuration conf = getConf(); - try { - conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS, - CustomCopyListing.class, CopyListing.class); - DistCpOptions options = new DistCpOptions(Arrays. 
- asList(new Path(root + "/" + "multifile1")), target); - options.setSyncFolder(true); - options.setDeleteMissing(false); - options.setOverwrite(false); - try { - new DistCp(conf, options).execute(); - } catch (Exception e) { - LOG.error("Exception encountered ", e); - throw new IOException(e); - } - } finally { - conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS); - } - - checkResult(target, 2, "file4", "file5"); - } catch (IOException e) { - LOG.error("Exception encountered while testing distcp", e); - Assert.fail("distcp failure"); - } finally { - TestDistCpUtils.delete(fs, root); - } - } - - private static class CustomCopyListing extends SimpleCopyListing { - - public CustomCopyListing(Configuration configuration, - Credentials credentials) { - super(configuration, credentials); - } - - @Override - protected boolean shouldCopy(Path path, DistCpOptions options) { - return !path.getName().equals("file3"); - } - } - @Test(timeout=100000) public void testMultiFileTargetMissing() { caseMultiFileTargetMissing(false); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java index 6eddfb222c..b9d9ada066 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestOptionsParser.java @@ -400,7 +400,7 @@ public void testToString() { String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " + "ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " + "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true, " + - "preserveRawXattrs=false}"; + "preserveRawXattrs=false, filtersFile='null'}"; Assert.assertEquals(val, option.toString()); Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(), 
DistCpOptionSwitch.ATOMIC_COMMIT.name()); @@ -718,4 +718,22 @@ public void testDiffOption() { "Diff is valid only with update and delete options", e); } } + + /** + * The filters file is null by default and taken verbatim from -filters. + */ + @Test + public void testExclusionsOption() { + DistCpOptions options = OptionsParser.parse(new String[] { + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertNull(options.getFiltersFile()); + + options = OptionsParser.parse(new String[] { + "-filters", + "/tmp/filters.txt", + "hdfs://localhost:8020/source/first", + "hdfs://localhost:8020/target/"}); + Assert.assertEquals("/tmp/filters.txt", options.getFiltersFile()); + } } diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestRegexCopyFilter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestRegexCopyFilter.java new file mode 100644 index 0000000000..5618a0b524 --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestRegexCopyFilter.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Unit tests for {@link RegexCopyFilter}; the exclusion patterns are injected + * directly through {@code setFilters}. + */ +public class TestRegexCopyFilter { + + /** A pattern that is only a fragment of the path must not exclude it. */ + @Test + public void testShouldCopyTrue() { + List<Pattern> filters = new ArrayList<>(); + filters.add(Pattern.compile("user")); + + RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile"); + regexCopyFilter.setFilters(filters); + + Path shouldCopyPath = new Path("/user/bar"); + Assert.assertTrue(regexCopyFilter.shouldCopy(shouldCopyPath)); + } + + /** A pattern that covers the whole path excludes it. */ + @Test + public void testShouldCopyFalse() { + List<Pattern> filters = new ArrayList<>(); + filters.add(Pattern.compile(".*test.*")); + + RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile"); + regexCopyFilter.setFilters(filters); + + Path shouldNotCopyPath = new Path("/user/testing"); + Assert.assertFalse(regexCopyFilter.shouldCopy(shouldNotCopyPath)); + } + + /** Only two of the six sample paths escape all three patterns. */ + @Test + public void testShouldCopyWithMultipleFilters() { + List<Pattern> filters = new ArrayList<>(); + filters.add(Pattern.compile(".*test.*")); + filters.add(Pattern.compile("/user/b.*")); + filters.add(Pattern.compile(".*_SUCCESS")); + + List<Path> toCopy = getTestPaths(); + + int shouldCopyCount = 0; + + RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile"); + regexCopyFilter.setFilters(filters); + + for (Path path: toCopy) { + if (regexCopyFilter.shouldCopy(path)) { + shouldCopyCount++; + } + } + + Assert.assertEquals(2, shouldCopyCount); + } + + /** A catch-all pattern leaves none of the sample paths to copy. */ + @Test + public void testShouldExcludeAll() { + List<Pattern> filters = new ArrayList<>(); + filters.add(Pattern.compile(".*test.*")); + filters.add(Pattern.compile("/user/b.*")); + filters.add(Pattern.compile(".*")); // exclude everything + + List<Path> toCopy = getTestPaths(); + + int shouldCopyCount = 0; + + RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile"); + regexCopyFilter.setFilters(filters); + + for (Path path: toCopy) { + if
(regexCopyFilter.shouldCopy(path)) { + shouldCopyCount++; + } + } + + Assert.assertEquals(0, shouldCopyCount); + } + + /** Builds the six sample paths shared by the multi-path tests. */ + private List<Path> getTestPaths() { + List<Path> toCopy = new ArrayList<>(); + toCopy.add(new Path("/user/bar")); + toCopy.add(new Path("/user/foo/_SUCCESS")); + toCopy.add(new Path("/hive/test_data")); + toCopy.add(new Path("test")); + toCopy.add(new Path("/user/foo/bar")); + toCopy.add(new Path("/mapred/.staging_job")); + return toCopy; + } + +} diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestTrueCopyFilter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestTrueCopyFilter.java new file mode 100644 index 0000000000..2ea60a98fa --- /dev/null +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/TestTrueCopyFilter.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.fs.Path; +import org.junit.Assert; +import org.junit.Test; + +public class TestTrueCopyFilter { + + @Test + public void testShouldCopy() { + Assert.assertTrue(new TrueCopyFilter().shouldCopy(new Path("fake"))); + } + + @Test + public void testShouldCopyWithNull() { + Assert.assertTrue(new TrueCopyFilter().shouldCopy(null)); // null is fine + } +}