From 960940e0e08f7839775f2d8a352b444d104d36b4 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Mon, 14 May 2018 09:28:09 -0700 Subject: [PATCH] HDFS-13539. DFSStripedInputStream NPE when reportCheckSumFailure. --- .../apache/hadoop/hdfs/DFSInputStream.java | 13 +++++++++- .../hadoop/hdfs/DFSStripedInputStream.java | 8 +++--- .../hdfs/TestDFSStripedInputStream.java | 25 +++++++++++++++++++ 3 files changed, 42 insertions(+), 4 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index d3d6669fca..b38e629903 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -790,13 +790,24 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy) // Check if need to report block replicas corruption either read // was successful or ChecksumException occurred. reportCheckSumFailure(corruptedBlocks, - currentLocatedBlock.getLocations().length, false); + getCurrentBlockLocationsLength(), false); } } } return -1; } + protected int getCurrentBlockLocationsLength() { + int len = 0; + if (currentLocatedBlock == null) { + DFSClient.LOG.info("Found null currentLocatedBlock. pos={}, " + + "blockEnd={}, fileLength={}", pos, blockEnd, getFileLength()); + } else { + len = currentLocatedBlock.getLocations().length; + } + return len; + } + /** * Read the entire buffer. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index 339a02c84b..f3b16e0981 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.hdfs.protocol.BlockType; @@ -160,7 +161,8 @@ protected ThreadPoolExecutor getStripedReadsThreadPool(){ * When seeking into a new block group, create blockReader for each internal * block in the group. */ - private synchronized void blockSeekTo(long target) throws IOException { + @VisibleForTesting + synchronized void blockSeekTo(long target) throws IOException { if (target >= getFileLength()) { throw new IOException("Attempted to read past end of file"); } @@ -400,8 +402,8 @@ protected synchronized int readWithStrategy(ReaderStrategy strategy) } finally { // Check if need to report block replicas corruption either read // was successful or ChecksumException occurred. - reportCheckSumFailure(corruptedBlocks, - currentLocatedBlock.getLocations().length, true); + reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(), + true); } } return -1; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index de276a9e69..cdebee0dc8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -35,6 +35,7 @@ import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.NativeRSRawErasureCoderFactory; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -51,7 +52,12 @@ import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; public class TestDFSStripedInputStream { @@ -504,4 +510,23 @@ public void testIdempotentClose() throws Exception { in.close(); } } + + @Test + public void testReadFailToGetCurrentBlock() throws Exception { + DFSTestUtil.writeFile(cluster.getFileSystem(), filePath, "test"); + try (DFSStripedInputStream in = (DFSStripedInputStream) fs.getClient() + .open(filePath.toString())) { + final DFSStripedInputStream spy = spy(in); + final String msg = "Injected exception for testReadNPE"; + doThrow(new IOException(msg)).when(spy).blockSeekTo(anyLong()); + assertNull(in.getCurrentBlock()); + try { + spy.read(); + fail("read should have failed"); + } catch (IOException expected) { + LOG.info("Exception caught", expected); + GenericTestUtils.assertExceptionContains(msg, expected); + } + } + } }