From 2a987243423eb5c7e191de2ba969b7591a441c70 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Tue, 13 Oct 2015 23:00:18 -0700
Subject: [PATCH] HDFS-1172. Blocks in newly completed files are considered
 under-replicated too quickly. Contributed by Masatake Iwasaki.
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt        |   3 +
 .../server/blockmanagement/BlockManager.java       |  32 ++-
 .../apache/hadoop/hdfs/TestReplication.java        | 194 +++++++++++++++++-
 3 files changed, 224 insertions(+), 5 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index de7f3497f2..e6cd98f6fd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1521,6 +1521,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
     (Ming Ma via lei)
 
+    HDFS-1172. Blocks in newly completed files are considered under-replicated
+    too quickly. (Masatake Iwasaki via jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8a64b7445e..cdf43fb98b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -676,12 +676,37 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
       return false; // already completed (e.g. by syncBlock)
 
     final boolean b = commitBlock(lastBlock, commitBlock);
-    if (hasMinStorage(lastBlock)) {
+    if (hasMinStorage(lastBlock)) {
+      if (b && !bc.isStriped()) {
+        addExpectedReplicasToPending(lastBlock);
+      }
       completeBlock(lastBlock, false);
     }
     return b;
   }
 
+  /**
+   * If IBR is not sent from expected locations yet, add the datanodes to
+   * pendingReplications in order to keep ReplicationMonitor from scheduling
+   * the block.
+   */
+  private void addExpectedReplicasToPending(BlockInfo lastBlock) {
+    DatanodeStorageInfo[] expectedStorages =
+        lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
+    if (expectedStorages.length - lastBlock.numNodes() > 0) {
+      ArrayList<DatanodeDescriptor> pendingNodes =
+          new ArrayList<DatanodeDescriptor>();
+      for (DatanodeStorageInfo storage : expectedStorages) {
+        DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
+        if (lastBlock.findStorageInfo(dnd) == null) {
+          pendingNodes.add(dnd);
+        }
+      }
+      pendingReplications.increment(lastBlock,
+          pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
+    }
+  }
+
   /**
    * Convert a specified block of the file to a complete block.
    * @throws IOException if the block does not have at least a minimal number
@@ -3764,8 +3789,9 @@ public void checkReplication(BlockCollection bc) {
     for (BlockInfo block : bc.getBlocks()) {
       short expected = getExpectedReplicaNum(block);
       final NumberReplicas n = countNodes(block);
-      if (isNeededReplication(block, n.liveReplicas())) {
-        neededReplications.add(block, n.liveReplicas(),
+      final int pending = pendingReplications.getNumReplicas(block);
+      if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+        neededReplications.add(block, n.liveReplicas() + pending,
             n.decommissionedAndDecommissioning(), expected);
       } else if (n.liveReplicas() > expected) {
         processOverReplicatedBlock(block, expected, null, null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index 2139df94b9..6424bc3bf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -17,10 +17,15 @@
  */
 package org.apache.hadoop.hdfs;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.base.Supplier;
 
 import java.io.File;
 import java.io.IOException;
@@ -40,20 +45,34 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * This class tests the replication of a DFS file.
@@ -278,6 +297,14 @@ private void waitForBlockReplication(String filename,
                                        ClientProtocol namenode,
                                        int expected, long maxWaitSec)
       throws IOException {
+    waitForBlockReplication(filename, namenode, expected, maxWaitSec, false, false);
+  }
+
+  private void waitForBlockReplication(String filename,
+                                       ClientProtocol namenode,
+                                       int expected, long maxWaitSec,
+                                       boolean isUnderConstruction, boolean noOverReplication)
+      throws IOException {
     long start = Time.monotonicNow();
 
     //wait for all the blocks to be replicated;
@@ -290,7 +317,13 @@ private void waitForBlockReplication(String filename,
       for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
            iter.hasNext();) {
         LocatedBlock block = iter.next();
+        if (isUnderConstruction && !iter.hasNext()) {
+          break; // do not check the last block
+        }
         int actual = block.getLocations().length;
+        if (noOverReplication) {
+          assertTrue(actual <= expected);
+        }
         if ( actual < expected ) {
           LOG.info("Not enough replicas for " + block.getBlock()
               + " yet. Expecting " + expected + ", got " + actual + ".");
@@ -560,4 +593,161 @@ public FileVisitResult visitFile(
       }
     }
   }
+
+
+  /**
+   * This test makes sure that, when a file is closed before all
+   * of the datanodes in the pipeline have reported their replicas,
+   * the NameNode doesn't consider the block under-replicated too
+   * aggressively. It is a regression test for HDFS-1172.
+   */
+  @Test(timeout=60000)
+  public void testNoExtraReplicationWhenBlockReceivedIsLate()
+      throws Exception {
+    LOG.info("Test block replication when blockReceived is late");
+    final short numDataNodes = 3;
+    final short replication = 3;
+    final Configuration conf = new Configuration();
+    conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(numDataNodes).build();
+    final String testFile = "/replication-test-file";
+    final Path testPath = new Path(testFile);
+    final BlockManager bm =
+        cluster.getNameNode().getNamesystem().getBlockManager();
+
+    try {
+      cluster.waitActive();
+
+      // Artificially delay IBR from 1 DataNode.
+      // this ensures that the client's completeFile() RPC will get to the
+      // NN before some of the replicas are reported.
+      NameNode nn = cluster.getNameNode();
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocolClientSideTranslatorPB spy =
+          DataNodeTestUtils.spyOnBposToNN(dn, nn);
+      DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
+      Mockito.doAnswer(delayer).when(spy).blockReceivedAndDeleted(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<StorageReceivedDeletedBlocks[]>anyObject());
+
+      FileSystem fs = cluster.getFileSystem();
+      // Create and close a small file with two blocks
+      DFSTestUtil.createFile(fs, testPath, 1500, replication, 0);
+
+      // schedule replication via BlockManager#computeReplicationWork
+      BlockManagerTestUtil.computeAllPendingWork(bm);
+
+      // Initially, should have some pending replication since the close()
+      // is earlier than at least one of the reportReceivedDeletedBlocks calls
+      assertTrue(pendingReplicationCount(bm) > 0);
+
+      // release pending IBR.
+      delayer.waitForCall();
+      delayer.proceed();
+      delayer.waitForResult();
+
+      // make sure DataNodes do replication work if exists
+      for (DataNode d : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerHeartbeat(d);
+      }
+
+      // Wait until there is nothing pending
+      try {
+        GenericTestUtils.waitFor(new Supplier<Boolean>() {
+          @Override
+          public Boolean get() {
+            return pendingReplicationCount(bm) == 0;
+          }
+        }, 100, 3000);
+      } catch (TimeoutException e) {
+        fail("timed out while waiting for no pending replication.");
+      }
+
+      // Check that none of the datanodes have serviced a replication request.
+      // i.e. that the NameNode didn't schedule any spurious replication.
+      assertNoReplicationWasPerformed(cluster);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * This test makes sure that, if a file is under construction, blocks
+   * in the middle of that file are properly re-replicated if they
+   * become corrupt.
+   */
+  @Test(timeout=60000)
+  public void testReplicationWhileUnderConstruction()
+      throws Exception {
+    LOG.info("Test block replication in under construction");
+    MiniDFSCluster cluster = null;
+    final short numDataNodes = 6;
+    final short replication = 3;
+    String testFile = "/replication-test-file";
+    Path testPath = new Path(testFile);
+    FSDataOutputStream stm = null;
+    try {
+      Configuration conf = new Configuration();
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
+      cluster.waitActive();
+
+      FileSystem fs = cluster.getFileSystem();
+
+      stm = AppendTestUtil.createFile(fs, testPath, replication);
+
+      // Write a full block
+      byte[] buffer = AppendTestUtil.initBuffer(AppendTestUtil.BLOCK_SIZE);
+      stm.write(buffer); // block 1
+      stm.write(buffer); // block 2
+      stm.write(buffer, 0, 1); // start block 3
+      stm.hflush(); // make sure blocks are persisted, etc
+
+      // Everything should be fully replicated
+      waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
+
+      // Check that none of the datanodes have serviced a replication request.
+      // i.e. that the NameNode didn't schedule any spurious replication.
+      assertNoReplicationWasPerformed(cluster);
+
+      // Mark one of the blocks corrupt
+      List<LocatedBlock> blocks;
+      FSDataInputStream in = fs.open(testPath);
+      try {
+        blocks = DFSTestUtil.getAllBlocks(in);
+      } finally {
+        in.close();
+      }
+      LocatedBlock lb = blocks.get(0);
+      LocatedBlock lbOneReplica = new LocatedBlock(lb.getBlock(),
+          new DatanodeInfo[] { lb.getLocations()[0] });
+      cluster.getNameNodeRpc().reportBadBlocks(
+          new LocatedBlock[] { lbOneReplica });
+
+      // Everything should be fully replicated
+      waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
+    } finally {
+      if (stm != null) {
+        IOUtils.closeStream(stm);
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private long pendingReplicationCount(BlockManager bm) {
+    BlockManagerTestUtil.updateState(bm);
+    return bm.getPendingReplicationBlocksCount();
+  }
+
+  private void assertNoReplicationWasPerformed(MiniDFSCluster cluster) {
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      assertCounter("BlocksReplicated", 0L, rb);
+    }
+  }
 }
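
A note on the checkReplication() change in BlockManager.java: the body of hasEnoughEffectiveReplicas() lies outside this patch's context, but the call site shows the intent. Replicas already registered in pendingReplications are counted toward the replication target, so a block in a freshly closed file is not queued for re-replication just because some incremental block reports have not arrived yet. The following is only a rough, self-contained sketch of that accounting with simplified, hypothetical names; the real BlockManager method also takes corrupt replicas and rack placement into account.

  // Hypothetical simplification of the effective-replica check implied by the
  // call site above; this is NOT the actual BlockManager implementation.
  class EffectiveReplicaCheckSketch {
    static boolean hasEnoughEffectiveReplicas(int liveReplicas,
                                              int pendingReplicas,
                                              int expectedReplicas) {
      // Replicas already scheduled (pending) count toward the target.
      return liveReplicas + pendingReplicas >= expectedReplicas;
    }

    public static void main(String[] args) {
      // Block of a just-closed file: one IBR has arrived, the two remaining
      // expected locations were pre-registered by addExpectedReplicasToPending().
      System.out.println(hasEnoughEffectiveReplicas(1, 2, 3)); // true  -> not re-queued
      // Same block without the pending accounting (the pre-HDFS-1172 behavior):
      System.out.println(hasEnoughEffectiveReplicas(1, 0, 3)); // false -> queued again
    }
  }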
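
The testNoExtraReplicationWhenBlockReceivedIsLate() test relies on GenericTestUtils.DelayAnswer to hold back one DataNode's blockReceivedAndDeleted() RPC until after the client has closed the file. For readers without the Hadoop test utilities at hand, here is a rough stand-alone illustration of the same gating pattern using plain Mockito and a latch; the Reporter interface and block name are invented for the example and are not part of the patch.

  import java.util.concurrent.CountDownLatch;
  import static org.mockito.Mockito.*;

  // Made-up interface standing in for the DataNode-to-NameNode report call.
  interface Reporter {
    void blockReceived(String blockId);
  }

  public class DelayedReportSketch {
    public static void main(String[] args) throws Exception {
      final CountDownLatch release = new CountDownLatch(1);
      Reporter reporter = mock(Reporter.class);

      // Like DelayAnswer: the stubbed call blocks until the test lets it proceed.
      doAnswer(invocation -> {
        release.await();            // hold the "incremental block report" here
        return null;
      }).when(reporter).blockReceived(anyString());

      Thread reportThread = new Thread(() -> reporter.blockReceived("blk_example"));
      reportThread.start();

      // ... the code under test (e.g. the file-close path) would run here,
      // while the report is still in flight ...

      release.countDown();          // release the delayed report
      reportThread.join();
      verify(reporter).blockReceived("blk_example");
    }
  }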