From 8078a5efd0fe26b82c3768e06ccd2faddc619a7f Mon Sep 17 00:00:00 2001
From: Mingliang Liu
Date: Mon, 3 Oct 2016 21:50:16 -0400
Subject: [PATCH] HDFS-10810. setReplication removing block from
 under-construction temporarily. Contributed by Brahma Reddy Battula

---
 .../server/blockmanagement/BlockManager.java  |  8 ++-
 .../hadoop/hdfs/TestFileCorruption.java       | 61 +++++++++++++++++++
 2 files changed, 66 insertions(+), 3 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 9b426bbb46..fa051b629a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -4013,13 +4013,15 @@ private void updateNeededReconstructions(final BlockInfo block,
         return;
       }
       NumberReplicas repl = countNodes(block);
+      int pendingNum = pendingReconstruction.getNumReplicas(block);
       int curExpectedReplicas = getRedundancy(block);
-      if (isNeededReconstruction(block, repl.liveReplicas())) {
-        neededReconstruction.update(block, repl.liveReplicas(),
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
+          curExpectedReplicas)) {
+        neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
             repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
             curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
-        int oldReplicas = repl.liveReplicas()-curReplicasDelta;
+        int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
             repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
index 2437e38acc..5477700fa0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCorruption.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
+import com.google.common.base.Supplier;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import static org.junit.Assert.assertEquals;
@@ -38,6 +39,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
@@ -230,6 +232,65 @@ public void testCorruptionWithDiskFailure() throws Exception {
 
   }
 
+  @Test
+  public void testSetReplicationWhenBatchIBR() throws Exception {
+    Configuration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 100);
+    conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY,
+        30000);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_FILE_CLOSE_NUM_COMMITTED_ALLOWED_KEY,
+        1);
+    DistributedFileSystem dfs;
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3).build()) {
+      final int bufferSize = 1024; // write 1024 bytes each time
+      byte[] outBuffer = new byte[bufferSize];
+      dfs = cluster.getFileSystem();
+      String fileName = "/testSetRep1";
+      Path filePath = new Path(fileName);
+      FSDataOutputStream out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      // send a full block report (FBR) to delay the next incremental one
+      cluster.triggerBlockReports();
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            cluster.triggerBlockReports();
+            if (cluster.getNamesystem().getBlocksTotal() == 1) {
+              return true;
+            }
+          } catch (Exception e) {
+            // Ignore the exception
+          }
+          return false;
+        }
+      }, 10, 3000);
+      fileName = "/testSetRep2";
+      filePath = new Path(fileName);
+      out = dfs.create(filePath);
+      out.write(outBuffer, 0, bufferSize);
+      out.close();
+      dfs.setReplication(filePath, (short) 10);
+      // the under-replicated block count should be one after setrep
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          try {
+            return cluster.getNamesystem().getBlockManager()
+                .getUnderReplicatedBlocksCount() == 1;
+          } catch (Exception e) {
+            e.printStackTrace();
+            return false;
+          }
+        }
+      }, 10, 3000);
+      assertEquals(0,
+          cluster.getNamesystem().getBlockManager().getMissingBlocksCount());
+    }
+  }
+
   private void markAllBlocksAsCorrupt(BlockManager bm,
       ExtendedBlock blk) throws IOException {
     for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
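
Background for reviewers (sketch, not part of the patch): the core change is that
updateNeededReconstructions now counts replicas already scheduled for reconstruction
(pendingNum, taken from the pendingReconstruction queue) as "effective" replicas when
deciding whether a block still belongs in neededReconstruction. The class and method
below are simplified stand-ins for BlockManager internals, not the real Hadoop API;
the real hasEnoughEffectiveReplicas() also consults the block placement policy.

    // EffectiveReplicasSketch.java -- illustrative stand-in, not Hadoop code.
    public class EffectiveReplicasSketch {

      // Effective replicas = live replicas plus replicas already scheduled
      // for reconstruction (the pendingReconstruction queue).
      static boolean hasEnoughEffectiveReplicas(int live, int pending,
          int expected) {
        return live + pending >= expected;
      }

      public static void main(String[] args) {
        // After setrep to 10 with 3 live replicas and 7 reconstructions
        // already in flight, the block needs no further queueing:
        System.out.println(hasEnoughEffectiveReplicas(3, 7, 10)); // true

        // The pre-patch check consulted live replicas alone, so the same
        // block (3 live, 10 expected) was handled as if nothing were in
        // flight, which misbehaves while batched IBRs delay the reports:
        System.out.println(hasEnoughEffectiveReplicas(3, 0, 10)); // false
      }
    }

The test exercises exactly that window: IBRs are batched (30s interval) and the file
may close with its last block merely committed, so replica reports lag behind; the
assertions pin the expected steady state after setrep to 10 -- exactly one
under-replicated block and zero missing blocks.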