diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index d388d00502..77f5a92651 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -559,6 +559,8 @@ private synchronized DatanodeInfo blockSeekTo(long target)
       chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
       StorageType storageType = retval.storageType;
+      // Latest block, if refreshed by chooseDataNode()
+      targetBlock = retval.block;
       try {
         blockReader = getBlockReader(targetBlock, offsetIntoBlock,
@@ -915,7 +917,7 @@ protected DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
         chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
     DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
     InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
-    return new DNAddrPair(chosenNode, targetAddr, storageType);
+    return new DNAddrPair(chosenNode, targetAddr, storageType, block);
   }
 
   /**
@@ -957,12 +959,13 @@ private static String getBestNodeDNAddrPairErrorString(
   protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
       ByteBuffer buf, CorruptedBlocks corruptedBlocks)
       throws IOException {
-    block = refreshLocatedBlock(block);
     while (true) {
       DNAddrPair addressPair = chooseDataNode(block, null);
+      // Latest block, if refreshed internally by chooseDataNode()
+      block = addressPair.block;
       try {
-        actualGetFromOneDataNode(addressPair, block, start, end,
-            buf, corruptedBlocks);
+        actualGetFromOneDataNode(addressPair, start, end, buf,
+            corruptedBlocks);
         return;
       } catch (IOException e) {
         checkInterrupted(e); // check if the read has been interrupted
@@ -983,8 +986,7 @@ private Callable<ByteBuffer> getFromOneDataNode(final DNAddrPair datanode,
       public ByteBuffer call() throws Exception {
         try (TraceScope ignored = dfsClient.getTracer().
             newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
-          actualGetFromOneDataNode(datanode, block, start, end, bb,
-              corruptedBlocks);
+          actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
           return bb;
         }
       }
@@ -995,27 +997,21 @@ public ByteBuffer call() throws Exception {
    * Read data from one DataNode.
    *
    * @param datanode the datanode from which to read data
-   * @param block the located block containing the requested data
   * @param startInBlk the startInBlk offset of the block
    * @param endInBlk the endInBlk offset of the block
    * @param buf the given byte buffer into which the data is read
    * @param corruptedBlocks map recording list of datanodes with corrupted
    *                        block replica
    */
-  void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
-      final long startInBlk, final long endInBlk, ByteBuffer buf,
-      CorruptedBlocks corruptedBlocks)
+  void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
+      final long endInBlk, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
       throws IOException {
     DFSClientFaultInjector.get().startFetchFromDatanode();
     int refetchToken = 1; // only need to get a new access token once
     int refetchEncryptionKey = 1; // only need to get a new encryption key once
     final int len = (int) (endInBlk - startInBlk + 1);
-
+    LocatedBlock block = datanode.block;
     while (true) {
-      // cached block locations may have been updated by chooseDataNode()
-      // or fetchBlockAt(). Always get the latest list of locations at the
-      // start of the loop.
-      block = refreshLocatedBlock(block);
       BlockReader reader = null;
       try {
         DFSClientFaultInjector.get().fetchFromDatanodeException();
@@ -1078,6 +1074,9 @@ void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
           addToDeadNodes(datanode.info);
           throw new IOException(msg);
         }
+        // Refresh the block to pick up updated tokens in case of token or
+        // encryption key failures.
+        block = refreshLocatedBlock(block);
       } finally {
         if (reader != null) {
           reader.close();
@@ -1113,7 +1112,6 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
     ByteBuffer bb;
     int len = (int) (end - start + 1);
     int hedgedReadId = 0;
-    block = refreshLocatedBlock(block);
     while (true) {
       // see HDFS-6591, this metric is used to verify/catch unnecessary loops
       hedgedReadOpsLoopNumForTesting++;
@@ -1123,6 +1121,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         // chooseDataNode is a commitment. If no node, we go to
         // the NN to reget block locations. Only go here on first read.
         chosenNode = chooseDataNode(block, ignored);
+        // Latest block, if refreshed internally by chooseDataNode()
+        block = chosenNode.block;
         bb = ByteBuffer.allocate(len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
             chosenNode, block, start, end, bb,
@@ -1160,6 +1160,8 @@ private void hedgedFetchBlockByteRange(LocatedBlock block, long start,
         if (chosenNode == null) {
           chosenNode = chooseDataNode(block, ignored);
         }
+        // Latest block, if refreshed internally by chooseDataNode()
+        block = chosenNode.block;
         bb = ByteBuffer.allocate(len);
         Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
             chosenNode, block, start, end, bb,
@@ -1530,12 +1532,14 @@ static final class DNAddrPair {
     final DatanodeInfo info;
     final InetSocketAddress addr;
     final StorageType storageType;
+    final LocatedBlock block;
 
     DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
-        StorageType storageType) {
+        StorageType storageType, LocatedBlock block) {
       this.info = info;
       this.addr = addr;
       this.storageType = storageType;
+      this.block = block;
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index 75ad0225b7..d4d06465c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -236,7 +236,7 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
     BlockReader reader = null;
     final ReaderRetryPolicy retry = new ReaderRetryPolicy();
     DFSInputStream.DNAddrPair dnInfo =
-        new DFSInputStream.DNAddrPair(null, null, null);
+        new DFSInputStream.DNAddrPair(null, null, null, null);
 
     while (true) {
       try {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 2cfcc2b6ab..038b6ce70a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -50,6 +50,7 @@
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketException;
 import java.net.URI;
 import java.net.URL;
 import java.net.URLConnection;
@@ -122,7 +123,9 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -2055,6 +2058,49 @@ public static Block addBlockToFile(boolean isStripedBlock,
     return lastBlock;
   }
 
+  /*
+   * Copy a block from sourceProxy to destination. If the block becomes
+   * over-replicated, preferably remove it from source.
+   * Return true if a block is successfully copied; otherwise false.
+   */
+  public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+      DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
+    return replaceBlock(block, source, sourceProxy, destination,
+        StorageType.DEFAULT, Status.SUCCESS);
+  }
+
+  /*
+   * Replace a block, with the given target storage type and expected reply status.
+   */
+  public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
+      DatanodeInfo sourceProxy, DatanodeInfo destination,
+      StorageType targetStorageType, Status opStatus) throws IOException,
+      SocketException {
+    Socket sock = new Socket();
+    try {
+      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
+          HdfsConstants.READ_TIMEOUT);
+      sock.setKeepAlive(true);
+      // sendRequest
+      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
+      new Sender(out).replaceBlock(block, targetStorageType,
+          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
+          sourceProxy, null);
+      out.flush();
+      // receiveResponse
+      DataInputStream reply = new DataInputStream(sock.getInputStream());
+
+      BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(
+          reply);
+      while (proto.getStatus() == Status.IN_PROGRESS) {
+        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
+      }
+      return proto.getStatus() == opStatus;
+    } finally {
+      sock.close();
+    }
+  }
+
   /**
    * Because currently DFSStripedOutputStream does not support hflush/hsync,
    * tests can use this method to flush all the buffered data to DataNodes.
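
The two DFSTestUtil.replaceBlock() overloads added above extract the raw DataTransferProtocol replace-block request that TestBlockReplacement previously built inline, so that other tests (such as TestPread below) can move a replica between DataNodes. The following is a minimal usage sketch and is not part of the patch; the file name, variable names, and the surrounding MiniDFSCluster/DistributedFileSystem setup (cluster, dfs) are assumptions made for illustration.

    // Illustrative sketch only: relocate the single replica of a file's first
    // block to some other live DataNode and verify the copy succeeded.
    // Assumes a running MiniDFSCluster `cluster` and DistributedFileSystem `dfs`.
    Path file = new Path("/replaceBlockSketch");
    DFSTestUtil.writeFile(dfs, file, "sketch data");
    LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs.open(file)).get(0);
    DatanodeInfo source = lb.getLocations()[0];
    DatanodeInfo target = null;
    for (DatanodeInfo dn : cluster.getNameNodeRpc()
        .getDatanodeReport(DatanodeReportType.LIVE)) {
      if (!Arrays.asList(lb.getLocations()).contains(dn)) {
        target = dn; // first live node that does not already hold a replica
        break;
      }
    }
    // The source DataNode also serves as the copy proxy here, mirroring the
    // way TestPread calls this helper below.
    assertTrue(DFSTestUtil.replaceBlock(lb.getBlock(), source, source, target));
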
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
index 637f2dfe88..85fc97bc15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestPread.java
@@ -23,6 +23,8 @@
 import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -30,6 +32,8 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
@@ -38,6 +42,9 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -51,6 +58,8 @@
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Supplier;
+
 /**
  * This class tests the DFS positional read functionality in a single node
  * mini-cluster.
@@ -542,6 +551,143 @@ public Void call() throws IOException {
     }
   }
 
+  /**
+   * Scenario: 1. Write a file with RF=2 to DN1 and DN2.
+   * 2. Open the stream; the cached LocatedBlock locations are [DN1, DN2].
+   * 3. Move the block from DN2 to DN3.
+   * 4. Let the block get replicated to another datanode (DN3).
+   * 5. Stop DN1 as well.
+   * 6. The currently valid block locations in the NameNode are [DN1, DN3].
+   * 7. Make subsequent calls to getBlockLocations() always return DN3 as the
+   * last location.
+   */
+  @Test
+  public void testPreadFailureWithChangedBlockLocations() throws Exception {
+    doPreadTestWithChangedLocations();
+  }
+
+  /**
+   * Same scenario as above, but with hedged reads enabled:
+   * 1. Write a file with RF=2 to DN1 and DN2.
+   * 2. Open the stream; the cached LocatedBlock locations are [DN1, DN2].
+   * 3. Move the block from DN2 to DN3.
+   * 4. Let the block get replicated to another datanode (DN3).
+   * 5. Stop DN1 as well.
+   * 6. The currently valid block locations in the NameNode are [DN1, DN3].
+   * 7. Make subsequent calls to getBlockLocations() always return DN3 as the
+   * last location.
+   */
+  @Test
+  public void testPreadHedgedFailureWithChangedBlockLocations()
+      throws Exception {
+    isHedgedRead = true;
+    doPreadTestWithChangedLocations();
+  }
+
+  private void doPreadTestWithChangedLocations()
+      throws IOException, TimeoutException, InterruptedException {
+    GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
+    conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    if (isHedgedRead) {
+      conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
+    }
+    try (MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      Path p = new Path("/test");
+      String data = "testingmissingblock";
+      DFSTestUtil.writeFile(dfs, p, data);
+
+      FSDataInputStream in = dfs.open(p);
+      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(in);
+      LocatedBlock lb = blocks.get(0);
+      DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+      blocks = DFSTestUtil.getAllBlocks(in);
+      DatanodeInfo[] locations = null;
+      for (LocatedBlock locatedBlock : blocks) {
+        locations = locatedBlock.getLocations();
+        DFSClient.LOG
+            .info(locatedBlock.getBlock() + " " + Arrays.toString(locations));
+      }
+      final DatanodeInfo validDownLocation = locations[0];
+      final DFSClient client = dfs.getClient();
+      DFSClient dfsClient = Mockito.spy(client);
+      // From the second request onwards, keep the valid location last in
+      // the locations list.
+      final AtomicInteger count = new AtomicInteger(0);
+      Mockito.doAnswer(new Answer<LocatedBlocks>() {
+        @Override
+        public LocatedBlocks answer(InvocationOnMock invocation)
+            throws Throwable {
+          if (count.compareAndSet(0, 1)) {
+            return (LocatedBlocks) invocation.callRealMethod();
+          }
+          Object obj = invocation.callRealMethod();
+          LocatedBlocks locatedBlocks = (LocatedBlocks) obj;
+          LocatedBlock lb = locatedBlocks.get(0);
+          DatanodeInfo[] locations = lb.getLocations();
+          if (!(locations[0].getName().equals(validDownLocation.getName()))) {
+            // The latest location, which is currently down, should be first.
+            DatanodeInfo l = locations[0];
+            locations[0] = locations[locations.length - 1];
+            locations[locations.length - 1] = l;
+          }
+          return locatedBlocks;
+        }
+      }).when(dfsClient).getLocatedBlocks(p.toString(), 0);
+
+      // Find out the target node to move the block to.
+      DatanodeInfo[] nodes =
+          cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE);
+      DatanodeInfo toMove = null;
+      List<DatanodeInfo> locationsList = Arrays.asList(locations);
+      for (DatanodeInfo node : nodes) {
+        if (locationsList.contains(node)) {
+          continue;
+        }
+        toMove = node;
+        break;
+      }
+      // STEP 2: Open the stream.
+      DFSInputStream din = dfsClient.open(p.toString());
+      // STEP 3: Move the replica.
+      final DatanodeInfo source = locations[1];
+      final DatanodeInfo destination = toMove;
+      DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove);
+      // Wait for the replica on the source node to be deleted.
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+
+        @Override
+        public Boolean get() {
+          try {
+            LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0);
+            LocatedBlock lb = lbs.get(0);
+            List<DatanodeInfo> locations = Arrays.asList(lb.getLocations());
+            DFSClient.LOG
+                .info("Source :" + source + ", destination: " + destination);
+            DFSClient.LOG.info("Got updated locations :" + locations);
+            return locations.contains(destination)
+                && !locations.contains(source);
+          } catch (IOException e) {
+            DFSClient.LOG.error("Problem in getting block locations", e);
+          }
+          return false;
+        }
+      }, 1000, 10000);
+      DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
+      // STEP 4: Stop the first node in the new locations.
+      cluster.stopDataNode(validDownLocation.getName());
+      DFSClient.LOG.info("Starting read");
+      byte[] buf = new byte[1024];
+      int n = din.read(0, buf, 0, data.length());
+      assertEquals(data.length(), n);
+      assertEquals("Data should be read", data, new String(buf, 0, n));
+      DFSClient.LOG.info("Read completed");
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     new TestPread().testPreadDFS();
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
index 8992d47d52..97255ae5ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.net.Socket;
 import java.net.SocketException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -48,19 +48,14 @@
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
-import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
@@ -371,7 +366,7 @@ private void checkBlocks(DatanodeInfo[] includeNodes, String fileName,
    */
   private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
       DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
-    return replaceBlock(block, source, sourceProxy, destination,
+    return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
         StorageType.DEFAULT, Status.SUCCESS);
   }
 
@@ -385,29 +380,8 @@ private boolean replaceBlock(
       DatanodeInfo destination, StorageType targetStorageType,
       Status opStatus) throws IOException, SocketException {
-    Socket sock = new Socket();
-    try {
-      sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
-          HdfsConstants.READ_TIMEOUT);
-      sock.setKeepAlive(true);
-      // sendRequest
-      DataOutputStream out = new DataOutputStream(sock.getOutputStream());
-      new Sender(out).replaceBlock(block, targetStorageType,
-          BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
-          sourceProxy, null);
-      out.flush();
-      // receiveResponse
-      DataInputStream reply = new DataInputStream(sock.getInputStream());
-
-      BlockOpResponseProto proto =
-          BlockOpResponseProto.parseDelimitedFrom(reply);
-      while (proto.getStatus() == Status.IN_PROGRESS) {
-        proto = BlockOpResponseProto.parseDelimitedFrom(reply);
-      }
-      return proto.getStatus() == opStatus;
-    } finally {
-      sock.close();
-    }
+    return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
+        targetStorageType, opStatus);
   }
 
   /**