HDFS-1172. Blocks in newly completed files are considered under-replicated too quickly. Contributed by Masatake Iwasaki.

This commit is contained in:
Jing Zhao 2015-10-13 23:00:18 -07:00
parent 40cac59248
commit 2a98724342
3 changed files with 224 additions and 5 deletions

View File

@ -1521,6 +1521,9 @@ Release 2.8.0 - UNRELEASED
HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain. HDFS-9006. Provide BlockPlacementPolicy that supports upgrade domain.
(Ming Ma via lei) (Ming Ma via lei)
HDFS-1172. Blocks in newly completed files are considered under-replicated
too quickly. (Masatake Iwasaki via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -677,11 +677,36 @@ public boolean commitOrCompleteLastBlock(BlockCollection bc,
final boolean b = commitBlock(lastBlock, commitBlock); final boolean b = commitBlock(lastBlock, commitBlock);
if (hasMinStorage(lastBlock)) { if (hasMinStorage(lastBlock)) {
if (b && !bc.isStriped()) {
addExpectedReplicasToPending(lastBlock);
}
completeBlock(lastBlock, false); completeBlock(lastBlock, false);
} }
return b; return b;
} }
/**
* If IBR is not sent from expected locations yet, add the datanodes to
* pendingReplications in order to keep ReplicationMonitor from scheduling
* the block.
*/
private void addExpectedReplicasToPending(BlockInfo lastBlock) {
DatanodeStorageInfo[] expectedStorages =
lastBlock.getUnderConstructionFeature().getExpectedStorageLocations();
if (expectedStorages.length - lastBlock.numNodes() > 0) {
ArrayList<DatanodeDescriptor> pendingNodes =
new ArrayList<DatanodeDescriptor>();
for (DatanodeStorageInfo storage : expectedStorages) {
DatanodeDescriptor dnd = storage.getDatanodeDescriptor();
if (lastBlock.findStorageInfo(dnd) == null) {
pendingNodes.add(dnd);
}
}
pendingReplications.increment(lastBlock,
pendingNodes.toArray(new DatanodeDescriptor[pendingNodes.size()]));
}
}
/** /**
* Convert a specified block of the file to a complete block. * Convert a specified block of the file to a complete block.
* @throws IOException if the block does not have at least a minimal number * @throws IOException if the block does not have at least a minimal number
@ -3764,8 +3789,9 @@ public void checkReplication(BlockCollection bc) {
for (BlockInfo block : bc.getBlocks()) { for (BlockInfo block : bc.getBlocks()) {
short expected = getExpectedReplicaNum(block); short expected = getExpectedReplicaNum(block);
final NumberReplicas n = countNodes(block); final NumberReplicas n = countNodes(block);
if (isNeededReplication(block, n.liveReplicas())) { final int pending = pendingReplications.getNumReplicas(block);
neededReplications.add(block, n.liveReplicas(), if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
neededReplications.add(block, n.liveReplicas() + pending,
n.decommissionedAndDecommissioning(), expected); n.decommissionedAndDecommissioning(), expected);
} else if (n.liveReplicas() > expected) { } else if (n.liveReplicas() > expected) {
processOverReplicatedBlock(block, expected, null, null); processOverReplicatedBlock(block, expected, null, null);

View File

@ -17,10 +17,15 @@
*/ */
package org.apache.hadoop.hdfs; package org.apache.hadoop.hdfs;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.base.Supplier;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
@ -40,20 +45,34 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
/** /**
* This class tests the replication of a DFS file. * This class tests the replication of a DFS file.
@ -278,6 +297,14 @@ private void waitForBlockReplication(String filename,
ClientProtocol namenode, ClientProtocol namenode,
int expected, long maxWaitSec) int expected, long maxWaitSec)
throws IOException { throws IOException {
waitForBlockReplication(filename, namenode, expected, maxWaitSec, false, false);
}
private void waitForBlockReplication(String filename,
ClientProtocol namenode,
int expected, long maxWaitSec,
boolean isUnderConstruction, boolean noOverReplication)
throws IOException {
long start = Time.monotonicNow(); long start = Time.monotonicNow();
//wait for all the blocks to be replicated; //wait for all the blocks to be replicated;
@ -290,7 +317,13 @@ private void waitForBlockReplication(String filename,
for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator(); for (Iterator<LocatedBlock> iter = blocks.getLocatedBlocks().iterator();
iter.hasNext();) { iter.hasNext();) {
LocatedBlock block = iter.next(); LocatedBlock block = iter.next();
if (isUnderConstruction && !iter.hasNext()) {
break; // do not check the last block
}
int actual = block.getLocations().length; int actual = block.getLocations().length;
if (noOverReplication) {
assertTrue(actual <= expected);
}
if ( actual < expected ) { if ( actual < expected ) {
LOG.info("Not enough replicas for " + block.getBlock() LOG.info("Not enough replicas for " + block.getBlock()
+ " yet. Expecting " + expected + ", got " + actual + "."); + " yet. Expecting " + expected + ", got " + actual + ".");
@ -560,4 +593,161 @@ public FileVisitResult visitFile(
} }
} }
} }
/**
* This test makes sure that, when a file is closed before all
* of the datanodes in the pipeline have reported their replicas,
* the NameNode doesn't consider the block under-replicated too
* aggressively. It is a regression test for HDFS-1172.
*/
@Test(timeout=60000)
public void testNoExtraReplicationWhenBlockReceivedIsLate()
throws Exception {
LOG.info("Test block replication when blockReceived is late" );
final short numDataNodes = 3;
final short replication = 3;
final Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(numDataNodes).build();
final String testFile = "/replication-test-file";
final Path testPath = new Path(testFile);
final BlockManager bm =
cluster.getNameNode().getNamesystem().getBlockManager();
try {
cluster.waitActive();
// Artificially delay IBR from 1 DataNode.
// this ensures that the client's completeFile() RPC will get to the
// NN before some of the replicas are reported.
NameNode nn = cluster.getNameNode();
DataNode dn = cluster.getDataNodes().get(0);
DatanodeProtocolClientSideTranslatorPB spy =
DataNodeTestUtils.spyOnBposToNN(dn, nn);
DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG);
Mockito.doAnswer(delayer).when(spy).blockReceivedAndDeleted(
Mockito.<DatanodeRegistration>anyObject(),
Mockito.anyString(),
Mockito.<StorageReceivedDeletedBlocks[]>anyObject());
FileSystem fs = cluster.getFileSystem();
// Create and close a small file with two blocks
DFSTestUtil.createFile(fs, testPath, 1500, replication, 0);
// schedule replication via BlockManager#computeReplicationWork
BlockManagerTestUtil.computeAllPendingWork(bm);
// Initially, should have some pending replication since the close()
// is earlier than at lease one of the reportReceivedDeletedBlocks calls
assertTrue(pendingReplicationCount(bm) > 0);
// release pending IBR.
delayer.waitForCall();
delayer.proceed();
delayer.waitForResult();
// make sure DataNodes do replication work if exists
for (DataNode d : cluster.getDataNodes()) {
DataNodeTestUtils.triggerHeartbeat(d);
}
// Wait until there is nothing pending
try {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return pendingReplicationCount(bm) == 0;
}
}, 100, 3000);
} catch (TimeoutException e) {
fail("timed out while waiting for no pending replication.");
}
// Check that none of the datanodes have serviced a replication request.
// i.e. that the NameNode didn't schedule any spurious replication.
assertNoReplicationWasPerformed(cluster);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* This test makes sure that, if a file is under construction, blocks
* in the middle of that file are properly re-replicated if they
* become corrupt.
*/
@Test(timeout=60000)
public void testReplicationWhileUnderConstruction()
throws Exception {
LOG.info("Test block replication in under construction" );
MiniDFSCluster cluster = null;
final short numDataNodes = 6;
final short replication = 3;
String testFile = "/replication-test-file";
Path testPath = new Path(testFile);
FSDataOutputStream stm = null;
try {
Configuration conf = new Configuration();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
stm = AppendTestUtil.createFile(fs, testPath, replication);
// Write a full block
byte[] buffer = AppendTestUtil.initBuffer(AppendTestUtil.BLOCK_SIZE);
stm.write(buffer); // block 1
stm.write(buffer); // block 2
stm.write(buffer, 0, 1); // start block 3
stm.hflush(); // make sure blocks are persisted, etc
// Everything should be fully replicated
waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
// Check that none of the datanodes have serviced a replication request.
// i.e. that the NameNode didn't schedule any spurious replication.
assertNoReplicationWasPerformed(cluster);
// Mark one the blocks corrupt
List<LocatedBlock> blocks;
FSDataInputStream in = fs.open(testPath);
try {
blocks = DFSTestUtil.getAllBlocks(in);
} finally {
in.close();
}
LocatedBlock lb = blocks.get(0);
LocatedBlock lbOneReplica = new LocatedBlock(lb.getBlock(),
new DatanodeInfo[] { lb.getLocations()[0] });
cluster.getNameNodeRpc().reportBadBlocks(
new LocatedBlock[] { lbOneReplica });
// Everything should be fully replicated
waitForBlockReplication(testFile, cluster.getNameNodeRpc(), replication, 30000, true, true);
} finally {
if (stm != null) {
IOUtils.closeStream(stm);
}
if (cluster != null) {
cluster.shutdown();
}
}
}
private long pendingReplicationCount(BlockManager bm) {
BlockManagerTestUtil.updateState(bm);
return bm.getPendingReplicationBlocksCount();
}
private void assertNoReplicationWasPerformed(MiniDFSCluster cluster) {
for (DataNode dn : cluster.getDataNodes()) {
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("BlocksReplicated", 0L, rb);
}
}
} }