HDFS-9640. Remove hsftp from DistCp in trunk. Contributed by Wei-Chiu Chuang.
This commit is contained in:
parent
55ae143923
commit
18c7e58283
@ -19,7 +19,6 @@
|
||||
package org.apache.hadoop.tools;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -254,88 +253,10 @@ private Job createJob() throws IOException {
|
||||
job.getConfiguration().set(JobContext.NUM_MAPS,
|
||||
String.valueOf(inputOptions.getMaxMaps()));
|
||||
|
||||
if (inputOptions.getSslConfigurationFile() != null) {
|
||||
setupSSLConfig(job);
|
||||
}
|
||||
|
||||
inputOptions.appendToConf(job.getConfiguration());
|
||||
return job;
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup ssl configuration on the job configuration to enable hsftp access
|
||||
* from map job. Also copy the ssl configuration file to Distributed cache
|
||||
*
|
||||
* @param job - Reference to job's handle
|
||||
* @throws java.io.IOException - Exception if unable to locate ssl config file
|
||||
*/
|
||||
private void setupSSLConfig(Job job) throws IOException {
|
||||
Configuration configuration = job.getConfiguration();
|
||||
URL sslFileUrl = configuration.getResource(inputOptions
|
||||
.getSslConfigurationFile());
|
||||
if (sslFileUrl == null) {
|
||||
throw new IOException(
|
||||
"Given ssl configuration file doesn't exist in class path : "
|
||||
+ inputOptions.getSslConfigurationFile());
|
||||
}
|
||||
Path sslConfigPath = new Path(sslFileUrl.toString());
|
||||
|
||||
addSSLFilesToDistCache(job, sslConfigPath);
|
||||
configuration.set(DistCpConstants.CONF_LABEL_SSL_CONF, sslConfigPath.getName());
|
||||
configuration.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfigPath.getName());
|
||||
}
|
||||
|
||||
/**
|
||||
* Add SSL files to distributed cache. Trust store, key store and ssl config xml
|
||||
*
|
||||
* @param job - Job handle
|
||||
* @param sslConfigPath - ssl Configuration file specified through options
|
||||
* @throws IOException - If any
|
||||
*/
|
||||
private void addSSLFilesToDistCache(Job job,
|
||||
Path sslConfigPath) throws IOException {
|
||||
Configuration configuration = job.getConfiguration();
|
||||
FileSystem localFS = FileSystem.getLocal(configuration);
|
||||
|
||||
Configuration sslConf = new Configuration(false);
|
||||
sslConf.addResource(sslConfigPath);
|
||||
|
||||
Path localStorePath = getLocalStorePath(sslConf,
|
||||
DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION);
|
||||
job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
|
||||
localFS.getWorkingDirectory()).toUri());
|
||||
configuration.set(DistCpConstants.CONF_LABEL_SSL_TRUST_STORE_LOCATION,
|
||||
localStorePath.getName());
|
||||
|
||||
localStorePath = getLocalStorePath(sslConf,
|
||||
DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION);
|
||||
job.addCacheFile(localStorePath.makeQualified(localFS.getUri(),
|
||||
localFS.getWorkingDirectory()).toUri());
|
||||
configuration.set(DistCpConstants.CONF_LABEL_SSL_KEY_STORE_LOCATION,
|
||||
localStorePath.getName());
|
||||
|
||||
job.addCacheFile(sslConfigPath.makeQualified(localFS.getUri(),
|
||||
localFS.getWorkingDirectory()).toUri());
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Local Trust store/key store path
|
||||
*
|
||||
* @param sslConf - Config from SSL Client xml
|
||||
* @param storeKey - Key for either trust store or key store
|
||||
* @return - Path where the store is present
|
||||
* @throws IOException -If any
|
||||
*/
|
||||
private Path getLocalStorePath(Configuration sslConf, String storeKey) throws IOException {
|
||||
if (sslConf.get(storeKey) != null) {
|
||||
return new Path(sslConf.get(storeKey));
|
||||
} else {
|
||||
throw new IOException("Store for " + storeKey + " is not set in " +
|
||||
inputOptions.getSslConfigurationFile());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Setup output format appropriately
|
||||
*
|
||||
|
@ -49,7 +49,6 @@ public class DistCpConstants {
|
||||
"distcp.preserve.rawxattrs";
|
||||
public static final String CONF_LABEL_SYNC_FOLDERS = "distcp.sync.folders";
|
||||
public static final String CONF_LABEL_DELETE_MISSING = "distcp.delete.missing.source";
|
||||
public static final String CONF_LABEL_SSL_CONF = "distcp.keystore.resource";
|
||||
public static final String CONF_LABEL_LISTSTATUS_THREADS = "distcp.liststatus.threads";
|
||||
public static final String CONF_LABEL_MAX_MAPS = "distcp.max.maps";
|
||||
public static final String CONF_LABEL_SOURCE_LISTING = "distcp.source.listing";
|
||||
@ -76,9 +75,6 @@ public class DistCpConstants {
|
||||
/* Total number of paths to copy, includes directories. Unfiltered count */
|
||||
public static final String CONF_LABEL_TOTAL_NUMBER_OF_RECORDS = "mapred.number.of.records";
|
||||
|
||||
/* SSL keystore resource */
|
||||
public static final String CONF_LABEL_SSL_KEYSTORE = "dfs.https.client.keystore.resource";
|
||||
|
||||
/* If input is based -f <<source listing>>, file containing the src paths */
|
||||
public static final String CONF_LABEL_LISTING_FILE_PATH = "distcp.listing.file.path";
|
||||
|
||||
@ -106,18 +102,6 @@ public class DistCpConstants {
|
||||
/* DistCp CopyListing class override param */
|
||||
public static final String CONF_LABEL_COPY_LISTING_CLASS = "distcp.copy.listing.class";
|
||||
|
||||
/**
|
||||
* Conf label for SSL Trust-store location.
|
||||
*/
|
||||
public static final String CONF_LABEL_SSL_TRUST_STORE_LOCATION
|
||||
= "ssl.client.truststore.location";
|
||||
|
||||
/**
|
||||
* Conf label for SSL Key-store location.
|
||||
*/
|
||||
public static final String CONF_LABEL_SSL_KEY_STORE_LOCATION
|
||||
= "ssl.client.keystore.location";
|
||||
|
||||
/**
|
||||
* Constants for DistCp return code to shell / consumer of ToolRunner's run
|
||||
*/
|
||||
|
@ -74,15 +74,6 @@ public enum DistCpOptionSwitch {
|
||||
DELETE_MISSING(DistCpConstants.CONF_LABEL_DELETE_MISSING,
|
||||
new Option("delete", false, "Delete from target, " +
|
||||
"files missing in source")),
|
||||
|
||||
/**
|
||||
* Configuration file to use with hftps:// for securely copying
|
||||
* files across clusters. Typically the configuration file contains
|
||||
* truststore/keystore information such as location, password and type
|
||||
*/
|
||||
SSL_CONF(DistCpConstants.CONF_LABEL_SSL_CONF,
|
||||
new Option("mapredSslConf", true, "Configuration for ssl config file" +
|
||||
", to use with hftps://. Must be in the classpath.")),
|
||||
/**
|
||||
* Number of threads for building source file listing (before map-reduce
|
||||
* phase, max one listStatus per thread at a time).
|
||||
|
@ -49,8 +49,6 @@ public class DistCpOptions {
|
||||
private int maxMaps = DistCpConstants.DEFAULT_MAPS;
|
||||
private float mapBandwidth = DistCpConstants.DEFAULT_BANDWIDTH_MB;
|
||||
|
||||
private String sslConfigurationFile;
|
||||
|
||||
private String copyStrategy = DistCpConstants.UNIFORMSIZE;
|
||||
|
||||
private EnumSet<FileAttribute> preserveStatus = EnumSet.noneOf(FileAttribute.class);
|
||||
@ -134,7 +132,6 @@ public DistCpOptions(DistCpOptions that) {
|
||||
this.numListstatusThreads = that.numListstatusThreads;
|
||||
this.maxMaps = that.maxMaps;
|
||||
this.mapBandwidth = that.mapBandwidth;
|
||||
this.sslConfigurationFile = that.getSslConfigurationFile();
|
||||
this.copyStrategy = that.copyStrategy;
|
||||
this.preserveStatus = that.preserveStatus;
|
||||
this.preserveRawXattrs = that.preserveRawXattrs;
|
||||
@ -380,24 +377,6 @@ public void setMapBandwidth(float mapBandwidth) {
|
||||
this.mapBandwidth = mapBandwidth;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get path where the ssl configuration file is present to use for hftps://
|
||||
*
|
||||
* @return Path on local file system
|
||||
*/
|
||||
public String getSslConfigurationFile() {
|
||||
return sslConfigurationFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the SSL configuration file path to use with hftps:// (local path)
|
||||
*
|
||||
* @param sslConfigurationFile - Local ssl config file path
|
||||
*/
|
||||
public void setSslConfigurationFile(String sslConfigurationFile) {
|
||||
this.sslConfigurationFile = sslConfigurationFile;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an iterator with the list of file attributes to preserve
|
||||
*
|
||||
@ -670,7 +649,6 @@ public String toString() {
|
||||
", numListstatusThreads=" + numListstatusThreads +
|
||||
", maxMaps=" + maxMaps +
|
||||
", mapBandwidth=" + mapBandwidth +
|
||||
", sslConfigurationFile='" + sslConfigurationFile + '\'' +
|
||||
", copyStrategy='" + copyStrategy + '\'' +
|
||||
", preserveStatus=" + preserveStatus +
|
||||
", preserveRawXattrs=" + preserveRawXattrs +
|
||||
|
@ -137,11 +137,6 @@ public static DistCpOptions parse(String args[]) throws IllegalArgumentException
|
||||
|
||||
parseBandwidth(command, option);
|
||||
|
||||
if (command.hasOption(DistCpOptionSwitch.SSL_CONF.getSwitch())) {
|
||||
option.setSslConfigurationFile(command.
|
||||
getOptionValue(DistCpOptionSwitch.SSL_CONF.getSwitch()));
|
||||
}
|
||||
|
||||
parseNumListStatusThreads(command, option);
|
||||
|
||||
parseMaxMaps(command, option);
|
||||
|
@ -19,10 +19,7 @@
|
||||
package org.apache.hadoop.tools.mapred;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.Arrays;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
@ -33,7 +30,6 @@
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.JobContext;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.tools.CopyListingFileStatus;
|
||||
import org.apache.hadoop.tools.DistCpConstants;
|
||||
@ -119,73 +115,9 @@ public void setup(Context context) throws IOException, InterruptedException {
|
||||
overWrite = true; // When target is an existing file, overwrite it.
|
||||
}
|
||||
|
||||
if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) {
|
||||
initializeSSLConf(context);
|
||||
}
|
||||
startEpoch = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize SSL Config if same is set in conf
|
||||
*
|
||||
* @throws IOException - If any
|
||||
*/
|
||||
private void initializeSSLConf(Context context) throws IOException {
|
||||
LOG.info("Initializing SSL configuration");
|
||||
|
||||
String workDir = conf.get(JobContext.JOB_LOCAL_DIR) + "/work";
|
||||
Path[] cacheFiles = context.getLocalCacheFiles();
|
||||
|
||||
Configuration sslConfig = new Configuration(false);
|
||||
String sslConfFileName = conf.get(DistCpConstants.CONF_LABEL_SSL_CONF);
|
||||
Path sslClient = findCacheFile(cacheFiles, sslConfFileName);
|
||||
if (sslClient == null) {
|
||||
LOG.warn("SSL Client config file not found. Was looking for " + sslConfFileName +
|
||||
" in " + Arrays.toString(cacheFiles));
|
||||
return;
|
||||
}
|
||||
sslConfig.addResource(sslClient);
|
||||
|
||||
String trustStoreFile = conf.get("ssl.client.truststore.location");
|
||||
Path trustStorePath = findCacheFile(cacheFiles, trustStoreFile);
|
||||
sslConfig.set("ssl.client.truststore.location", trustStorePath.toString());
|
||||
|
||||
String keyStoreFile = conf.get("ssl.client.keystore.location");
|
||||
Path keyStorePath = findCacheFile(cacheFiles, keyStoreFile);
|
||||
sslConfig.set("ssl.client.keystore.location", keyStorePath.toString());
|
||||
|
||||
try {
|
||||
OutputStream out = new FileOutputStream(workDir + "/" + sslConfFileName);
|
||||
try {
|
||||
sslConfig.writeXml(out);
|
||||
} finally {
|
||||
out.close();
|
||||
}
|
||||
conf.set(DistCpConstants.CONF_LABEL_SSL_KEYSTORE, sslConfFileName);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to write out the ssl configuration. " +
|
||||
"Will fall back to default ssl-client.xml in class path, if there is one", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find entry from distributed cache
|
||||
*
|
||||
* @param cacheFiles - All localized cache files
|
||||
* @param fileName - fileName to search
|
||||
* @return Path of the filename if found, else null
|
||||
*/
|
||||
private Path findCacheFile(Path[] cacheFiles, String fileName) {
|
||||
if (cacheFiles != null && cacheFiles.length > 0) {
|
||||
for (Path file : cacheFiles) {
|
||||
if (file.getName().equals(fileName)) {
|
||||
return file;
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implementation of the Mapper::map(). Does the copy.
|
||||
* @param relPath The target path.
|
||||
|
@ -32,7 +32,6 @@ DistCp Guide
|
||||
- [Map sizing](#Map_sizing)
|
||||
- [Copying Between Versions of HDFS](#Copying_Between_Versions_of_HDFS)
|
||||
- [MapReduce and other side-effects](#MapReduce_and_other_side-effects)
|
||||
- [SSL Configurations for HSFTP sources](#SSL_Configurations_for_HSFTP_sources)
|
||||
- [Frequently Asked Questions](#Frequently_Asked_Questions)
|
||||
|
||||
---
|
||||
@ -232,7 +231,6 @@ Flag | Description | Notes
|
||||
`-strategy {dynamic|uniformsize}` | Choose the copy-strategy to be used in DistCp. | By default, uniformsize is used. (i.e. Maps are balanced on the total size of files copied by each map. Similar to legacy.) If "dynamic" is specified, `DynamicInputFormat` is used instead. (This is described in the Architecture section, under InputFormats.)
|
||||
`-bandwidth` | Specify bandwidth per map, in MB/second. | Each map will be restricted to consume only the specified bandwidth. This is not always exact. The map throttles back its bandwidth consumption during a copy, such that the **net** bandwidth used tends towards the specified value.
|
||||
`-atomic {-tmp <tmp_dir>}` | Specify atomic commit, with optional tmp directory. | `-atomic` instructs DistCp to copy the source data to a temporary target location, and then move the temporary target to the final-location atomically. Data will either be available at final target in a complete and consistent form, or not at all. Optionally, `-tmp` may be used to specify the location of the tmp-target. If not specified, a default is chosen. **Note:** tmp_dir must be on the final target cluster.
|
||||
`-mapredSslConf <ssl_conf_file>` | Specify SSL Config file, to be used with HSFTP source | When using the hsftp protocol with a source, the security- related properties may be specified in a config-file and passed to DistCp. \<ssl_conf_file\> needs to be in the classpath.
|
||||
`-async` | Run DistCp asynchronously. Quits as soon as the Hadoop Job is launched. | The Hadoop Job-id is logged, for tracking.
|
||||
`-diff` | Use snapshot diff report to identify the difference between source and target. |
|
||||
`-numListstatusThreads` | Number of threads to use for building file listing | At most 40 threads.
|
||||
@ -432,43 +430,6 @@ $H3 MapReduce and other side-effects
|
||||
* If `mapreduce.map.speculative` is set set final and true, the result of the
|
||||
copy is undefined.
|
||||
|
||||
$H3 SSL Configurations for HSFTP sources
|
||||
|
||||
To use an HSFTP source (i.e. using the hsftp protocol), a SSL configuration
|
||||
file needs to be specified (via the `-mapredSslConf` option). This must
|
||||
specify 3 parameters:
|
||||
|
||||
* `ssl.client.truststore.location`: The local-filesystem location of the
|
||||
trust-store file, containing the certificate for the NameNode.
|
||||
* `ssl.client.truststore.type`: (Optional) The format of the trust-store
|
||||
file.
|
||||
* `ssl.client.truststore.password`: (Optional) Password for the trust-store
|
||||
file.
|
||||
|
||||
The following is an example SSL configuration file:
|
||||
|
||||
<configuration>
|
||||
<property>
|
||||
<name>ssl.client.truststore.location</name>
|
||||
<value>/work/keystore.jks</value>
|
||||
<description>Truststore to be used by clients like distcp. Must be specified.</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ssl.client.truststore.password</name>
|
||||
<value>changeme</value>
|
||||
<description>Optional. Default value is "".</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>ssl.client.truststore.type</name>
|
||||
<value>jks</value>
|
||||
<description>Optional. Default value is "jks".</description>
|
||||
</property>
|
||||
</configuration>
|
||||
|
||||
The SSL configuration file must be in the class-path of the DistCp program.
|
||||
|
||||
Frequently Asked Questions
|
||||
--------------------------
|
||||
|
||||
|
@ -251,21 +251,6 @@ public void testParseDeleteMissing() {
|
||||
} catch (IllegalArgumentException ignore) { }
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSSLConf() {
|
||||
DistCpOptions options = OptionsParser.parse(new String[] {
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
Assert.assertNull(options.getSslConfigurationFile());
|
||||
|
||||
options = OptionsParser.parse(new String[] {
|
||||
"-mapredSslConf",
|
||||
"/tmp/ssl-client.xml",
|
||||
"hdfs://localhost:8020/source/first",
|
||||
"hdfs://localhost:8020/target/"});
|
||||
Assert.assertEquals(options.getSslConfigurationFile(), "/tmp/ssl-client.xml");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseMaps() {
|
||||
DistCpOptions options = OptionsParser.parse(new String[] {
|
||||
@ -402,7 +387,7 @@ public void testToString() {
|
||||
String val = "DistCpOptions{atomicCommit=false, syncFolder=false, "
|
||||
+ "deleteMissing=false, ignoreFailures=false, overwrite=false, "
|
||||
+ "skipCRC=false, blocking=true, numListstatusThreads=0, maxMaps=20, "
|
||||
+ "mapBandwidth=100.0, sslConfigurationFile='null', "
|
||||
+ "mapBandwidth=100.0, "
|
||||
+ "copyStrategy='uniformsize', preserveStatus=[], "
|
||||
+ "preserveRawXattrs=false, atomicWorkPath=null, logPath=null, "
|
||||
+ "sourceFileListing=abc, sourcePaths=null, targetPath=xyz, "
|
||||
|
Loading…
Reference in New Issue
Block a user