HADOOP-1540. Support file exclusion list in distcp. Contributed by Rich Haase.
This commit is contained in:
parent
76afd28862
commit
0790275f05
@ -583,6 +583,8 @@ Release 2.8.0 - UNRELEASED
|
||||
HADOOP-11944. add option to test-patch to avoid relocating patch process
|
||||
directory (Sean Busbey via aw)
|
||||
|
||||
HADOOP-1540. Support file exclusion list in distcp. (Rich Haase via jing9)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-11785. Reduce the number of listStatus operation in distcp
|
||||
|
@ -0,0 +1,60 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* Interface for excluding files from DistCp.
|
||||
*
|
||||
*/
|
||||
public abstract class CopyFilter {
|
||||
|
||||
/**
|
||||
* Default initialize method does nothing.
|
||||
*/
|
||||
public void initialize() {}
|
||||
|
||||
/**
|
||||
* Predicate to determine if a file can be excluded from copy.
|
||||
*
|
||||
* @param path a Path to be considered for copying
|
||||
* @return boolean, true to copy, false to exclude
|
||||
*/
|
||||
public abstract boolean shouldCopy(Path path);
|
||||
|
||||
/**
|
||||
* Public factory method which returns the appropriate implementation of
|
||||
* CopyFilter.
|
||||
*
|
||||
* @param conf DistCp configuratoin
|
||||
* @return An instance of the appropriate CopyFilter
|
||||
*/
|
||||
public static CopyFilter getCopyFilter(Configuration conf) {
|
||||
String filtersFilename = conf.get(DistCpConstants.CONF_LABEL_FILTERS_FILE);
|
||||
|
||||
if (filtersFilename == null) {
|
||||
return new TrueCopyFilter();
|
||||
} else {
|
||||
String filterFilename = conf.get(
|
||||
DistCpConstants.CONF_LABEL_FILTERS_FILE);
|
||||
return new RegexCopyFilter(filterFilename);
|
||||
}
|
||||
}
|
||||
}
|
@ -59,7 +59,8 @@ public class DistCpConstants {
|
||||
public static final String CONF_LABEL_APPEND = "distcp.copy.append";
|
||||
public static final String CONF_LABEL_DIFF = "distcp.copy.diff";
|
||||
public static final String CONF_LABEL_BANDWIDTH_MB = "distcp.map.bandwidth.mb";
|
||||
|
||||
public static final String CONF_LABEL_FILTERS_FILE =
|
||||
"distcp.filters.file";
|
||||
public static final String CONF_LABEL_MAX_CHUNKS_TOLERABLE =
|
||||
"distcp.dynamic.max.chunks.tolerable";
|
||||
public static final String CONF_LABEL_MAX_CHUNKS_IDEAL =
|
||||
|
@ -177,7 +177,16 @@ public enum DistCpOptionSwitch {
|
||||
* Specify bandwidth per map in MB
|
||||
*/
|
||||
BANDWIDTH(DistCpConstants.CONF_LABEL_BANDWIDTH_MB,
|
||||
new Option("bandwidth", true, "Specify bandwidth per map in MB"));
|
||||
new Option("bandwidth", true, "Specify bandwidth per map in MB")),
|
||||
|
||||
/**
|
||||
* Path containing a list of strings, which when found in the path of
|
||||
* a file to be copied excludes that file from the copy job.
|
||||
*/
|
||||
FILTERS(DistCpConstants.CONF_LABEL_FILTERS_FILE,
|
||||
new Option("filters", true, "The path to a file containing a list of"
|
||||
+ " strings for paths to be excluded from the copy."));
|
||||
|
||||
|
||||
public static final String PRESERVE_STATUS_DEFAULT = "-prbugpct";
|
||||
private final String confLabel;
|
||||
|
@ -69,7 +69,12 @@ public class DistCpOptions {
|
||||
|
||||
private Path targetPath;
|
||||
|
||||
// targetPathExist is a derived field, it's initialized in the
|
||||
/**
|
||||
* The path to a file containing a list of paths to filter out of the copy.
|
||||
*/
|
||||
private String filtersFile;
|
||||
|
||||
// targetPathExist is a derived field, it's initialized in the
|
||||
// beginning of distcp.
|
||||
private boolean targetPathExists = true;
|
||||
|
||||
@ -139,6 +144,7 @@ public DistCpOptions(DistCpOptions that) {
|
||||
this.sourcePaths = that.getSourcePaths();
|
||||
this.targetPath = that.getTargetPath();
|
||||
this.targetPathExists = that.getTargetPathExists();
|
||||
this.filtersFile = that.getFiltersFile();
|
||||
}
|
||||
}
|
||||
|
||||
@ -549,6 +555,23 @@ public boolean setTargetPathExists(boolean targetPathExists) {
|
||||
return this.targetPathExists = targetPathExists;
|
||||
}
|
||||
|
||||
/**
|
||||
* File path that contains the list of patterns
|
||||
* for paths to be filtered from the file copy.
|
||||
* @return - Filter file path.
|
||||
*/
|
||||
public final String getFiltersFile() {
|
||||
return filtersFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set filtersFile.
|
||||
* @param filtersFilename The path to a list of patterns to exclude from copy.
|
||||
*/
|
||||
public final void setFiltersFile(String filtersFilename) {
|
||||
this.filtersFile = filtersFilename;
|
||||
}
|
||||
|
||||
public void validate(DistCpOptionSwitch option, boolean value) {
|
||||
|
||||
boolean syncFolder = (option == DistCpOptionSwitch.SYNC_FOLDERS ?
|
||||
@ -623,6 +646,10 @@ public void appendToConf(Configuration conf) {
|
||||
String.valueOf(mapBandwidth));
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.PRESERVE_STATUS,
|
||||
DistCpUtils.packAttributes(preserveStatus));
|
||||
if (filtersFile != null) {
|
||||
DistCpOptionSwitch.addToConf(conf, DistCpOptionSwitch.FILTERS,
|
||||
filtersFile);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -645,6 +672,7 @@ public String toString() {
|
||||
", targetPath=" + targetPath +
|
||||
", targetPathExists=" + targetPathExists +
|
||||
", preserveRawXattrs=" + preserveRawXattrs +
|
||||
", filtersFile='" + filtersFile + '\'' +
|
||||
'}';
|
||||
}
|
||||
|
||||
|
@ -86,37 +86,7 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
|
||||
Arrays.toString(args), e);
|
||||
}
|
||||
|
||||
DistCpOptions option;
|
||||
Path targetPath;
|
||||
List<Path> sourcePaths = new ArrayList<Path>();
|
||||
|
||||
String leftOverArgs[] = command.getArgs();
|
||||
if (leftOverArgs == null || leftOverArgs.length < 1) {
|
||||
throw new IllegalArgumentException("Target path not specified");
|
||||
}
|
||||
|
||||
//Last Argument is the target path
|
||||
targetPath = new Path(leftOverArgs[leftOverArgs.length -1].trim());
|
||||
|
||||
//Copy any source paths in the arguments to the list
|
||||
for (int index = 0; index < leftOverArgs.length - 1; index++) {
|
||||
sourcePaths.add(new Path(leftOverArgs[index].trim()));
|
||||
}
|
||||
|
||||
/* If command has source file listing, use it else, fall back on source paths in args
|
||||
If both are present, throw exception and bail */
|
||||
if (command.hasOption(DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
|
||||
if (!sourcePaths.isEmpty()) {
|
||||
throw new IllegalArgumentException("Both source file listing and source paths present");
|
||||
}
|
||||
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
|
||||
SOURCE_FILE_LISTING.getSwitch())), targetPath);
|
||||
} else {
|
||||
if (sourcePaths.isEmpty()) {
|
||||
throw new IllegalArgumentException("Neither source file listing nor source paths present");
|
||||
}
|
||||
option = new DistCpOptions(sourcePaths, targetPath);
|
||||
}
|
||||
DistCpOptions option = parseSourceAndTargetPaths(command);
|
||||
|
||||
//Process all the other option switches and set options appropriately
|
||||
if (command.hasOption(DistCpOptionSwitch.IGNORE_FAILURES.getSwitch())) {
|
||||
@ -165,54 +135,95 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
|
||||
option.setBlocking(false);
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
|
||||
try {
|
||||
Integer mapBandwidth = Integer.parseInt(
|
||||
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
|
||||
if (mapBandwidth.intValue() <= 0) {
|
||||
throw new IllegalArgumentException("Bandwidth specified is not positive: " +
|
||||
mapBandwidth);
|
||||
}
|
||||
option.setMapBandwidth(mapBandwidth);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
|
||||
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
|
||||
}
|
||||
}
|
||||
parseBandwidth(command, option);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
|
||||
option.setSslConfigurationFile(command.
|
||||
getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
|
||||
try {
|
||||
Integer numThreads = Integer.parseInt(getVal(command,
|
||||
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
|
||||
option.setNumListstatusThreads(numThreads);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Number of liststatus threads is invalid: " + getVal(command,
|
||||
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
|
||||
}
|
||||
}
|
||||
parseNumListStatusThreads(command, option);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
|
||||
try {
|
||||
Integer maps = Integer.parseInt(
|
||||
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
|
||||
option.setMaxMaps(maps);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Number of maps is invalid: " +
|
||||
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
|
||||
}
|
||||
}
|
||||
parseMaxMaps(command, option);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.COPY_STRATEGY.getSwitch())) {
|
||||
option.setCopyStrategy(
|
||||
getVal(command, DistCpOptionSwitch.COPY_STRATEGY.getSwitch()));
|
||||
}
|
||||
|
||||
parsePreserveStatus(command, option);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
|
||||
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
|
||||
Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
|
||||
"Must provide both the starting and ending snapshot names");
|
||||
option.setUseDiff(true, snapshots[0], snapshots[1]);
|
||||
}
|
||||
|
||||
parseFileLimit(command);
|
||||
|
||||
parseSizeLimit(command);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.FILTERS.getSwitch())) {
|
||||
option.setFiltersFile(getVal(command,
|
||||
DistCpOptionSwitch.FILTERS.getSwitch()));
|
||||
}
|
||||
|
||||
return option;
|
||||
}
|
||||
|
||||
/**
|
||||
* parseSizeLimit is a helper method for parsing the deprecated
|
||||
* argument SIZE_LIMIT.
|
||||
*
|
||||
* @param command command line arguments
|
||||
*/
|
||||
private static void parseSizeLimit(CommandLine command) {
|
||||
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
|
||||
String sizeLimitString = getVal(command,
|
||||
DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
|
||||
try {
|
||||
Long.parseLong(sizeLimitString);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Size-limit is invalid: "
|
||||
+ sizeLimitString, e);
|
||||
}
|
||||
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
|
||||
" option. Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* parseFileLimit is a helper method for parsing the deprecated
|
||||
* argument FILE_LIMIT.
|
||||
*
|
||||
* @param command command line arguments
|
||||
*/
|
||||
private static void parseFileLimit(CommandLine command) {
|
||||
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
|
||||
String fileLimitString = getVal(command,
|
||||
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
|
||||
try {
|
||||
Integer.parseInt(fileLimitString);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("File-limit is invalid: "
|
||||
+ fileLimitString, e);
|
||||
}
|
||||
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
|
||||
" option. Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* parsePreserveStatus is a helper method for parsing PRESERVE_STATUS.
|
||||
*
|
||||
* @param command command line arguments
|
||||
* @param option parsed distcp options
|
||||
*/
|
||||
private static void parsePreserveStatus(CommandLine command,
|
||||
DistCpOptions option) {
|
||||
if (command.hasOption(DistCpOptionSwitch.PRESERVE_STATUS.getSwitch())) {
|
||||
String attributes =
|
||||
getVal(command, DistCpOptionSwitch.PRESERVE_STATUS.getSwitch());
|
||||
@ -227,42 +238,118 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.DIFF.getSwitch())) {
|
||||
String[] snapshots = getVals(command, DistCpOptionSwitch.DIFF.getSwitch());
|
||||
Preconditions.checkArgument(snapshots != null && snapshots.length == 2,
|
||||
"Must provide both the starting and ending snapshot names");
|
||||
option.setUseDiff(true, snapshots[0], snapshots[1]);
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.FILE_LIMIT.getSwitch())) {
|
||||
String fileLimitString = getVal(command,
|
||||
DistCpOptionSwitch.FILE_LIMIT.getSwitch().trim());
|
||||
/**
|
||||
* parseMaxMaps is a helper method for parsing MAX_MAPS.
|
||||
*
|
||||
* @param command command line arguments
|
||||
* @param option parsed distcp options
|
||||
*/
|
||||
private static void parseMaxMaps(CommandLine command,
|
||||
DistCpOptions option) {
|
||||
if (command.hasOption(DistCpOptionSwitch.MAX_MAPS.getSwitch())) {
|
||||
try {
|
||||
Integer.parseInt(fileLimitString);
|
||||
Integer maps = Integer.parseInt(
|
||||
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()).trim());
|
||||
option.setMaxMaps(maps);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Number of maps is invalid: " +
|
||||
getVal(command, DistCpOptionSwitch.MAX_MAPS.getSwitch()), e);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("File-limit is invalid: "
|
||||
+ fileLimitString, e);
|
||||
}
|
||||
LOG.warn(DistCpOptionSwitch.FILE_LIMIT.getSwitch() + " is a deprecated" +
|
||||
" option. Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.SIZE_LIMIT.getSwitch())) {
|
||||
String sizeLimitString = getVal(command,
|
||||
DistCpOptionSwitch.SIZE_LIMIT.getSwitch().trim());
|
||||
/**
|
||||
* parseNumListStatusThreads is a helper method for parsing
|
||||
* NUM_LISTSTATUS_THREADS.
|
||||
*
|
||||
* @param command command line arguments
|
||||
* @param option parsed distcp options
|
||||
*/
|
||||
private static void parseNumListStatusThreads(CommandLine command,
|
||||
DistCpOptions option) {
|
||||
if (command.hasOption(
|
||||
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch())) {
|
||||
try {
|
||||
Long.parseLong(sizeLimitString);
|
||||
Integer numThreads = Integer.parseInt(getVal(command,
|
||||
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()).trim());
|
||||
option.setNumListstatusThreads(numThreads);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException(
|
||||
"Number of liststatus threads is invalid: " + getVal(command,
|
||||
DistCpOptionSwitch.NUM_LISTSTATUS_THREADS.getSwitch()), e);
|
||||
}
|
||||
catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Size-limit is invalid: "
|
||||
+ sizeLimitString, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* parseBandwidth is a helper method for parsing BANDWIDTH.
|
||||
*
|
||||
* @param command command line arguments
|
||||
* @param option parsed distcp options
|
||||
*/
|
||||
private static void parseBandwidth(CommandLine command,
|
||||
DistCpOptions option) {
|
||||
if (command.hasOption(DistCpOptionSwitch.BANDWIDTH.getSwitch())) {
|
||||
try {
|
||||
Integer mapBandwidth = Integer.parseInt(
|
||||
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()).trim());
|
||||
if (mapBandwidth <= 0) {
|
||||
throw new IllegalArgumentException("Bandwidth specified is not " +
|
||||
"positive: " + mapBandwidth);
|
||||
}
|
||||
option.setMapBandwidth(mapBandwidth);
|
||||
} catch (NumberFormatException e) {
|
||||
throw new IllegalArgumentException("Bandwidth specified is invalid: " +
|
||||
getVal(command, DistCpOptionSwitch.BANDWIDTH.getSwitch()), e);
|
||||
}
|
||||
LOG.warn(DistCpOptionSwitch.SIZE_LIMIT.getSwitch() + " is a deprecated" +
|
||||
" option. Ignoring.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* parseSourceAndTargetPaths is a helper method for parsing the source
|
||||
* and target paths.
|
||||
*
|
||||
* @param command command line arguments
|
||||
* @return DistCpOptions
|
||||
*/
|
||||
private static DistCpOptions parseSourceAndTargetPaths(
|
||||
CommandLine command) {
|
||||
DistCpOptions option;
|
||||
Path targetPath;
|
||||
List<Path> sourcePaths = new ArrayList<Path>();
|
||||
|
||||
String[] leftOverArgs = command.getArgs();
|
||||
if (leftOverArgs == null || leftOverArgs.length < 1) {
|
||||
throw new IllegalArgumentException("Target path not specified");
|
||||
}
|
||||
|
||||
//Last Argument is the target path
|
||||
targetPath = new Path(leftOverArgs[leftOverArgs.length - 1].trim());
|
||||
|
||||
//Copy any source paths in the arguments to the list
|
||||
for (int index = 0; index < leftOverArgs.length - 1; index++) {
|
||||
sourcePaths.add(new Path(leftOverArgs[index].trim()));
|
||||
}
|
||||
|
||||
/* If command has source file listing, use it else, fall back on source
|
||||
paths in args. If both are present, throw exception and bail */
|
||||
if (command.hasOption(
|
||||
DistCpOptionSwitch.SOURCE_FILE_LISTING.getSwitch())) {
|
||||
if (!sourcePaths.isEmpty()) {
|
||||
throw new IllegalArgumentException("Both source file listing and " +
|
||||
"source paths present");
|
||||
}
|
||||
option = new DistCpOptions(new Path(getVal(command, DistCpOptionSwitch.
|
||||
SOURCE_FILE_LISTING.getSwitch())), targetPath);
|
||||
} else {
|
||||
if (sourcePaths.isEmpty()) {
|
||||
throw new IllegalArgumentException("Neither source file listing nor " +
|
||||
"source paths present");
|
||||
}
|
||||
option = new DistCpOptions(sourcePaths, targetPath);
|
||||
}
|
||||
return option;
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,98 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.*;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
import java.io.*;
|
||||
import java.nio.charset.Charset;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* A CopyFilter which compares Java Regex Patterns to each Path to determine
|
||||
* whether a file should be copied.
|
||||
*/
|
||||
public class RegexCopyFilter extends CopyFilter {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(RegexCopyFilter.class);
|
||||
private File filtersFile;
|
||||
private List<Pattern> filters;
|
||||
|
||||
/**
|
||||
* Constructor, sets up a File object to read filter patterns from and
|
||||
* the List to store the patterns.
|
||||
*/
|
||||
protected RegexCopyFilter(String filtersFilename) {
|
||||
filtersFile = new File(filtersFilename);
|
||||
filters = new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads a list of filter patterns for use in shouldCopy.
|
||||
*/
|
||||
@Override
|
||||
public void initialize() {
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
InputStream is = new FileInputStream(filtersFile);
|
||||
reader = new BufferedReader(new InputStreamReader(is,
|
||||
Charset.forName("UTF-8")));
|
||||
String line;
|
||||
while ((line = reader.readLine()) != null) {
|
||||
Pattern pattern = Pattern.compile(line);
|
||||
filters.add(pattern);
|
||||
}
|
||||
} catch (FileNotFoundException notFound) {
|
||||
LOG.error("Can't find filters file " + filtersFile);
|
||||
} catch (IOException cantRead) {
|
||||
LOG.error("An error occurred while attempting to read from " +
|
||||
filtersFile);
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, reader);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the list of filters to exclude files from copy.
|
||||
* Simplifies testing of the filters feature.
|
||||
*
|
||||
* @param filtersList a list of Patterns to be excluded
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected final void setFilters(List<Pattern> filtersList) {
|
||||
this.filters = filtersList;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldCopy(Path path) {
|
||||
for (Pattern filter : filters) {
|
||||
if (filter.matcher(path.toString()).matches()) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
@ -58,6 +58,7 @@ public class SimpleCopyListing extends CopyListing {
|
||||
private long totalBytesToCopy = 0;
|
||||
private int numListstatusThreads = 1;
|
||||
private final int maxRetries = 3;
|
||||
private CopyFilter copyFilter;
|
||||
|
||||
/**
|
||||
* Protected constructor, to initialize configuration.
|
||||
@ -71,6 +72,8 @@ protected SimpleCopyListing(Configuration configuration, Credentials credentials
|
||||
numListstatusThreads = getConf().getInt(
|
||||
DistCpConstants.CONF_LABEL_LISTSTATUS_THREADS,
|
||||
DistCpConstants.DEFAULT_LISTSTATUS_THREADS);
|
||||
copyFilter = CopyFilter.getCopyFilter(getConf());
|
||||
copyFilter.initialize();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@ -213,7 +216,7 @@ public void doBuildListing(SequenceFile.Writer fileListWriter,
|
||||
preserveXAttrs && sourceStatus.isDirectory(),
|
||||
preserveRawXAttrs && sourceStatus.isDirectory());
|
||||
writeToFileListing(fileListWriter, sourceCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
sourcePathRoot);
|
||||
|
||||
if (sourceStatus.isDirectory()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
@ -264,11 +267,10 @@ private Path computeSourceRootPath(FileStatus sourceStatus,
|
||||
* Provide an option to skip copy of a path, Allows for exclusion
|
||||
* of files such as {@link org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter#SUCCEEDED_FILE_NAME}
|
||||
* @param path - Path being considered for copy while building the file listing
|
||||
* @param options - Input options passed during DistCp invocation
|
||||
* @return - True if the path should be considered for copy, false otherwise
|
||||
*/
|
||||
protected boolean shouldCopy(Path path, DistCpOptions options) {
|
||||
return true;
|
||||
protected boolean shouldCopy(Path path) {
|
||||
return copyFilter.shouldCopy(path);
|
||||
}
|
||||
|
||||
/** {@inheritDoc} */
|
||||
@ -409,7 +411,7 @@ private void traverseDirectory(SequenceFile.Writer fileListWriter,
|
||||
preserveXAttrs && child.isDirectory(),
|
||||
preserveRawXattrs && child.isDirectory());
|
||||
writeToFileListing(fileListWriter, childCopyListingStatus,
|
||||
sourcePathRoot, options);
|
||||
sourcePathRoot);
|
||||
}
|
||||
if (retry < maxRetries) {
|
||||
if (child.isDirectory()) {
|
||||
@ -443,26 +445,23 @@ private void writeToFileListingRoot(SequenceFile.Writer fileListWriter,
|
||||
}
|
||||
return;
|
||||
}
|
||||
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
|
||||
writeToFileListing(fileListWriter, fileStatus, sourcePathRoot);
|
||||
}
|
||||
|
||||
private void writeToFileListing(SequenceFile.Writer fileListWriter,
|
||||
CopyListingFileStatus fileStatus,
|
||||
Path sourcePathRoot,
|
||||
DistCpOptions options) throws IOException {
|
||||
Path sourcePathRoot) throws IOException {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("REL PATH: " + DistCpUtils.getRelativePath(sourcePathRoot,
|
||||
fileStatus.getPath()) + ", FULL PATH: " + fileStatus.getPath());
|
||||
}
|
||||
|
||||
FileStatus status = fileStatus;
|
||||
|
||||
if (!shouldCopy(fileStatus.getPath(), options)) {
|
||||
if (!shouldCopy(fileStatus.getPath())) {
|
||||
return;
|
||||
}
|
||||
|
||||
fileListWriter.append(new Text(DistCpUtils.getRelativePath(sourcePathRoot,
|
||||
fileStatus.getPath())), status);
|
||||
fileStatus.getPath())), fileStatus);
|
||||
fileListWriter.sync();
|
||||
|
||||
if (!fileStatus.isDirectory()) {
|
||||
|
@ -0,0 +1,33 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
||||
/**
|
||||
* A CopyFilter which always returns true.
|
||||
*
|
||||
*/
|
||||
public class TrueCopyFilter extends CopyFilter {
|
||||
|
||||
@Override
|
||||
public boolean shouldCopy(Path path) {
|
||||
return true;
|
||||
}
|
||||
}
|
@ -0,0 +1,26 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
|
||||
/**
|
||||
* DistCp is a tool for replicating data using MapReduce jobs for concurrent
|
||||
* copy operations.
|
||||
*
|
||||
* @version 2
|
||||
*/
|
||||
package org.apache.hadoop.tools;
|
@ -94,40 +94,6 @@ protected long getNumberOfPaths() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testSkipCopy() throws Exception {
|
||||
SimpleCopyListing listing = new SimpleCopyListing(getConf(), CREDENTIALS) {
|
||||
@Override
|
||||
protected boolean shouldCopy(Path path, DistCpOptions options) {
|
||||
return !path.getName().equals(FileOutputCommitter.SUCCEEDED_FILE_NAME);
|
||||
}
|
||||
};
|
||||
FileSystem fs = FileSystem.get(getConf());
|
||||
List<Path> srcPaths = new ArrayList<Path>();
|
||||
srcPaths.add(new Path("/tmp/in4/1"));
|
||||
srcPaths.add(new Path("/tmp/in4/2"));
|
||||
Path target = new Path("/tmp/out4/1");
|
||||
TestDistCpUtils.createFile(fs, "/tmp/in4/1/_SUCCESS");
|
||||
TestDistCpUtils.createFile(fs, "/tmp/in4/1/file");
|
||||
TestDistCpUtils.createFile(fs, "/tmp/in4/2");
|
||||
fs.mkdirs(target);
|
||||
DistCpOptions options = new DistCpOptions(srcPaths, target);
|
||||
Path listingFile = new Path("/tmp/list4");
|
||||
listing.buildListing(listingFile, options);
|
||||
Assert.assertEquals(listing.getNumberOfPaths(), 3);
|
||||
SequenceFile.Reader reader = new SequenceFile.Reader(getConf(),
|
||||
SequenceFile.Reader.file(listingFile));
|
||||
CopyListingFileStatus fileStatus = new CopyListingFileStatus();
|
||||
Text relativePath = new Text();
|
||||
Assert.assertTrue(reader.next(relativePath, fileStatus));
|
||||
Assert.assertEquals(relativePath.toString(), "/1");
|
||||
Assert.assertTrue(reader.next(relativePath, fileStatus));
|
||||
Assert.assertEquals(relativePath.toString(), "/1/file");
|
||||
Assert.assertTrue(reader.next(relativePath, fileStatus));
|
||||
Assert.assertEquals(relativePath.toString(), "/2");
|
||||
Assert.assertFalse(reader.next(relativePath, fileStatus));
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
public void testMultipleSrcToFile() {
|
||||
FileSystem fs = null;
|
||||
|
@ -241,55 +241,6 @@ private void caseMultiFileTargetPresent(boolean sync) {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testCustomCopyListing() {
|
||||
|
||||
try {
|
||||
addEntries(listFile, "multifile1/file3", "multifile1/file4", "multifile1/file5");
|
||||
createFiles("multifile1/file3", "multifile1/file4", "multifile1/file5");
|
||||
mkdirs(target.toString());
|
||||
|
||||
Configuration conf = getConf();
|
||||
try {
|
||||
conf.setClass(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS,
|
||||
CustomCopyListing.class, CopyListing.class);
|
||||
DistCpOptions options = new DistCpOptions(Arrays.
|
||||
asList(new Path(root + "/" + "multifile1")), target);
|
||||
options.setSyncFolder(true);
|
||||
options.setDeleteMissing(false);
|
||||
options.setOverwrite(false);
|
||||
try {
|
||||
new DistCp(conf, options).execute();
|
||||
} catch (Exception e) {
|
||||
LOG.error("Exception encountered ", e);
|
||||
throw new IOException(e);
|
||||
}
|
||||
} finally {
|
||||
conf.unset(DistCpConstants.CONF_LABEL_COPY_LISTING_CLASS);
|
||||
}
|
||||
|
||||
checkResult(target, 2, "file4", "file5");
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception encountered while testing distcp", e);
|
||||
Assert.fail("distcp failure");
|
||||
} finally {
|
||||
TestDistCpUtils.delete(fs, root);
|
||||
}
|
||||
}
|
||||
|
||||
private static class CustomCopyListing extends SimpleCopyListing {
|
||||
|
||||
public CustomCopyListing(Configuration configuration,
|
||||
Credentials credentials) {
|
||||
super(configuration, credentials);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean shouldCopy(Path path, DistCpOptions options) {
|
||||
return !path.getName().equals("file3");
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout=100000)
|
||||
public void testMultiFileTargetMissing() {
|
||||
caseMultiFileTargetMissing(false);
|
||||
|
@ -400,7 +400,7 @@ public void testToString() {
|
||||
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, deleteMissing=false, " +
|
||||
"ignoreFailures=false, maxMaps=20, sslConfigurationFile='null', copyStrategy='uniformsize', " +
|
||||
"sourceFileListing=abc, sourcePaths=null, targetPath=xyz, targetPathExists=true, " +
|
||||
"preserveRawXattrs=false}";
|
||||
"preserveRawXattrs=false, filtersFile='null'}";
|
||||
Assert.assertEquals(val, option.toString());
|
||||
Assert.assertNotSame(DistCpOptionSwitch.ATOMIC_COMMIT.toString(),
|
||||
DistCpOptionSwitch.ATOMIC_COMMIT.name());
|
||||
@ -718,4 +718,19 @@ public void testDiffOption() {
|
||||
"Diff is valid only with update and delete options", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExclusionsOption() {
|
||||
DistCpOptions options = OptionsParser.parse(new String[] {
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
Assert.assertNull(options.getFiltersFile());
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-filters",
|
||||
"/tmp/filters.txt",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
Assert.assertEquals(options.getFiltersFile(), "/tmp/filters.txt");
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,113 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class TestRegexCopyFilter {
|
||||
|
||||
@Test
|
||||
public void testShouldCopyTrue() {
|
||||
List<Pattern> filters = new ArrayList<>();
|
||||
filters.add(Pattern.compile("user"));
|
||||
|
||||
RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
|
||||
regexCopyFilter.setFilters(filters);
|
||||
|
||||
Path shouldCopyPath = new Path("/user/bar");
|
||||
Assert.assertTrue(regexCopyFilter.shouldCopy(shouldCopyPath));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShouldCopyFalse() {
|
||||
List<Pattern> filters = new ArrayList<>();
|
||||
filters.add(Pattern.compile(".*test.*"));
|
||||
|
||||
RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
|
||||
regexCopyFilter.setFilters(filters);
|
||||
|
||||
Path shouldNotCopyPath = new Path("/user/testing");
|
||||
Assert.assertFalse(regexCopyFilter.shouldCopy(shouldNotCopyPath));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShouldCopyWithMultipleFilters() {
|
||||
List<Pattern> filters = new ArrayList<>();
|
||||
filters.add(Pattern.compile(".*test.*"));
|
||||
filters.add(Pattern.compile("/user/b.*"));
|
||||
filters.add(Pattern.compile(".*_SUCCESS"));
|
||||
|
||||
List<Path> toCopy = getTestPaths();
|
||||
|
||||
int shouldCopyCount = 0;
|
||||
|
||||
RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
|
||||
regexCopyFilter.setFilters(filters);
|
||||
|
||||
for (Path path: toCopy) {
|
||||
if (regexCopyFilter.shouldCopy(path)) {
|
||||
shouldCopyCount++;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(2, shouldCopyCount);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShouldExcludeAll() {
|
||||
List<Pattern> filters = new ArrayList<>();
|
||||
filters.add(Pattern.compile(".*test.*"));
|
||||
filters.add(Pattern.compile("/user/b.*"));
|
||||
filters.add(Pattern.compile(".*")); // exclude everything
|
||||
|
||||
List<Path> toCopy = getTestPaths();
|
||||
|
||||
int shouldCopyCount = 0;
|
||||
|
||||
RegexCopyFilter regexCopyFilter = new RegexCopyFilter("fakeFile");
|
||||
regexCopyFilter.setFilters(filters);
|
||||
|
||||
for (Path path: toCopy) {
|
||||
if (regexCopyFilter.shouldCopy(path)) {
|
||||
shouldCopyCount++;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertEquals(0, shouldCopyCount);
|
||||
}
|
||||
|
||||
private List<Path> getTestPaths() {
|
||||
List<Path> toCopy = new ArrayList<>();
|
||||
toCopy.add(new Path("/user/bar"));
|
||||
toCopy.add(new Path("/user/foo/_SUCCESS"));
|
||||
toCopy.add(new Path("/hive/test_data"));
|
||||
toCopy.add(new Path("test"));
|
||||
toCopy.add(new Path("/user/foo/bar"));
|
||||
toCopy.add(new Path("/mapred/.staging_job"));
|
||||
return toCopy;
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,36 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestTrueCopyFilter {
|
||||
|
||||
@Test
|
||||
public void testShouldCopy() {
|
||||
Assert.assertTrue(new TrueCopyFilter().shouldCopy(new Path("fake")));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShouldCopyWithNull() {
|
||||
Assert.assertTrue(new TrueCopyFilter().shouldCopy(new Path("fake")));
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user