diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java index 6969d19043..4f30483cd9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java @@ -153,6 +153,28 @@ public static void copyBytes(InputStream in, OutputStream out, long count, } } + /** + * Utility wrapper for reading from {@link InputStream}. It catches any errors + * thrown by the underlying stream (either IO or decompression-related), and + * re-throws as an IOException. + * + * @param is - InputStream to be read from + * @param buf - buffer the data is read into + * @param off - offset within buf + * @param len - amount of data to be read + * @return number of bytes read + */ + public static int wrappedReadForCompressedData(InputStream is, byte[] buf, + int off, int len) throws IOException { + try { + return is.read(buf, off, len); + } catch (IOException ie) { + throw ie; + } catch (Throwable t) { + throw new IOException("Error while reading compressed data", t); + } + } + /** * Reads len bytes in a loop. * diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestIOUtils.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestIOUtils.java index 60c0703abc..6b7ffdfcf2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestIOUtils.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestIOUtils.java @@ -29,6 +29,7 @@ import java.nio.ByteBuffer; import java.nio.channels.FileChannel; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.Test; import org.mockito.Mockito; @@ -152,4 +153,26 @@ public void testWriteFully() throws IOException { } } } + + @Test + public void testWrappedReadForCompressedData() throws IOException { + byte[] buf = new byte[2]; + InputStream mockStream = Mockito.mock(InputStream.class); + Mockito.when(mockStream.read(buf, 0, 1)).thenReturn(1); + Mockito.when(mockStream.read(buf, 0, 2)).thenThrow( + new java.lang.InternalError()); + + try { + assertEquals("Check expected value", 1, + IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 1)); + } catch (IOException ioe) { + fail("Unexpected error while reading"); + } + try { + IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 2); + } catch (IOException ioe) { + GenericTestUtils.assertExceptionContains( + "Error while reading compressed data", ioe); + } + } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 04073d3bb2..4bdb1345c6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -218,6 +218,9 @@ Branch-2 ( Unreleased changes ) MAPREDUCE-2739. Update installation docs (remove YarnClientFactory) (bowang via tucu) + MAPREDUCE-3993. Graceful handling of codec errors during decompression + (kkambatl via tucu) + Release 2.0.0-alpha - 05-23-2012 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java index 57e74df819..936cfc06c3 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/IFile.java @@ -33,6 +33,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.compress.CodecPool; import org.apache.hadoop.io.compress.CompressionCodec; @@ -379,7 +380,8 @@ public long getPosition() throws IOException { private int readData(byte[] buf, int off, int len) throws IOException { int bytesRead = 0; while (bytesRead < len) { - int n = in.read(buf, off+bytesRead, len-bytesRead); + int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead, + len - bytesRead); if (n < 0) { return bytesRead; }