diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 1344da9d1d..b389f48266 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -126,6 +126,9 @@ Release 2.0.1-alpha - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4146. Support limits on task status string length and number of
+    block locations in branch-2. (Ahmed Radwan via tomwhite)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
index 06a3d48892..f7a7dd4474 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
@@ -53,7 +53,6 @@
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.mapred.IFile.Writer;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskCounter;
@@ -569,7 +568,21 @@ public void initialize(JobConf job, JobID id,
           resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
     }
   }
- 
+
+  public static String normalizeStatus(String status, Configuration conf) {
+    // Check to see if the status string is too long
+    // and truncate it if needed.
+    int progressStatusLength = conf.getInt(
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+        MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+    if (status.length() > progressStatusLength) {
+      LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
+          + progressStatusLength + " characters)");
+      status = status.substring(0, progressStatusLength);
+    }
+    return status;
+  }
+
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   protected class TaskReporter
@@ -603,7 +616,7 @@ boolean resetProgressFlag() {
       return progressFlag.getAndSet(false);
     }
     public void setStatus(String status) {
-      taskProgress.setStatus(status);
+      taskProgress.setStatus(normalizeStatus(status, conf));
       // indicate that progress update needs to be sent
       setProgressFlag();
     }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index 4516cb9eda..82ee5f00a7 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -71,4 +71,12 @@ public interface MRConfig {
 
   public static final String TASK_LOCAL_OUTPUT_CLASS =
     "mapreduce.task.local.output.class";
+
+  public static final String PROGRESS_STATUS_LEN_LIMIT_KEY =
+    "mapreduce.task.max.status.length";
+  public static final int PROGRESS_STATUS_LEN_LIMIT_DEFAULT = 512;
+
+  public static final int MAX_BLOCK_LOCATIONS_DEFAULT = 10;
+  public static final String MAX_BLOCK_LOCATIONS_KEY =
+    "mapreduce.job.max.split.locations";
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
index b6e44d71c4..e6ecac5b01 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/JobSplitWriter.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.split.JobSplit.SplitMetaInfo;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -48,6 +49,7 @@ public class JobSplitWriter {
 
   private static final int splitVersion = JobSplit.META_SPLIT_VERSION;
   private static final byte[] SPLIT_FILE_HEADER;
+
   static {
     try {
       SPLIT_FILE_HEADER = "SPL".getBytes("UTF-8");
@@ -82,7 +84,7 @@ public static void createSplitFiles(Path jobSubmitDir,
       throws IOException {
     FSDataOutputStream out = createFile(fs,
         JobSubmissionFiles.getJobSplitFile(jobSubmitDir), conf);
-    SplitMetaInfo[] info = writeOldSplits(splits, out);
+    SplitMetaInfo[] info = writeOldSplits(splits, out, conf);
     out.close();
     writeJobSplitMetaInfo(fs,JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir),
         new FsPermission(JobSubmissionFiles.JOB_FILE_PERMISSION), splitVersion,
@@ -114,6 +116,8 @@ SplitMetaInfo[] writeNewSplits(Configuration conf,
     if (array.length != 0) {
       SerializationFactory factory = new SerializationFactory(conf);
       int i = 0;
+      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
       long offset = out.getPos();
       for(T split: array) {
         long prevCount = out.getPos();
@@ -123,9 +127,15 @@ SplitMetaInfo[] writeNewSplits(Configuration conf,
         serializer.open(out);
         serializer.serialize(split);
         long currCount = out.getPos();
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+          throw new IOException("Max block location exceeded for split: "
+              + split + " splitsize: " + locations.length +
+              " maxsize: " + maxBlockLocations);
+        }
         info[i++] =
           new JobSplit.SplitMetaInfo(
-              split.getLocations(), offset,
+              locations, offset,
               split.getLength());
         offset += currCount - prevCount;
       }
@@ -135,18 +145,26 @@ SplitMetaInfo[] writeNewSplits(Configuration conf,
 
   private static SplitMetaInfo[] writeOldSplits(
       org.apache.hadoop.mapred.InputSplit[] splits,
-      FSDataOutputStream out) throws IOException {
+      FSDataOutputStream out, Configuration conf) throws IOException {
     SplitMetaInfo[] info = new SplitMetaInfo[splits.length];
     if (splits.length != 0) {
       int i = 0;
       long offset = out.getPos();
+      int maxBlockLocations = conf.getInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY,
+          MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT);
       for(org.apache.hadoop.mapred.InputSplit split: splits) {
         long prevLen = out.getPos();
         Text.writeString(out, split.getClass().getName());
         split.write(out);
         long currLen = out.getPos();
+        String[] locations = split.getLocations();
+        if (locations.length > maxBlockLocations) {
+          throw new IOException("Max block location exceeded for split: "
+              + split + " splitsize: " + locations.length +
+              " maxsize: " + maxBlockLocations);
+        }
         info[i++] = new JobSplit.SplitMetaInfo(
-            split.getLocations(), offset,
+            locations, offset,
             split.getLength());
         offset += currLen - prevLen;
       }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
index 9b039b0d4d..333f57b426 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/TaskAttemptContextImpl.java
@@ -21,6 +21,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.StatusReporter;
@@ -92,8 +93,9 @@ protected void setStatusString(String status) {
    */
   @Override
   public void setStatus(String status) {
-    setStatusString(status);
-    reporter.setStatus(status);
+    String normalizedStatus = Task.normalizeStatus(status, conf);
+    setStatusString(normalizedStatus);
+    reporter.setStatus(normalizedStatus);
   }
 
   public static class DummyReporter extends StatusReporter {
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
new file mode 100644
index 0000000000..d8b250ad45
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestBlockLimits.java
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * A JUnit test to test limits on block locations
+ */
+public class TestBlockLimits extends TestCase {
+  private static String TEST_ROOT_DIR = new File(System.getProperty(
+      "test.build.data", "/tmp")).toURI().toString().replace(' ', '+');
+
+  public void testWithLimits() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    MiniMRClientCluster mr = null;
+    try {
+      mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
+          new Configuration());
+      runCustomFormat(mr);
+    } finally {
+      if (mr != null) {
+        mr.stop();
+      }
+    }
+  }
+
+  private void runCustomFormat(MiniMRClientCluster mr) throws IOException {
+    JobConf job = new JobConf(mr.getConfig());
+    FileSystem fileSys = FileSystem.get(job);
+    Path testDir = new Path(TEST_ROOT_DIR + "/test_mini_mr_local");
+    Path outDir = new Path(testDir, "out");
+    System.out.println("testDir= " + testDir);
+    fileSys.delete(testDir, true);
+    job.setInputFormat(MyInputFormat.class);
+    job.setOutputFormat(MyOutputFormat.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(IntWritable.class);
+
+    job.setMapperClass(MyMapper.class);
+    job.setReducerClass(MyReducer.class);
+    job.setNumMapTasks(100);
+    job.setNumReduceTasks(1);
+    job.set("non.std.out", outDir.toString());
+    try {
+      JobClient.runJob(job);
+      assertTrue(false);
+    } catch (IOException ie) {
+      System.out.println("Failed job " + StringUtils.stringifyException(ie));
+    } finally {
+      fileSys.delete(testDir, true);
+    }
+
+  }
+
+  static class MyMapper extends MapReduceBase implements
+      Mapper<WritableComparable, Writable, WritableComparable, Writable> {
+
+    public void map(WritableComparable key, Writable value,
+        OutputCollector<WritableComparable, Writable> out, Reporter reporter)
+        throws IOException {
+    }
+  }
+
+  static class MyReducer extends MapReduceBase implements
+      Reducer<WritableComparable, Writable, WritableComparable, Writable> {
+    public void reduce(WritableComparable key, Iterator<Writable> values,
+        OutputCollector<WritableComparable, Writable> output, Reporter reporter)
+        throws IOException {
+    }
+  }
+
+  private static class MyInputFormat implements InputFormat {
+
+    private static class MySplit implements InputSplit {
+      int first;
+      int length;
+
+      public MySplit() {
+      }
+
+      public MySplit(int first, int length) {
+        this.first = first;
+        this.length = length;
+      }
+
+      public String[] getLocations() {
+        return new String[200];
+      }
+
+      public long getLength() {
+        return length;
+      }
+
+      public void write(DataOutput out) throws IOException {
+        WritableUtils.writeVInt(out, first);
+        WritableUtils.writeVInt(out, length);
+      }
+
+      public void readFields(DataInput in) throws IOException {
+        first = WritableUtils.readVInt(in);
+        length = WritableUtils.readVInt(in);
+      }
+    }
+
+    public InputSplit[] getSplits(JobConf job, int numSplits)
+        throws IOException {
+      return new MySplit[] { new MySplit(0, 1), new MySplit(1, 3),
+          new MySplit(4, 2) };
+    }
+
+    public RecordReader getRecordReader(InputSplit split,
+        JobConf job, Reporter reporter) throws IOException {
+      return null;
+    }
+
+  }
+
+  static class MyOutputFormat implements OutputFormat {
+    static class MyRecordWriter implements RecordWriter {
+
+      public MyRecordWriter(Path outputFile, JobConf job) throws IOException {
+      }
+
+      public void write(Object key, Object value) throws IOException {
+        return;
+      }
+
+      public void close(Reporter reporter) throws IOException {
+      }
+    }
+
+    public RecordWriter getRecordWriter(FileSystem ignored, JobConf job,
+        String name, Progressable progress) throws IOException {
+      return new MyRecordWriter(new Path(job.get("non.std.out")), job);
+    }
+
+    public void checkOutputSpecs(FileSystem ignored, JobConf job)
+        throws IOException {
+    }
+  }
+
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
index 43b1a1d4da..48df092e37 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestReporter.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
@@ -25,10 +26,15 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
+
 
 import static org.junit.Assert.*;
 
 /**
@@ -98,7 +104,28 @@ public void close() throws IOException {
           progressRange, reporter.getProgress(), 0f);
     }
   }
- 
+
+  static class StatusLimitMapper extends
+      org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, LongWritable, Text> {
+
+    @Override
+    public void map(LongWritable key, Text value, Context context)
+        throws IOException {
+      StringBuilder sb = new StringBuilder(512);
+      for (int i = 0; i < 1000; i++) {
+        sb.append("a");
+      }
+      context.setStatus(sb.toString());
+      int progressStatusLength = context.getConfiguration().getInt(
+          MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
+          MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
+
+      if (context.getStatus().length() > progressStatusLength) {
+        throw new IOException("Status is not truncated");
+      }
+    }
+  }
+
   /**
    * Test {@link Reporter}'s progress for a map-only job.
    * This will make sure that only the map phase decides the attempt's progress.
@@ -166,7 +193,6 @@ public void close() throws IOException {
   /**
    * Test {@link Reporter}'s progress for map-reduce job.
    */
-  @SuppressWarnings("deprecation")
   @Test
   public void testReporterProgressForMRJob() throws IOException {
     Path test = new Path(testRootTempDir, "testReporterProgressForMRJob");
@@ -186,4 +212,39 @@ public void testReporterProgressForMRJob() throws IOException {
 
     assertTrue("Job failed", job.isSuccessful());
   }
+
+  @Test
+  public void testStatusLimit() throws IOException, InterruptedException,
+      ClassNotFoundException {
+    Path test = new Path(testRootTempDir, "testStatusLimit");
+
+    Configuration conf = new Configuration();
+    Path inDir = new Path(test, "in");
+    Path outDir = new Path(test, "out");
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(inDir)) {
+      fs.delete(inDir, true);
+    }
+    fs.mkdirs(inDir);
+    DataOutputStream file = fs.create(new Path(inDir, "part-" + 0));
+    file.writeBytes("testStatusLimit");
+    file.close();
+
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+
+    Job job = Job.getInstance(conf, "testStatusLimit");
+
+    job.setMapperClass(StatusLimitMapper.class);
+    job.setNumReduceTasks(0);
+
+    FileInputFormat.addInputPath(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+
+    job.waitForCompletion(true);
+
+    assertTrue("Job failed", job.isSuccessful());
+  }
+
 }
\ No newline at end of file
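
Reviewer note: both limits introduced by this patch can be overridden per job through the new MRConfig keys. The driver below is only an illustrative sketch, not part of the patch; the LimitAwareDriver class name, the input/output argument handling, and the values 1024 and 30 are assumptions, and it presumes a cluster running with these MRConfig constants available.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// Hypothetical driver showing how a job could raise the new limits
// before submission.
public class LimitAwareDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Allow task status strings of up to 1024 characters instead of the
    // default 512 (MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT); longer
    // strings are truncated by Task.normalizeStatus(), not rejected.
    conf.setInt(MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY, 1024);
    // Allow splits reporting up to 30 block locations instead of the
    // default 10 (MRConfig.MAX_BLOCK_LOCATIONS_DEFAULT); JobSplitWriter
    // now fails job submission when a split exceeds this limit.
    conf.setInt(MRConfig.MAX_BLOCK_LOCATIONS_KEY, 30);

    Job job = Job.getInstance(conf, "limit-aware-example");
    job.setJarByClass(LimitAwareDriver.class);
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

The same keys ("mapreduce.task.max.status.length" and "mapreduce.job.max.split.locations") can of course be set in mapred-site.xml instead of programmatically.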
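Since split submission now fails fast with the IOException added in JobSplitWriter, a custom old-API InputSplit can simply cap the number of locations it advertises. The sketch below is hypothetical and not part of the patch: the TopHostsSplit name and the keep-the-first-ten policy are assumptions chosen for illustration.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.InputSplit;

// Hypothetical split that reports at most a fixed number of block
// locations, so submission stays under mapreduce.job.max.split.locations.
class TopHostsSplit implements InputSplit {
  private static final int MAX_REPORTED_LOCATIONS = 10; // assumed cap
  private String[] hosts = new String[0];
  private long length;

  public TopHostsSplit() {
  }

  public TopHostsSplit(String[] hosts, long length) {
    // Keep only the first few hosts; the rest are dropped rather than
    // tripping the new check in JobSplitWriter.
    this.hosts = hosts.length > MAX_REPORTED_LOCATIONS
        ? Arrays.copyOf(hosts, MAX_REPORTED_LOCATIONS) : hosts;
    this.length = length;
  }

  @Override
  public String[] getLocations() {
    return hosts;
  }

  @Override
  public long getLength() {
    return length;
  }

  @Override
  public void write(DataOutput out) throws IOException {
    WritableUtils.writeVLong(out, length);
    WritableUtils.writeVInt(out, hosts.length);
    for (String host : hosts) {
      WritableUtils.writeString(out, host);
    }
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    length = WritableUtils.readVLong(in);
    int n = WritableUtils.readVInt(in);
    hosts = new String[n];
    for (int i = 0; i < n; i++) {
      hosts[i] = WritableUtils.readString(in);
    }
  }
}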