diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index f75b1aad4a..dac3b5b9ac 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -334,6 +334,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5248. Let NNBenchWithoutMR specify the replication factor for
     its test (Erik Paulson via jlowe)
 
+    MAPREDUCE-6174. Combine common stream code into parent class for
+    InMemoryMapOutput and OnDiskMapOutput. (Eric Payne via gera)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
new file mode 100644
index 0000000000..119db15587
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/IFileWrappedMapOutput.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.task.reduce;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.IFileInputStream;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * Common code for allowing MapOutput classes to handle streams.
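Review note: IFileWrappedMapOutput is a template method. shuffle() owns the
checksummed stream lifecycle (wrap the raw socket stream in an
IFileInputStream, delegate, close in a finally), and subclasses only supply
doShuffle(). As a sketch of that contract, here is a hypothetical third
subclass; DiscardingMapOutput is illustrative only and not part of this
patch, and it assumes the same package and imports as the class above plus
the usual MapOutput overrides (commit/abort/getDescription) seen in the two
subclasses below:

    // Hypothetical subclass: validates the IFile checksum but discards the
    // bytes, implementing only doShuffle() like the subclasses in this patch.
    class DiscardingMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
      DiscardingMapOutput(Configuration c, MergeManagerImpl<K, V> m,
                          TaskAttemptID mapId, long size, boolean primary) {
        super(c, m, mapId, size, primary);
      }

      @Override
      protected void doShuffle(MapHost host, IFileInputStream in,
          long compressedLength, long decompressedLength,
          ShuffleClientMetrics metrics, Reporter reporter) throws IOException {
        byte[] buf = new byte[64 * 1024];
        long bytesLeft = compressedLength;
        while (bytesLeft > 0) {
          // readWithChecksum() verifies the trailing IFile checksum as it reads.
          int n = in.readWithChecksum(buf, 0,
              (int) Math.min(bytesLeft, buf.length));
          if (n < 0) {
            throw new IOException("read past end of stream reading " + getMapId());
          }
          bytesLeft -= n;
        }
        // No close() here: the parent's shuffle() closes the stream in finally.
      }

      @Override
      public void commit() { }
      @Override
      public void abort() { }
      @Override
      public String getDescription() { return "DISCARD"; }
    }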
+ *
+ * @param <K> key type for map output
+ * @param <V> value type for map output
+ */
+public abstract class IFileWrappedMapOutput<K, V> extends MapOutput<K, V> {
+  private final Configuration conf;
+  private final MergeManagerImpl<K, V> merger;
+
+  public IFileWrappedMapOutput(
+      Configuration c, MergeManagerImpl<K, V> m, TaskAttemptID mapId,
+      long size, boolean primaryMapOutput) {
+    super(mapId, size, primaryMapOutput);
+    conf = c;
+    merger = m;
+  }
+
+  /**
+   * @return the merger
+   */
+  protected MergeManagerImpl<K, V> getMerger() {
+    return merger;
+  }
+
+  protected abstract void doShuffle(
+      MapHost host, IFileInputStream iFileInputStream,
+      long compressedLength, long decompressedLength,
+      ShuffleClientMetrics metrics, Reporter reporter) throws IOException;
+
+  @Override
+  public void shuffle(MapHost host, InputStream input,
+                      long compressedLength, long decompressedLength,
+                      ShuffleClientMetrics metrics,
+                      Reporter reporter) throws IOException {
+    IFileInputStream iFin =
+        new IFileInputStream(input, compressedLength, conf);
+    try {
+      this.doShuffle(host, iFin, compressedLength,
+                     decompressedLength, metrics, reporter);
+    } finally {
+      iFin.close();
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
index 24fb3bbaca..9b61ad5e3b 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/InMemoryMapOutput.java
@@ -42,10 +42,8 @@
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class InMemoryMapOutput<K, V> extends MapOutput<K, V> {
+class InMemoryMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(InMemoryMapOutput.class);
-  private Configuration conf;
-  private final MergeManagerImpl<K, V> merger;
   private final byte[] memory;
   private BoundedByteArrayOutputStream byteStream;
   // Decompression of map-outputs
@@ -56,9 +54,7 @@ public InMemoryMapOutput(Configuration conf, TaskAttemptID mapId,
                            MergeManagerImpl<K, V> merger,
                            int size, CompressionCodec codec,
                            boolean primaryMapOutput) {
-    super(mapId, (long)size, primaryMapOutput);
-    this.conf = conf;
-    this.merger = merger;
+    super(conf, merger, mapId, (long)size, primaryMapOutput);
     this.codec = codec;
     byteStream = new BoundedByteArrayOutputStream(size);
     memory = byteStream.getBuffer();
@@ -78,15 +74,12 @@ public BoundedByteArrayOutputStream getArrayStream() {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream iFin,
                      long compressedLength, long decompressedLength,
                      ShuffleClientMetrics metrics,
                      Reporter reporter) throws IOException {
-    IFileInputStream checksumIn =
-      new IFileInputStream(input, compressedLength, conf);
+    InputStream input = iFin;
 
-    input = checksumIn;
-
     // Are map-outputs compressed?
     if (codec != null) {
       decompressor.reset();
@@ -111,13 +104,6 @@ public void shuffle(MapHost host, InputStream input,
         throw new IOException("Unexpected extra bytes from input stream for " +
                               getMapId());
       }
-
-    } catch (IOException ioe) {
-      // Close the streams
-      IOUtils.cleanup(LOG, input);
-
-      // Re-throw
-      throw ioe;
     } finally {
       CodecPool.returnDecompressor(decompressor);
     }
@@ -125,12 +111,12 @@
 
   @Override
   public void commit() throws IOException {
-    merger.closeInMemoryFile(this);
+    getMerger().closeInMemoryFile(this);
  }
 
   @Override
   public void abort() {
-    merger.unreserve(memory.length);
+    getMerger().unreserve(memory.length);
   }
 
   @Override
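Review note: dropping the catch (IOException)/IOUtils.cleanup block is safe
because the subclass no longer owns the checksummed stream;
IFileWrappedMapOutput.shuffle() closes it whether or not doShuffle() throws.
The only cleanup left in the subclass is for resources it allocates itself.
A condensed sketch of the resulting method shape (bodies elided, not the
full code):

    @Override
    protected void doShuffle(MapHost host, IFileInputStream iFin,
        long compressedLength, long decompressedLength,
        ShuffleClientMetrics metrics, Reporter reporter) throws IOException {
      InputStream input = iFin;   // may be re-wrapped by the codec below
      try {
        // ... wrap with the codec if configured, read fully into 'memory',
        // then verify the stream has no unexpected extra bytes ...
      } finally {
        // Subclass-owned resource: always return the decompressor to the pool.
        CodecPool.returnDecompressor(decompressor);
      }
      // 'iFin' itself is closed by IFileWrappedMapOutput.shuffle()'s finally.
    }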
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
index f7887070b9..c99a330478 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
@@ -263,8 +263,9 @@ public synchronized MapOutput<K, V> reserve(TaskAttemptID mapId,
       LOG.info(mapId + ": Shuffling to disk since " + requestedSize +
                " is greater than maxSingleShuffleLimit (" +
                maxSingleShuffleLimit + ")");
-      return new OnDiskMapOutput<K, V>(mapId, reduceId, this, requestedSize,
-          jobConf, mapOutputFile, fetcher, true);
+      return new OnDiskMapOutput<K, V>(mapId, this, requestedSize, jobConf,
+          fetcher, true, FileSystem.getLocal(jobConf).getRaw(),
+          mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize));
     }
 
     // Stall shuffle if we are above the memory limit
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
index 8275fd0eba..f22169d282 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
@@ -18,13 +18,11 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
-import org.apache.hadoop.conf.Configuration;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,41 +44,46 @@
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class OnDiskMapOutput<K, V> extends MapOutput<K, V> {
+class OnDiskMapOutput<K, V> extends IFileWrappedMapOutput<K, V> {
   private static final Log LOG = LogFactory.getLog(OnDiskMapOutput.class);
   private final FileSystem fs;
   private final Path tmpOutputPath;
   private final Path outputPath;
-  private final MergeManagerImpl<K, V> merger;
   private final OutputStream disk;
   private long compressedSize;
-  private final Configuration conf;
 
+  @Deprecated
   public OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                          MergeManagerImpl<K, V> merger, long size,
                          JobConf conf,
                          MapOutputFile mapOutputFile,
                          int fetcher, boolean primaryMapOutput)
       throws IOException {
-    this(mapId, reduceId, merger, size, conf, mapOutputFile, fetcher,
+    this(mapId, merger, size, conf, fetcher,
          primaryMapOutput, FileSystem.getLocal(conf).getRaw(),
         mapOutputFile.getInputFileForWrite(mapId.getTaskID(), size));
   }
 
-  @VisibleForTesting
+  @Deprecated
   OnDiskMapOutput(TaskAttemptID mapId, TaskAttemptID reduceId,
                   MergeManagerImpl<K, V> merger, long size,
                   JobConf conf,
                   MapOutputFile mapOutputFile,
                   int fetcher, boolean primaryMapOutput,
                   FileSystem fs, Path outputPath) throws IOException {
-    super(mapId, size, primaryMapOutput);
+    this(mapId, merger, size, conf, fetcher, primaryMapOutput, fs, outputPath);
+  }
+
+  OnDiskMapOutput(TaskAttemptID mapId,
+                  MergeManagerImpl<K, V> merger, long size,
+                  JobConf conf,
+                  int fetcher, boolean primaryMapOutput,
+                  FileSystem fs, Path outputPath) throws IOException {
+    super(conf, merger, mapId, size, primaryMapOutput);
     this.fs = fs;
-    this.merger = merger;
     this.outputPath = outputPath;
     tmpOutputPath = getTempPath(outputPath, fetcher);
     disk = CryptoUtils.wrapIfNecessary(conf, fs.create(tmpOutputPath));
-    this.conf = conf;
   }
 
   @VisibleForTesting
@@ -89,18 +92,18 @@ static Path getTempPath(Path outPath, int fetcher) {
   }
 
   @Override
-  public void shuffle(MapHost host, InputStream input,
+  protected void doShuffle(MapHost host, IFileInputStream input,
                       long compressedLength, long decompressedLength,
                       ShuffleClientMetrics metrics,
                       Reporter reporter) throws IOException {
-    input = new IFileInputStream(input, compressedLength, conf);
     // Copy data to local-disk
     long bytesLeft = compressedLength;
     try {
       final int BYTES_TO_READ = 64 * 1024;
       byte[] buf = new byte[BYTES_TO_READ];
       while (bytesLeft > 0) {
-        int n = ((IFileInputStream)input).readWithChecksum(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
+        int n = input.readWithChecksum(buf, 0,
+            (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " +
                                 getMapId());
@@ -117,7 +120,7 @@ public void shuffle(MapHost host, InputStream input,
       disk.close();
     } catch (IOException ioe) {
       // Close the streams
-      IOUtils.cleanup(LOG, input, disk);
+      IOUtils.cleanup(LOG, disk);
 
       // Re-throw
       throw ioe;
@@ -139,7 +142,7 @@ public void commit() throws IOException {
     fs.rename(tmpOutputPath, outputPath);
     CompressAwarePath compressAwarePath = new CompressAwarePath(outputPath,
         getSize(), this.compressedSize);
-    merger.closeOnDiskFile(compressAwarePath);
+    getMerger().closeOnDiskFile(compressAwarePath);
   }
 
   @Override
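Review note: the constructor change inverts the dependency on MapOutputFile.
The caller now resolves the local FileSystem and the on-disk Path and passes
them in, while the two old constructors remain as @Deprecated delegating
shims so existing callers keep compiling. A sketch of the new call-site
contract, mirroring the reserve() change above (names as in
MergeManagerImpl):

    // Caller resolves the filesystem and write path up front, then hands
    // them to the new primary constructor.
    FileSystem localFs = FileSystem.getLocal(jobConf).getRaw();
    Path outputPath =
        mapOutputFile.getInputFileForWrite(mapId.getTaskID(), requestedSize);
    MapOutput<K, V> onDisk = new OnDiskMapOutput<K, V>(
        mapId, this, requestedSize, jobConf, fetcher,
        true /* primaryMapOutput */, localFs, outputPath);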
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
index a9cd33ea95..78880073d4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
@@ -19,9 +19,7 @@
 package org.apache.hadoop.mapreduce.task.reduce;
 
 import java.io.FilterInputStream;
-
 import java.lang.Void;
-
 import java.net.HttpURLConnection;
 
 import org.apache.hadoop.fs.ChecksumException;
@@ -30,13 +28,12 @@
 import org.apache.hadoop.mapred.MapOutputFile;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TaskID;
-
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.rules.TestName;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.*;
 import static org.mockito.Matchers.*;
 import static org.mockito.Mockito.*;
 
@@ -65,10 +62,9 @@
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
-
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 /**
  * Test that the Fetcher does what we expect it to.
  */
@@ -453,9 +449,9 @@ public void testCopyFromHostExtraBytes() throws Exception {
     ByteArrayInputStream in = new ByteArrayInputStream(bout.toByteArray());
     when(connection.getInputStream()).thenReturn(in);
     // 8 < 10 therefore there appear to be extra bytes in the IFileInputStream
-    InMemoryMapOutput<Text, Text> mapOut = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text, Text> mapOut = new InMemoryMapOutput<Text, Text>(
         job, map1ID, mm, 8, null, true );
-    InMemoryMapOutput<Text, Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text, Text> mapOut2 = new InMemoryMapOutput<Text, Text>(
         job, map2ID, mm, 10, null, true );
     when(mm.reserve(eq(map1ID), anyLong(), anyInt())).thenReturn(mapOut);
 
@@ -478,9 +474,9 @@ public void testCorruptedIFile() throws Exception {
     Path shuffledToDisk =
         OnDiskMapOutput.getTempPath(onDiskMapOutputPath, fetcher);
     fs = FileSystem.getLocal(job).getRaw();
-    MapOutputFile mof = mock(MapOutputFile.class);
-    OnDiskMapOutput<Text, Text> odmo = new OnDiskMapOutput<Text, Text>(map1ID,
-        id, mm, 100L, job, mof, fetcher, true, fs, onDiskMapOutputPath);
+    IFileWrappedMapOutput<Text, Text> odmo =
+        new OnDiskMapOutput<Text, Text>(map1ID, mm, 100L, job, fetcher, true,
+                                        fs, onDiskMapOutputPath);
 
     String mapData = "MAPDATA12345678901234567890";
 
@@ -538,7 +534,7 @@
   @Test(timeout=10000)
   public void testInterruptInMemory() throws Exception {
     final int FETCHER = 2;
-    InMemoryMapOutput<Text, Text> immo = spy(new InMemoryMapOutput<Text, Text>(
+    IFileWrappedMapOutput<Text, Text> immo = spy(new InMemoryMapOutput<Text, Text>(
           job, id, mm, 100, null, true));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(immo);
@@ -584,10 +580,9 @@ public void testInterruptOnDisk() throws Exception {
     Path p = new Path("file:///tmp/foo");
     Path pTmp = OnDiskMapOutput.getTempPath(p, FETCHER);
     FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
-    MapOutputFile mof = mock(MapOutputFile.class);
-    when(mof.getInputFileForWrite(any(TaskID.class), anyLong())).thenReturn(p);
-    OnDiskMapOutput<Text, Text> odmo = spy(new OnDiskMapOutput<Text, Text>(map1ID,
-        id, mm, 100L, job, mof, FETCHER, true, mFs, p));
+    IFileWrappedMapOutput<Text, Text> odmo =
+        spy(new OnDiskMapOutput<Text, Text>(map1ID, mm, 100L, job,
+                                            FETCHER, true, mFs, p));
     when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
         .thenReturn(odmo);
     doNothing().when(mm).waitForResource();
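Review note: the test updates fall straight out of the new constructor.
TestFetcher no longer mocks MapOutputFile or stubs getInputFileForWrite(),
because the output Path is injected directly, and locals are typed as
IFileWrappedMapOutput so one variable can hold either subclass. A minimal
sketch of the new mock wiring (job, mm, map1ID, and FETCHER are the existing
test fixtures; Mockito static imports as in the file above):

    // No MapOutputFile mock needed: the Path is passed straight in.
    Path p = new Path("file:///tmp/foo");
    FileSystem mFs = mock(FileSystem.class, RETURNS_DEEP_STUBS);
    IFileWrappedMapOutput<Text, Text> odmo =
        spy(new OnDiskMapOutput<Text, Text>(map1ID, mm, 100L, job,
                                            FETCHER, true, mFs, p));
    // The merge manager hands the fetcher this reserved output.
    when(mm.reserve(any(TaskAttemptID.class), anyLong(), anyInt()))
        .thenReturn(odmo);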