MAPREDUCE-3993. Graceful handling of codec errors during decompression (kkambatl via tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1359345 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b6a9d38c09
commit
820be7cbef
@ -153,6 +153,28 @@ public static void copyBytes(InputStream in, OutputStream out, long count,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Utility wrapper for reading from {@link InputStream}. It catches any errors
|
||||||
|
* thrown by the underlying stream (either IO or decompression-related), and
|
||||||
|
* re-throws as an IOException.
|
||||||
|
*
|
||||||
|
* @param is - InputStream to be read from
|
||||||
|
* @param buf - buffer the data is read into
|
||||||
|
* @param off - offset within buf
|
||||||
|
* @param len - amount of data to be read
|
||||||
|
* @return number of bytes read
|
||||||
|
*/
|
||||||
|
public static int wrappedReadForCompressedData(InputStream is, byte[] buf,
|
||||||
|
int off, int len) throws IOException {
|
||||||
|
try {
|
||||||
|
return is.read(buf, off, len);
|
||||||
|
} catch (IOException ie) {
|
||||||
|
throw ie;
|
||||||
|
} catch (Throwable t) {
|
||||||
|
throw new IOException("Error while reading compressed data", t);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Reads len bytes in a loop.
|
* Reads len bytes in a loop.
|
||||||
*
|
*
|
||||||
|
@ -29,6 +29,7 @@
|
|||||||
import java.nio.ByteBuffer;
|
import java.nio.ByteBuffer;
|
||||||
import java.nio.channels.FileChannel;
|
import java.nio.channels.FileChannel;
|
||||||
|
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@ -152,4 +153,26 @@ public void testWriteFully() throws IOException {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testWrappedReadForCompressedData() throws IOException {
|
||||||
|
byte[] buf = new byte[2];
|
||||||
|
InputStream mockStream = Mockito.mock(InputStream.class);
|
||||||
|
Mockito.when(mockStream.read(buf, 0, 1)).thenReturn(1);
|
||||||
|
Mockito.when(mockStream.read(buf, 0, 2)).thenThrow(
|
||||||
|
new java.lang.InternalError());
|
||||||
|
|
||||||
|
try {
|
||||||
|
assertEquals("Check expected value", 1,
|
||||||
|
IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 1));
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
fail("Unexpected error while reading");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
IOUtils.wrappedReadForCompressedData(mockStream, buf, 0, 2);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"Error while reading compressed data", ioe);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -218,6 +218,9 @@ Branch-2 ( Unreleased changes )
|
|||||||
|
|
||||||
MAPREDUCE-2739. Update installation docs (remove YarnClientFactory) (bowang via tucu)
|
MAPREDUCE-2739. Update installation docs (remove YarnClientFactory) (bowang via tucu)
|
||||||
|
|
||||||
|
MAPREDUCE-3993. Graceful handling of codec errors during decompression
|
||||||
|
(kkambatl via tucu)
|
||||||
|
|
||||||
Release 2.0.0-alpha - 05-23-2012
|
Release 2.0.0-alpha - 05-23-2012
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.io.DataInputBuffer;
|
import org.apache.hadoop.io.DataInputBuffer;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.WritableUtils;
|
import org.apache.hadoop.io.WritableUtils;
|
||||||
import org.apache.hadoop.io.compress.CodecPool;
|
import org.apache.hadoop.io.compress.CodecPool;
|
||||||
import org.apache.hadoop.io.compress.CompressionCodec;
|
import org.apache.hadoop.io.compress.CompressionCodec;
|
||||||
@ -379,7 +380,8 @@ public long getPosition() throws IOException {
|
|||||||
private int readData(byte[] buf, int off, int len) throws IOException {
|
private int readData(byte[] buf, int off, int len) throws IOException {
|
||||||
int bytesRead = 0;
|
int bytesRead = 0;
|
||||||
while (bytesRead < len) {
|
while (bytesRead < len) {
|
||||||
int n = in.read(buf, off+bytesRead, len-bytesRead);
|
int n = IOUtils.wrappedReadForCompressedData(in, buf, off + bytesRead,
|
||||||
|
len - bytesRead);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
return bytesRead;
|
return bytesRead;
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user