HDFS-13539. DFSStripedInputStream NPE when reportCheckSumFailure.
This commit is contained in:
parent
fc5d49c202
commit
960940e0e0
@ -790,13 +790,24 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
|
|||||||
// Check if need to report block replicas corruption either read
|
// Check if need to report block replicas corruption either read
|
||||||
// was successful or ChecksumException occurred.
|
// was successful or ChecksumException occurred.
|
||||||
reportCheckSumFailure(corruptedBlocks,
|
reportCheckSumFailure(corruptedBlocks,
|
||||||
currentLocatedBlock.getLocations().length, false);
|
getCurrentBlockLocationsLength(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected int getCurrentBlockLocationsLength() {
|
||||||
|
int len = 0;
|
||||||
|
if (currentLocatedBlock == null) {
|
||||||
|
DFSClient.LOG.info("Found null currentLocatedBlock. pos={}, "
|
||||||
|
+ "blockEnd={}, fileLength={}", pos, blockEnd, getFileLength());
|
||||||
|
} else {
|
||||||
|
len = currentLocatedBlock.getLocations().length;
|
||||||
|
}
|
||||||
|
return len;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Read the entire buffer.
|
* Read the entire buffer.
|
||||||
*/
|
*/
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ReadOption;
|
import org.apache.hadoop.fs.ReadOption;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockType;
|
import org.apache.hadoop.hdfs.protocol.BlockType;
|
||||||
@ -160,7 +161,8 @@ protected ThreadPoolExecutor getStripedReadsThreadPool(){
|
|||||||
* When seeking into a new block group, create blockReader for each internal
|
* When seeking into a new block group, create blockReader for each internal
|
||||||
* block in the group.
|
* block in the group.
|
||||||
*/
|
*/
|
||||||
private synchronized void blockSeekTo(long target) throws IOException {
|
@VisibleForTesting
|
||||||
|
synchronized void blockSeekTo(long target) throws IOException {
|
||||||
if (target >= getFileLength()) {
|
if (target >= getFileLength()) {
|
||||||
throw new IOException("Attempted to read past end of file");
|
throw new IOException("Attempted to read past end of file");
|
||||||
}
|
}
|
||||||
@ -400,8 +402,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy)
|
|||||||
} finally {
|
} finally {
|
||||||
// Check if need to report block replicas corruption either read
|
// Check if need to report block replicas corruption either read
|
||||||
// was successful or ChecksumException occurred.
|
// was successful or ChecksumException occurred.
|
||||||
reportCheckSumFailure(corruptedBlocks,
|
reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(),
|
||||||
currentLocatedBlock.getLocations().length, true);
|
true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
|
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
|
import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory;
|
||||||
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -51,7 +52,12 @@
|
|||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Mockito.doThrow;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
public class TestDFSStripedInputStream {
|
public class TestDFSStripedInputStream {
|
||||||
|
|
||||||
@ -504,4 +510,23 @@ public void testIdempotentClose() throws Exception {
|
|||||||
in.close();
|
in.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReadFailToGetCurrentBlock() throws Exception {
|
||||||
|
DFSTestUtil.writeFile(cluster.getFileSystem(), filePath, "test");
|
||||||
|
try (DFSStripedInputStream in = (DFSStripedInputStream) fs.getClient()
|
||||||
|
.open(filePath.toString())) {
|
||||||
|
final DFSStripedInputStream spy = spy(in);
|
||||||
|
final String msg = "Injected exception for testReadNPE";
|
||||||
|
doThrow(new IOException(msg)).when(spy).blockSeekTo(anyLong());
|
||||||
|
assertNull(in.getCurrentBlock());
|
||||||
|
try {
|
||||||
|
spy.read();
|
||||||
|
fail("read should have failed");
|
||||||
|
} catch (IOException expected) {
|
||||||
|
LOG.info("Exception caught", expected);
|
||||||
|
GenericTestUtils.assertExceptionContains(msg, expected);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user