diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 04ca6f774a..418384e7ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -432,6 +432,9 @@ Release 2.5.0 - UNRELEASED HDFS-6395. Skip checking xattr limits for non-user-visible namespaces. (Yi Liu via wang). + HDFS-3493. Invalidate corrupted blocks as long as minimum replication is + satisfied. (Juan Yu and Vinayakumar B via wang) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index e122fc26bb..1d83c662ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -1096,8 +1096,9 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk, + blk + " not found"); return; } - markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason, - Reason.CORRUPTION_REPORTED), dn, storageID); + markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, + blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED), + dn, storageID); } private void markBlockAsCorrupt(BlockToMarkCorrupt b, @@ -1123,7 +1124,25 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b, // Add this replica to corruptReplicas Map corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason, b.reasonCode); - if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) { + + NumberReplicas numberOfReplicas = countNodes(b.stored); + boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc + .getBlockReplication(); + boolean minReplicationSatisfied = + numberOfReplicas.liveReplicas() >= minReplication; + boolean hasMoreCorruptReplicas = minReplicationSatisfied && + (numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) > + bc.getBlockReplication(); + boolean corruptedDuringWrite = minReplicationSatisfied && + (b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp()); + // case 1: have enough number of live replicas + // case 2: corrupted replicas + live replicas > Replication factor + // case 3: Block is marked corrupt due to failure while writing. In this + // case genstamp will be different than that of valid block. + // In all these cases we can delete the replica. + // In case of 3, rbw block will be deleted and valid block can be replicated + if (hasEnoughLiveReplicas || hasMoreCorruptReplicas + || corruptedDuringWrite) { // the block is over-replicated so invalidate the replicas immediately invalidateBlock(b, node); } else if (namesystem.isPopulatingReplQueues()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java index 1ec395d350..8734710fd5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java @@ -25,13 +25,16 @@ import java.io.OutputStream; import java.io.RandomAccessFile; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Iterator; +import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -453,4 +456,66 @@ private void changeBlockLen(MiniDFSCluster cluster, int lenDelta) } fs.delete(fileName, true); } + + /** + * Test that blocks should get replicated if we have corrupted blocks and + * having good replicas at least equal or greater to minreplication + * + * Simulate rbw blocks by creating dummy copies, then a DN restart to detect + * those corrupted blocks asap. + */ + @Test(timeout=30000) + public void testReplicationWhenBlockCorruption() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new HdfsConfiguration(); + conf.setLong( + DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build(); + FileSystem fs = cluster.getFileSystem(); + FSDataOutputStream create = fs.create(new Path("/test")); + fs.setReplication(new Path("/test"), (short) 1); + create.write(new byte[1024]); + create.close(); + + List nonParticipatedNodeDirs = new ArrayList(); + File participatedNodeDirs = null; + for (int i = 0; i < cluster.getDataNodes().size(); i++) { + File storageDir = cluster.getInstanceStorageDir(i, 0); + String bpid = cluster.getNamesystem().getBlockPoolId(); + File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid); + if (data_dir.listFiles().length == 0) { + nonParticipatedNodeDirs.add(data_dir); + } else { + participatedNodeDirs = data_dir; + } + } + + String blockFile = null; + File[] listFiles = participatedNodeDirs.listFiles(); + for (File file : listFiles) { + if (file.getName().startsWith("blk_") + && !file.getName().endsWith("meta")) { + blockFile = file.getName(); + for (File file1 : nonParticipatedNodeDirs) { + file1.mkdirs(); + new File(file1, blockFile).createNewFile(); + new File(file1, blockFile + "_1000.meta").createNewFile(); + } + break; + } + } + + fs.setReplication(new Path("/test"), (short) 3); + cluster.restartDataNodes(); // Lets detect all DNs about dummy copied + // blocks + cluster.waitActive(); + cluster.triggerBlockReports(); + DFSTestUtil.waitReplication(fs, new Path("/test"), (short) 3); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 473e9c0f19..d2f58a877e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -410,6 +410,7 @@ public void blockReport_06() throws Exception { * The second datanode is started in the cluster. * As soon as the replication process is completed test finds a block from * the second DN and sets its GS to be < of original one. + * this is the markBlockAsCorrupt case 3 so we expect one pending deletion * Block report is forced and the check for # of currupted blocks is performed. * Another block is chosen and its length is set to a lesser than original. * A check for another corrupted block is performed after yet another @@ -436,20 +437,20 @@ public void blockReport_07() throws Exception { printStats(); assertThat("Wrong number of corrupt blocks", - cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L)); + cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L)); assertThat("Wrong number of PendingDeletion blocks", - cluster.getNamesystem().getPendingDeletionBlocks(), is(0L)); + cluster.getNamesystem().getPendingDeletionBlocks(), is(1L)); assertThat("Wrong number of PendingReplication blocks", cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); - reports = getBlockReports(dn, poolId, true, true); + reports = getBlockReports(dn, poolId, false, true); sendBlockReports(dnR, poolId, reports); printStats(); assertThat("Wrong number of corrupt blocks", - cluster.getNamesystem().getCorruptReplicaBlocks(), is(2L)); + cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L)); assertThat("Wrong number of PendingDeletion blocks", - cluster.getNamesystem().getPendingDeletionBlocks(), is(0L)); + cluster.getNamesystem().getPendingDeletionBlocks(), is(1L)); assertThat("Wrong number of PendingReplication blocks", cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));