diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 9f3ddc0ec8..71361695b4 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -142,6 +142,8 @@ Branch-2 ( Unreleased changes )
     MAPREDUCE-4157. ResourceManager should not kill apps that are well behaved
     (Jason Lowe via bobby)
 
+    MAPREDUCE-4511. Add IFile readahead (ahmed via tucu)
+
   BUG FIXES
 
     MAPREDUCE-4422. YARN_APPLICATION_CLASSPATH needs a documented default value in
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
index 936cfc06c3..a410c97557 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java
@@ -340,7 +340,7 @@ public Reader(Configuration conf, FSDataInputStream in, long length,
                   CompressionCodec codec,
                   Counters.Counter readsCounter) throws IOException {
       readRecordsCounter = readsCounter;
-      checksumIn = new IFileInputStream(in,length);
+      checksumIn = new IFileInputStream(in,length, conf);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
         if (decompressor != null) {
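The IFile.Reader change above threads the job Configuration through to the checksummed stream so readahead can be configured per job. A minimal migration sketch for any out-of-tree caller of the old two-argument constructor (the class and method names here are hypothetical; a null conf is tolerated and falls back to defaults inside the constructor):

    package org.apache.hadoop.mapred;  // same package as IFileInputStream

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;

    // Hypothetical caller sketch: the two-argument constructor is gone,
    // so call sites now pass the job Configuration as a third argument.
    class IFileReaderMigrationSketch {
      static IFileInputStream open(FSDataInputStream in, long length,
                                   Configuration conf) {
        // Before MAPREDUCE-4511: new IFileInputStream(in, length)
        // After: conf decides whether readahead is attempted and how far.
        return new IFileInputStream(in, length, conf);
      }
    }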
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
index 734b33a73f..b171fb0e47 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFileInputStream.java
@@ -19,13 +19,22 @@
 package org.apache.hadoop.mapred;
 
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.HasFileDescriptor;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.util.DataChecksum;
 /**
  * A checksum input stream, used for IFiles.
@@ -35,7 +44,8 @@
 @InterfaceStability.Unstable
 public class IFileInputStream extends InputStream {
 
-  private final InputStream in; //The input stream to be verified for checksum.
+  private final InputStream in; //The input stream to be verified for checksum.
+  private final FileDescriptor inFd; // the file descriptor, if it is known
   private final long length; //The total length of the input file
   private final long dataLength;
   private DataChecksum sum;
@@ -43,7 +53,14 @@ public class IFileInputStream extends InputStream {
   private final byte b[] = new byte[1];
   private byte csum[] = null;
   private int checksumSize;
-
+
+  private ReadaheadRequest curReadahead = null;
+  private ReadaheadPool raPool = ReadaheadPool.getInstance();
+  private boolean readahead;
+  private int readaheadLength;
+
+  public static final Log LOG = LogFactory.getLog(IFileInputStream.class);
+
   private boolean disableChecksumValidation = false;
 
   /**
@@ -51,13 +68,36 @@ public class IFileInputStream extends InputStream {
    * @param in The input stream to be verified for checksum.
    * @param len The length of the input stream including checksum bytes.
    */
-  public IFileInputStream(InputStream in, long len) {
+  public IFileInputStream(InputStream in, long len, Configuration conf) {
     this.in = in;
+    this.inFd = getFileDescriptorIfAvail(in);
     sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
         Integer.MAX_VALUE);
     checksumSize = sum.getChecksumSize();
     length = len;
     dataLength = length - checksumSize;
+
+    conf = (conf != null) ? conf : new Configuration();
+    readahead = conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD);
+    readaheadLength = conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
+        MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES);
+
+    doReadahead();
+  }
+
+  private static FileDescriptor getFileDescriptorIfAvail(InputStream in) {
+    FileDescriptor fd = null;
+    try {
+      if (in instanceof HasFileDescriptor) {
+        fd = ((HasFileDescriptor)in).getFileDescriptor();
+      } else if (in instanceof FileInputStream) {
+        fd = ((FileInputStream)in).getFD();
+      }
+    } catch (IOException e) {
+      LOG.info("Unable to determine FileDescriptor", e);
+    }
+    return fd;
   }
 
   /**
@@ -66,6 +106,10 @@ public IFileInputStream(InputStream in, long len) {
    */
   @Override
   public void close() throws IOException {
+
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
     if (currentOffset < dataLength) {
       byte[] t = new byte[Math.min((int)
             (Integer.MAX_VALUE & (dataLength - currentOffset)), 32 * 1024)];
@@ -102,10 +146,21 @@ public int read(byte[] b, int off, int len) throws IOException {
     if (currentOffset >= dataLength) {
       return -1;
     }
-
+
+    doReadahead();
+
     return doRead(b,off,len);
   }
 
+  private void doReadahead() {
+    if (raPool != null && inFd != null && readahead) {
+      curReadahead = raPool.readaheadStream(
+          "ifile", inFd,
+          currentOffset, readaheadLength, dataLength,
+          curReadahead);
+    }
+  }
+
   /**
    * Read bytes from the stream.
    * At EOF, checksum is validated and sent back
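The core of the change is the doReadahead() call on every read(): when the underlying stream exposes a FileDescriptor, the shared ReadaheadPool asynchronously advises the kernel about the next window of the file, and the returned ReadaheadRequest is passed back on the next call so the pool can skip a request that is still in flight; close() cancels whatever is outstanding. A standalone sketch of the same pattern, assuming only the org.apache.hadoop.io.ReadaheadPool API that the patch itself uses (class name, file path argument, and buffer sizes are illustrative; on platforms without the native hadoop library the pool silently does nothing):

    import java.io.FileInputStream;

    import org.apache.hadoop.io.ReadaheadPool;
    import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;

    // Standalone sketch of the readahead loop used by IFileInputStream.
    public class ReadaheadLoopSketch {
      public static void main(String[] args) throws Exception {
        try (FileInputStream fis = new FileInputStream(args[0])) {
          long fileLen = fis.getChannel().size();
          ReadaheadPool raPool = ReadaheadPool.getInstance();
          ReadaheadRequest req = null;

          byte[] buf = new byte[64 * 1024];
          long offset = 0;
          int n;
          while ((n = fis.read(buf)) > 0) {
            // Ask for the next 4 MiB window; handing the previous request
            // back lets the pool avoid duplicate work.
            req = raPool.readaheadStream("sketch", fis.getFD(),
                offset, 4 * 1024 * 1024, fileLen, req);
            offset += n;
          }
          if (req != null) {
            req.cancel();   // mirrors IFileInputStream.close()
          }
        }
      }
    }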
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
index fb9a1ff6f6..d758e00483 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
@@ -84,4 +84,20 @@ public interface MRConfig {
     "mapreduce.shuffle.ssl.enabled";
 
   public static final boolean SHUFFLE_SSL_ENABLED_DEFAULT = false;
-}
+
+  /**
+   * Configuration key to enable/disable IFile readahead.
+   */
+  public static final String MAPRED_IFILE_READAHEAD =
+    "mapreduce.ifile.readahead";
+
+  public static final boolean DEFAULT_MAPRED_IFILE_READAHEAD = true;
+
+  /**
+   * Configuration key to set the IFile readahead length in bytes.
+   */
+  public static final String MAPRED_IFILE_READAHEAD_BYTES =
+    "mapreduce.ifile.readahead.bytes";
+
+  public static final int DEFAULT_MAPRED_IFILE_READAHEAD_BYTES =
+    4 * 1024 * 1024;
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 6c527ae1ce..27c7a49069 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -98,6 +98,8 @@ private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
 
   private volatile boolean stopped = false;
 
+  private JobConf job;
+
   private static boolean sslShuffle;
   private static SSLFactory sslFactory;
 
@@ -105,6 +107,7 @@ public Fetcher(JobConf job, TaskAttemptID reduceId,
                  ShuffleScheduler scheduler, MergeManager merger,
                  Reporter reporter, ShuffleClientMetrics metrics,
                  ExceptionReporter exceptionReporter, SecretKey jobTokenSecret) {
+    this.job = job;
     this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
@@ -539,7 +542,7 @@ private void shuffleToMemory(MapHost host, MapOutput mapOutput,
                                int decompressedLength,
                                int compressedLength) throws IOException {
     IFileInputStream checksumIn =
-      new IFileInputStream(input, compressedLength);
+      new IFileInputStream(input, compressedLength, job);
 
     input = checksumIn;
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 5fee954bba..b2b1f061c8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -959,6 +959,20 @@ acceptable.
   </description>
 </property>
 
+<property>
+  <name>mapreduce.ifile.readahead</name>
+  <value>true</value>
+  <description>Configuration key to enable/disable IFile readahead.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.ifile.readahead.bytes</name>
+  <value>4194304</value>
+  <description>Configuration key to set the IFile readahead length in bytes.
+  </description>
+</property>
+
 <property>
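The two new keys are read once in the IFileInputStream constructor, so they are set per job and picked up by both the map-side IFile.Reader and the reduce-side Fetcher. A small sketch of tuning them through the new MRConfig constants (the class name and the chosen values are illustrative, not recommendations):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRConfig;

    // Sketch: disable readahead, or shrink the window from the 4 MiB
    // default to 1 MiB, via the constants added in this patch.
    public class IFileReadaheadConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false);
        conf.setInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES, 1024 * 1024);

        System.out.println(conf.getBoolean(MRConfig.MAPRED_IFILE_READAHEAD,
            MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD));        // false
        System.out.println(conf.getInt(MRConfig.MAPRED_IFILE_READAHEAD_BYTES,
            MRConfig.DEFAULT_MAPRED_IFILE_READAHEAD_BYTES));  // 1048576
      }
    }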
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
index f2a19f6a55..86431e5c13 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFileStreams.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -35,7 +36,7 @@ public void testIFileStream() throws Exception {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     for (int i = 0; i < DLEN; ++i) {
       assertEquals(i, ifis.read());
     }
@@ -54,7 +55,7 @@ public void testBadIFileStream() throws Exception {
     final byte[] b = dob.getData();
     ++b[17];
     dib.reset(b, DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 104);
+    IFileInputStream ifis = new IFileInputStream(dib, 104, new Configuration());
     int i = 0;
     try {
       while (i < DLEN) {
@@ -83,7 +84,7 @@ public void testBadLength() throws Exception {
     ifos.close();
     DataInputBuffer dib = new DataInputBuffer();
     dib.reset(dob.getData(), DLEN + 4);
-    IFileInputStream ifis = new IFileInputStream(dib, 100);
+    IFileInputStream ifis = new IFileInputStream(dib, 100, new Configuration());
     int i = 0;
     try {
       while (i < DLEN - 8) {
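The tests above read from a DataInputBuffer, which exposes no FileDescriptor, so getFileDescriptorIfAvail() returns null and readahead is silently skipped; only the checksum path is exercised. A round-trip sketch in the same vein, but with readahead explicitly disabled through the new key (class name is hypothetical; it sits in the org.apache.hadoop.mapred package, like the test, so the IFile stream classes resolve, and DLEN mirrors the tests' 100-byte payload plus 4-byte CRC32 checksum):

    package org.apache.hadoop.mapred;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;
    import org.apache.hadoop.mapreduce.MRConfig;

    // Round-trip sketch mirroring testIFileStream, with readahead
    // explicitly off; the checksum is still written and verified.
    public class IFileStreamRoundTripSketch {
      public static void main(String[] args) throws Exception {
        final int DLEN = 100;
        DataOutputBuffer dob = new DataOutputBuffer(DLEN + 4);
        IFileOutputStream ifos = new IFileOutputStream(dob);
        for (int i = 0; i < DLEN; ++i) {
          ifos.write(i);
        }
        ifos.close();   // appends the 4-byte checksum

        Configuration conf = new Configuration();
        conf.setBoolean(MRConfig.MAPRED_IFILE_READAHEAD, false);

        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(dob.getData(), DLEN + 4);
        IFileInputStream ifis = new IFileInputStream(dib, DLEN + 4, conf);
        for (int i = 0; i < DLEN; ++i) {
          if (ifis.read() != i) {
            throw new AssertionError("byte mismatch at offset " + i);
          }
        }
        ifis.close();   // checksum validated at EOF
      }
    }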