From ffc9c50e074aeca804674c6e1e6b0f1eb629e230 Mon Sep 17 00:00:00 2001
From: Xiao Chen
Date: Mon, 5 Nov 2018 21:36:43 -0800
Subject: [PATCH] HDFS-14053. Provide ability for NN to re-replicate based on
 topology changes. Contributed by Hrishikesh Gadre.
---
 .../server/blockmanagement/BlockManager.java  | 38 ++++++++++
 .../hdfs/server/namenode/NamenodeFsck.java    | 33 +++++++++
 .../org/apache/hadoop/hdfs/tools/DFSck.java   | 10 ++-
 .../src/site/markdown/HDFSCommands.md         |  3 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java   | 21 ++++++
 .../TestBlocksWithNotEnoughRacks.java         | 72 +++++++++++++++++++
 6 files changed, 173 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index a5fb0b1730..36bbeb149a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -3534,6 +3534,44 @@ public boolean hasNonEcBlockUsingStripedID(){
     return hasNonEcBlockUsingStripedID;
   }

+  /**
+   * Schedule replication work for a specified list of mis-replicated
+   * blocks and return the total number of blocks scheduled for replication.
+   *
+   * @param blocks A list of blocks for which replication work needs to
+   *               be scheduled.
+   * @return Total number of blocks for which replication work is scheduled.
+   **/
+  public int processMisReplicatedBlocks(List<BlockInfo> blocks) {
+    int processed = 0;
+    Iterator<BlockInfo> iter = blocks.iterator();
+
+    try {
+      while (isPopulatingReplQueues() && namesystem.isRunning()
+          && !Thread.currentThread().isInterrupted()
+          && iter.hasNext()) {
+        int limit = processed + numBlocksPerIteration;
+        namesystem.writeLockInterruptibly();
+        try {
+          while (iter.hasNext() && processed < limit) {
+            BlockInfo blk = iter.next();
+            MisReplicationResult r = processMisReplicatedBlock(blk);
+            processed++;
+            LOG.debug("BLOCK* processMisReplicatedBlocks: " +
+                "Re-scanned block {}, result is {}", blk, r);
+          }
+        } finally {
+          namesystem.writeUnlock();
+        }
+      }
+    } catch (InterruptedException ex) {
+      LOG.info("Caught InterruptedException while scheduling replication work"
+          + " for mis-replicated blocks");
+      Thread.currentThread().interrupt();
+    }
+
+    return processed;
+  }
+
   /**
    * Process a single possibly misreplicated block. This adds it to the
    * appropriate queues if necessary, and returns a result code indicating
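The method above deliberately works in bounded batches: it takes the namesystem write lock, handles at most numBlocksPerIteration blocks, releases the lock, and repeats, so a long list of mis-replicated blocks cannot starve other NameNode operations. The following is a standalone sketch of that idiom using plain JDK locks; the class and method names (ChunkedLockSketch, handle) are illustrative and none of this is Hadoop code:

    // Standalone sketch of the batching idiom used by processMisReplicatedBlocks():
    // take the lock, handle at most `chunk` items, release the lock, repeat.
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.locks.ReentrantLock;

    public class ChunkedLockSketch {
      private final ReentrantLock lock = new ReentrantLock();

      // Returns the number of items handled, mirroring the `processed`
      // counter that the patch's method returns.
      int processAll(List<String> items, int chunk) {
        int processed = 0;
        Iterator<String> iter = items.iterator();
        while (iter.hasNext() && !Thread.currentThread().isInterrupted()) {
          int limit = processed + chunk;  // end of this batch
          lock.lock();                    // analogue of namesystem.writeLockInterruptibly()
          try {
            while (iter.hasNext() && processed < limit) {
              handle(iter.next());
              processed++;                // must advance, or the batch never terminates
            }
          } finally {
            lock.unlock();                // analogue of namesystem.writeUnlock()
          }
        }
        return processed;
      }

      private void handle(String item) {
        System.out.println("handled " + item);
      }

      public static void main(String[] args) {
        ChunkedLockSketch sketch = new ChunkedLockSketch();
        int n = sketch.processAll(Arrays.asList("a", "b", "c", "d", "e"), 2);
        System.out.println(n + " items handled");  // prints: 5 items handled
      }
    }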
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
index 56607f08a5..f54b407dd0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
@@ -25,6 +25,7 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
@@ -173,6 +174,14 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
    */
   private boolean doDelete = false;

+  /**
+   * True if the user specified the -replicate option.
+   *
+   * When this option is in effect, we will initiate replication work to make
+   * mis-replicated blocks conform to the block placement policy.
+   */
+  private boolean doReplicate = false;
+
   String path = "/";

   private String blockIds = null;
@@ -249,6 +258,8 @@ else if (key.equals("replicadetails")) {
         this.snapshottableDirs = new ArrayList<String>();
       } else if (key.equals("blockId")) {
         this.blockIds = pmap.get("blockId")[0];
+      } else if (key.equals("replicate")) {
+        this.doReplicate = true;
       }
     }
   }
@@ -683,6 +694,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
     StringBuilder report = new StringBuilder();
     int blockNumber = 0;
     final LocatedBlock lastBlock = blocks.getLastLocatedBlock();
+    List<BlockInfo> misReplicatedBlocks = new LinkedList<>();
     for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
       ExtendedBlock block = lBlk.getBlock();
       if (!blocks.isLastBlockComplete() && lastBlock != null &&
@@ -791,6 +803,9 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
       }
       out.println(" Replica placement policy is violated for " +
           block + ". " + blockPlacementStatus.getErrorDescription());
+      if (doReplicate) {
+        misReplicatedBlocks.add(storedBlock);
+      }
     }

     // count storage summary
@@ -888,6 +903,19 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
         out.print(report + "\n");
       }
     }
+
+    if (doReplicate && !misReplicatedBlocks.isEmpty()) {
+      int processedBlocks = this.blockManager.processMisReplicatedBlocks(
+          misReplicatedBlocks);
+      if (processedBlocks < misReplicatedBlocks.size()) {
+        LOG.warn("Fsck: Block manager is able to process only " +
+            processedBlocks +
+            " mis-replicated blocks (Total count : " +
+            misReplicatedBlocks.size() +
+            " ) for path " + path);
+      }
+      res.numBlocksQueuedForReplication += processedBlocks;
+    }
   }

   private void countStorageTypeSummary(HdfsFileStatus file, LocatedBlock lBlk) {
@@ -1167,6 +1195,7 @@ static class Result {
     long totalSize = 0L;
     long totalOpenFilesSize = 0L;
     long totalReplicas = 0L;
+    long numBlocksQueuedForReplication = 0L;

     /**
      * DFS is considered healthy if there are no missing blocks.
@@ -1310,6 +1339,8 @@ public String toString() {
       res.append("\n InMaintenanceReplicas:\t").append(
           inMaintenanceReplicas);
     }
+    res.append("\n Blocks queued for replication:\t").append(
+        numBlocksQueuedForReplication);
     return res.toString();
   }
 }
@@ -1420,6 +1451,8 @@ public String toString() {
       res.append("\n InMaintenanceReplicas:\t").append(
           inMaintenanceReplicas);
     }
+    res.append("\n Blocks queued for replication:\t").append(
+        numBlocksQueuedForReplication);
     return res.toString();
   }
 }
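With the Result changes above, both the replicated-file summary and the erasure-coded summary gain one counter line. The tail of an fsck report would then look roughly like this; the counter value and the closing line are illustrative, and only the "Blocks queued for replication" line is introduced by this patch:

     Blocks queued for replication:	1
    The filesystem under path '/testFile' is HEALTHY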
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
index 2afc97cb7a..403c0bbcb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
@@ -84,7 +84,7 @@ public class DFSck extends Configured implements Tool {
       "-upgradedomains]]]] "
       + "[-includeSnapshots] [-showprogress] "
       + "[-storagepolicies] [-maintenance] "
-      + "[-blockId <blk_Id>]\n"
+      + "[-blockId <blk_Id>] [-replicate]\n"
       + "\t<path>\tstart checking from this path\n"
       + "\t-move\tmove corrupted files to /lost+found\n"
       + "\t-delete\tdelete corrupted files\n"
@@ -107,8 +107,10 @@ public class DFSck extends Configured implements Tool {
       + "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
       + "\t-blockId\tprint out which file this blockId belongs to, locations"
       + " (nodes, racks) of this block, and other diagnostics info"
-      + " (under replicated, corrupted or not, etc)\n\n"
-      + "Please Note:\n"
+      + " (under replicated, corrupted or not, etc)\n"
+      + "\t-replicate\tinitiate replication work to make mis-replicated\n"
+      + " blocks satisfy block placement policy\n\n"
+      + "Please Note:\n\n"
       + "\t1. By default fsck ignores files opened for write, "
       + "use -openforwrite to report such files. They are usually "
       + " tagged CORRUPT or HEALTHY depending on their block "
@@ -308,6 +310,8 @@ else if (args[idx].equals("-replicaDetails")) {
           idx++;
         }
         url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
+      } else if (args[idx].equals("-replicate")) {
+        url.append("&replicate=1");
       } else if (!args[idx].startsWith("-")) {
         if (null == dir) {
           dir = args[idx];
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index c2a6ae1d60..012d4ae19e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -86,7 +86,7 @@ Usage:
           [-files [-blocks [-locations | -racks | -replicaDetails | -upgradedomains]]]
           [-includeSnapshots] [-showprogress]
           [-storagepolicies] [-maintenance]
-          [-blockId <blk_Id>]
+          [-blockId <blk_Id>] [-replicate]

 | COMMAND\_OPTION | Description |
 |:---- |:---- |
@@ -106,6 +106,7 @@ Usage:
 | `-storagepolicies` | Print out storage policy summary for the blocks. |
 | `-maintenance` | Print out maintenance state node details. |
 | `-blockId` | Print out information about the block. |
+| `-replicate` | Initiate replication work to make mis-replicated blocks satisfy block placement policy. |

 Runs the HDFS filesystem checking utility. See [fsck](./HdfsUserGuide.html#fsck) for more info.
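Taken together, the DFSck and documentation changes expose the feature on the command line. A typical invocation would be the following; the path is illustrative:

    hdfs fsck /user/data -replicate

Because the client side only appends `&replicate=1` to the fsck servlet URL, -replicate composes with the other fsck options; the NameNode then immediately queues replication work for every block under the given path that violates the placement policy, rather than waiting for topology changes to be noticed some other way.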
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 3e22b565dc..9465b31e7d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -82,6 +82,7 @@
 import com.google.common.collect.Maps;

 import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.hdfs.tools.DFSck;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -2531,4 +2532,24 @@ public static NameNodeConnector getNameNodeConnector(Configuration conf,
     }
   }

+  /**
+   * Run the fsck command using the specified params.
+   *
+   * @param conf HDFS configuration to use
+   * @param expectedErrCode The error code expected to be returned by
+   *                        the fsck command
+   * @param checkErrorCode Should the error code be checked
+   * @param path actual arguments to the fsck command
+   **/
+  public static String runFsck(Configuration conf, int expectedErrCode,
+      boolean checkErrorCode, String... path)
+      throws Exception {
+    ByteArrayOutputStream bStream = new ByteArrayOutputStream();
+    PrintStream out = new PrintStream(bStream, true);
+    int errCode = ToolRunner.run(new DFSck(conf, out), path);
+    if (checkErrorCode) {
+      assertEquals(expectedErrCode, errCode);
+    }
+    return bStream.toString();
+  }
 }
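For reference, this helper is how the regression test in the next file drives fsck. A call looks like the following, taken from that test: it runs fsck with -replicate, asserts that the exit code is 0, and returns the captured report text for further assertions:

    // Run `hdfs fsck /testFile -replicate` and capture the textual report.
    String fsckOp = DFSTestUtil.runFsck(conf, 0, true, "/testFile", "-replicate");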
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
index 5e59443f1e..85f8e54654 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlocksWithNotEnoughRacks.java
@@ -20,9 +20,14 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;

 import java.util.ArrayList;
+import java.util.concurrent.TimeoutException;

+import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -143,6 +148,73 @@ public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
     }
   }

+  /*
+   * Initialize a cluster with datanodes on two different racks and shut down
+   * all datanodes on one rack. Now create a file with a single block. Even
+   * though the block is sufficiently replicated, it violates the replica
+   * placement policy. Now add a datanode back on the stopped rack. Run the
+   * fsck command with the -replicate option to schedule the replication of
+   * these mis-replicated blocks and verify that it indeed works as expected.
+   */
+  @Test
+  public void testMisReplicatedBlockUsesNewRack() throws Exception {
+    Configuration conf = getConf();
+    conf.setInt("dfs.namenode.heartbeat.recheck-interval", 500);
+
+    final short replicationFactor = 3;
+    final Path filePath = new Path("/testFile");
+    // All datanodes are on two different racks
+    String[] racks = new String[]{"/rack1", "/rack1", "/rack1", "/rack2"};
+
+    try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(racks.length).racks(racks).build()) {
+      cluster.waitActive();
+
+      String poolId = cluster.getNamesystem().getBlockPoolId();
+      DatanodeRegistration reg = InternalDataNodeTestUtils.
+          getDNRegistrationForBP(cluster.getDataNodes().get(3), poolId);
+      // Shut down the datanode on rack2 and wait for it to be marked dead
+      cluster.stopDataNode(3);
+      DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(),
+          false, 20000);
+
+      // Create a file with one block with a replication factor of 3
+      final FileSystem fs = cluster.getFileSystem();
+      DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
+      ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
+      DFSTestUtil.waitReplication(cluster.getFileSystem(), filePath,
+          replicationFactor);
+
+      // Add a datanode on rack2 and wait for it to be recognized as alive by the NN
+      cluster.startDataNodes(conf, 1, true,
+          null, new String[]{"/rack2"});
+      cluster.waitActive();
+
+      try {
+        DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
+        fail("NameNode should not have fixed the mis-replicated blocks"
+            + " automatically.");
+      } catch (TimeoutException e) {
+        // Expected.
+      }
+
+      String fsckOp = DFSTestUtil.runFsck(conf, 0, true, filePath.toString(),
+          "-replicate");
+      LOG.info("fsck response {}", fsckOp);
+      assertTrue(fsckOp.contains(
+          "/testFile: Replica placement policy is violated"));
+      assertTrue(fsckOp.contains(" Block should be additionally replicated"
+          + " on 1 more rack(s). Total number of racks in the cluster: 2"));
+
+      try {
+        DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
+      } catch (TimeoutException e) {
+        fail("NameNode should have fixed the mis-replicated blocks as a"
+            + " result of the fsck command.");
+      }
+    }
+  }
+
   /*
    * Creates a block with all datanodes on the same rack. Add additional
    * datanodes on a different rack and increase the replication factor,