From b8e8b8da75baf62ac7465e64acf17f280475bb20 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Wed, 21 Sep 2011 01:10:49 +0000 Subject: [PATCH] MAPREDUCE-3018. Fixed -file option for streaming. Contributed by Mahadev Konar. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1173451 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 2 ++ .../apache/hadoop/streaming/StreamJob.java | 25 +++++++++++++------ 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 1d637419eb..0fc0b4eb1d 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -1370,6 +1370,8 @@ Release 0.23.0 - Unreleased YarnClientProtocolProvider and ensured MiniMRYarnCluster sets JobHistory configuration for tests. (acmurthy) + MAPREDUCE-3018. Fixed -file option for streaming. (mahadev via acmurthy) + Release 0.22.0 - Unreleased INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java b/hadoop-mapreduce-project/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java index 3212a1fcdf..27629476d9 100644 --- a/hadoop-mapreduce-project/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java +++ b/hadoop-mapreduce-project/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java @@ -22,8 +22,10 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.net.URISyntaxException; import java.net.URLEncoder; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -43,6 +45,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.mapred.FileInputFormat; @@ -277,19 +280,25 @@ void parseArgv() { if (values != null && values.length > 0) { LOG.warn("-file option is deprecated, please use generic option" + " -files instead."); - StringBuilder unpackRegex = new StringBuilder( - config_.getPattern(MRJobConfig.JAR_UNPACK_PATTERN, - JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern()); + + String fileList = null; for (String file : values) { packageFiles_.add(file); - String fname = new File(file).getName(); - unpackRegex.append("|(?:").append(Pattern.quote(fname)).append(")"); + try { + URI pathURI = new URI(file); + Path path = new Path(pathURI); + FileSystem localFs = FileSystem.getLocal(config_); + String finalPath = path.makeQualified(localFs).toString(); + fileList = fileList == null ? finalPath : fileList + "," + finalPath; + } catch (Exception e) { + throw new IllegalArgumentException(e); + } } - config_.setPattern(MRJobConfig.JAR_UNPACK_PATTERN, - Pattern.compile(unpackRegex.toString())); + config_.set("tmpfiles", config_.get("tmpfiles", "") + + (fileList == null ? "" : fileList)); validate(packageFiles_); } - + String fsName = cmdLine.getOptionValue("dfs"); if (null != fsName){ LOG.warn("-dfs option is deprecated, please use -fs instead.");