diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java index caf8aad32e..8150b6fb0e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; /** * Used for injecting faults in DFSClient and DFSOutputStream tests. @@ -69,4 +70,5 @@ public void delayWhenRenewLeaseTimeout() {} public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {} + public void failCreateBlockReader() throws InvalidBlockTokenException {} } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 1ec55e4f82..b54694fab1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -519,6 +519,14 @@ private LocatedBlock fetchBlockAt(long offset, long length, boolean useCache) // Update the LastLocatedBlock, if offset is for last block. if (offset >= locatedBlocks.getFileLength()) { setLocatedBlocksFields(newBlocks, getLastBlockLength(newBlocks)); + // After updating the locatedBlock, the block to which the offset belongs + // should be researched like {@link DFSInputStream#getBlockAt(long)}. + if (offset >= locatedBlocks.getFileLength()) { + return locatedBlocks.getLastLocatedBlock(); + } else { + targetBlockIdx = locatedBlocks.findBlock(offset); + assert targetBlockIdx >= 0 && targetBlockIdx < locatedBlocks.locatedBlockCount(); + } } else { locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks()); @@ -641,6 +649,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) targetBlock = retval.block; try { + DFSClientFaultInjector.get().failCreateBlockReader(); blockReader = getBlockReader(targetBlock, offsetIntoBlock, targetBlock.getBlockSize() - offsetIntoBlock, targetAddr, storageType, chosenNode); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java index 2f9e0d319c..b2b18237a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInputStream.java @@ -31,6 +31,8 @@ import java.util.ArrayList; import java.util.List; import java.util.Random; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -41,14 +43,21 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.Assume; import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; public class TestDFSInputStream { private void testSkipInner(MiniDFSCluster cluster) throws IOException { @@ -287,4 +296,67 @@ public void testReadWithoutPreferredCachingReplica() throws IOException { cluster.shutdown(); } } + + @Test + public void testCreateBlockReaderWhenInvalidBlockTokenException() throws + IOException, InterruptedException, TimeoutException { + GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG); + Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 64 * 1024); + conf.setInt(HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 516); + DFSClientFaultInjector oldFaultInjector = DFSClientFaultInjector.get(); + FSDataOutputStream out = null; + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) { + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + + // Create file which only contains one UC block. + String file = "/testfile"; + Path path = new Path(file); + out = fs.create(path, (short) 3); + int bufferLen = 5120; + byte[] toWrite = new byte[bufferLen]; + Random rb = new Random(0); + rb.nextBytes(toWrite); + out.write(toWrite, 0, bufferLen); + + // Wait for the block length of the file to be 1. + GenericTestUtils.waitFor(() -> { + try { + return fs.getFileBlockLocations(path, 0, bufferLen).length == 1; + } catch (IOException e) { + return false; + } + }, 100, 10000); + + // Set up the InjectionHandler. + DFSClientFaultInjector.set(Mockito.mock(DFSClientFaultInjector.class)); + DFSClientFaultInjector injector = DFSClientFaultInjector.get(); + final AtomicInteger count = new AtomicInteger(0); + Mockito.doAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + // Mock access token was invalid when connecting to first datanode + // throw InvalidBlockTokenException. + if (count.getAndIncrement() == 0) { + throw new InvalidBlockTokenException("Mock InvalidBlockTokenException"); + } + return null; + } + }).when(injector).failCreateBlockReader(); + + try (DFSInputStream in = new DFSInputStream(fs.getClient(), file, + false, null)) { + int bufLen = 1024; + byte[] buf = new byte[bufLen]; + // Seek the offset to 1024 and which should be in the range (0, fileSize). + in.seek(1024); + int read = in.read(buf, 0, bufLen); + assertEquals(1024, read); + } + } finally { + DFSClientFaultInjector.set(oldFaultInjector); + IOUtils.closeStream(out); + } + } }