HDFS-14053. Provide ability for NN to re-replicate based on topology changes. Contributed by Hrishikesh Gadre.

Xiao Chen 2018-11-05 21:36:43 -08:00
parent c7fcca0d7e
commit ffc9c50e07
6 changed files with 173 additions and 4 deletions

View File

@@ -3534,6 +3534,44 @@ public boolean hasNonEcBlockUsingStripedID(){
return hasNonEcBlockUsingStripedID;
}
/**
* Schedule replication work for a specified list of mis-replicated
* blocks and return total number of blocks scheduled for replication.
*
* @param blocks A list of blocks for which replication work needs to
* be scheduled.
* @return Total number of blocks for which replication work is scheduled.
**/
public int processMisReplicatedBlocks(List<BlockInfo> blocks) {
int processed = 0;
Iterator<BlockInfo> iter = blocks.iterator();
try {
while (isPopulatingReplQueues() && namesystem.isRunning()
&& !Thread.currentThread().isInterrupted()
&& iter.hasNext()) {
int limit = processed + numBlocksPerIteration;
namesystem.writeLockInterruptibly();
try {
while (iter.hasNext() && processed < limit) {
BlockInfo blk = iter.next();
MisReplicationResult r = processMisReplicatedBlock(blk);
processed++;
LOG.debug("BLOCK* processMisReplicatedBlocks: " +
"Re-scanned block {}, result is {}", blk, r);
}
} finally {
namesystem.writeUnlock();
}
}
} catch (InterruptedException ex) {
LOG.info("Caught InterruptedException while scheduling replication work" +
" for mis-replicated blocks");
Thread.currentThread().interrupt();
}
return processed;
}
/**
* Process a single possibly misreplicated block. This adds it to the
* appropriate queues if necessary, and returns a result code indicating

View File

@@ -25,6 +25,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
@@ -173,6 +174,14 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
*/
private boolean doDelete = false;
/**
* True if the user specified the -replicate option.
*
* When this option is in effect, we will initiate replication work to make
* mis-replicated blocks conform to the block placement policy.
*/
private boolean doReplicate = false;
String path = "/";
private String blockIds = null;
@@ -249,6 +258,8 @@ else if (key.equals("replicadetails")) {
this.snapshottableDirs = new ArrayList<String>();
} else if (key.equals("blockId")) {
this.blockIds = pmap.get("blockId")[0];
} else if (key.equals("replicate")) {
this.doReplicate = true;
}
}
}
@@ -683,6 +694,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
StringBuilder report = new StringBuilder();
int blockNumber = 0;
final LocatedBlock lastBlock = blocks.getLastLocatedBlock();
List<BlockInfo> misReplicatedBlocks = new LinkedList<>();
for (LocatedBlock lBlk : blocks.getLocatedBlocks()) {
ExtendedBlock block = lBlk.getBlock();
if (!blocks.isLastBlockComplete() && lastBlock != null &&
@@ -791,6 +803,9 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
}
out.println(" Replica placement policy is violated for " +
block + ". " + blockPlacementStatus.getErrorDescription());
if (doReplicate) {
misReplicatedBlocks.add(storedBlock);
}
}
// count storage summary
@@ -888,6 +903,19 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file,
out.print(report + "\n");
}
}
if (doReplicate && !misReplicatedBlocks.isEmpty()) {
int processedBlocks = this.blockManager.processMisReplicatedBlocks(
misReplicatedBlocks);
if (processedBlocks < misReplicatedBlocks.size()) {
LOG.warn("Fsck: Block manager is able to process only " +
processedBlocks +
" mis-replicated blocks (Total count : " +
misReplicatedBlocks.size() +
" ) for path " + path);
}
res.numBlocksQueuedForReplication += processedBlocks;
}
}
private void countStorageTypeSummary(HdfsFileStatus file, LocatedBlock lBlk) {
@@ -1167,6 +1195,7 @@ static class Result {
long totalSize = 0L;
long totalOpenFilesSize = 0L;
long totalReplicas = 0L;
long numBlocksQueuedForReplication = 0L;
/**
* DFS is considered healthy if there are no missing blocks.
@@ -1310,6 +1339,8 @@ public String toString() {
res.append("\n InMaintenanceReplicas:\t").append(
inMaintenanceReplicas);
}
res.append("\n Blocks queued for replication:\t").append(
numBlocksQueuedForReplication);
return res.toString();
}
}
@@ -1420,6 +1451,8 @@ public String toString() {
res.append("\n InMaintenanceReplicas:\t").append(
inMaintenanceReplicas);
}
res.append("\n Blocks queued for replication:\t").append(
numBlocksQueuedForReplication);
return res.toString();
}
}

View File

@@ -84,7 +84,7 @@ public class DFSck extends Configured implements Tool {
"-upgradedomains]]]] "
+ "[-includeSnapshots] [-showprogress] "
+ "[-storagepolicies] [-maintenance] "
+ "[-blockId <blk_Id>]\n"
+ "[-blockId <blk_Id>] [-replicate]\n"
+ "\t<path>\tstart checking from this path\n"
+ "\t-move\tmove corrupted files to /lost+found\n"
+ "\t-delete\tdelete corrupted files\n"
@@ -107,8 +107,10 @@ public class DFSck extends Configured implements Tool {
+ "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
+ "\t-blockId\tprint out which file this blockId belongs to, locations"
+ " (nodes, racks) of this block, and other diagnostics info"
+ " (under replicated, corrupted or not, etc)\n\n"
+ "Please Note:\n"
+ " (under replicated, corrupted or not, etc)\n"
+ "\t-replicate initiate replication work to make mis-replicated\n"
+ " blocks satisfy block placement policy\n\n"
+ "Please Note:\n\n"
+ "\t1. By default fsck ignores files opened for write, "
+ "use -openforwrite to report such files. They are usually "
+ " tagged CORRUPT or HEALTHY depending on their block "
@@ -308,6 +310,8 @@ else if (args[idx].equals("-replicaDetails")) {
idx++;
}
url.append("&blockId=").append(URLEncoder.encode(sb.toString(), "UTF-8"));
} else if (args[idx].equals("-replicate")) {
url.append("&replicate=1");
} else if (!args[idx].startsWith("-")) {
if (null == dir) {
dir = args[idx];

View File

@@ -86,7 +86,7 @@ Usage:
[-files [-blocks [-locations | -racks | -replicaDetails | -upgradedomains]]]
[-includeSnapshots] [-showprogress]
[-storagepolicies] [-maintenance]
[-blockId <blk_Id>]
[-blockId <blk_Id>] [-replicate]
| COMMAND\_OPTION | Description |
|:---- |:---- |
@@ -106,6 +106,7 @@ Usage:
| `-storagepolicies` | Print out storage policy summary for the blocks. |
| `-maintenance` | Print out maintenance state node details. |
| `-blockId` | Print out information about the block. |
| `-replicate` | Initiate replication work to make mis-replicated blocks satisfy block placement policy. |
Runs the HDFS filesystem checking utility. See [fsck](./HdfsUserGuide.html#fsck) for more info.
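For example, the following invocation (the path shown is only illustrative) checks a directory tree and, for any blocks that violate the placement policy, asks the NameNode to queue replication work; the number of blocks queued is reported in the "Blocks queued for replication" field of the fsck summary:

    hdfs fsck /user/hadoop/data -replicate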

View File

@@ -82,6 +82,7 @@
import com.google.common.collect.Maps;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -2531,4 +2532,24 @@ public static NameNodeConnector getNameNodeConnector(Configuration conf,
}
}
/**
* Run the fsck command using the specified params.
*
* @param conf HDFS configuration to use
* @param expectedErrCode The error code expected to be returned by
* the fsck command
* @param checkErrorCode Should the error code be checked
* @param path actual arguments to the fsck command
**/
public static String runFsck(Configuration conf, int expectedErrCode,
boolean checkErrorCode, String... path)
throws Exception {
ByteArrayOutputStream bStream = new ByteArrayOutputStream();
PrintStream out = new PrintStream(bStream, true);
int errCode = ToolRunner.run(new DFSck(conf, out), path);
if (checkErrorCode) {
assertEquals(expectedErrCode, errCode);
}
return bStream.toString();
}
}

View File

@@ -20,9 +20,14 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.server.datanode.InternalDataNodeTestUtils;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -143,6 +148,73 @@ public void testSufficientlySingleReplBlockUsesNewRack() throws Exception {
}
}
/*
* Initialize a cluster with datanodes on two different racks and shut down
* all datanodes on one rack. Then create a file with a single block. Even
* though the block is sufficiently replicated, it violates the replica
* placement policy. Restart the datanodes stopped earlier and run the fsck
* command with the -replicate option to schedule replication of the
* mis-replicated block, then verify that the block is re-replicated as expected.
*/
@Test
public void testMisReplicatedBlockUsesNewRack() throws Exception {
Configuration conf = getConf();
conf.setInt("dfs.namenode.heartbeat.recheck-interval", 500);
final short replicationFactor = 3;
final Path filePath = new Path("/testFile");
// Datanodes are spread across two racks: three on /rack1 and one on /rack2
String[] racks = new String[]{"/rack1", "/rack1", "/rack1", "/rack2"};
try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(racks.length).racks(racks).build()) {
cluster.waitActive();
String poolId = cluster.getNamesystem().getBlockPoolId();
DatanodeRegistration reg = InternalDataNodeTestUtils.
getDNRegistrationForBP(cluster.getDataNodes().get(3), poolId);
// Shut down the datanode on rack2 and wait for it to be marked dead
cluster.stopDataNode(3);
DFSTestUtil.waitForDatanodeState(cluster, reg.getDatanodeUuid(),
false, 20000);
// Create a file with one block with a replication factor of 3
final FileSystem fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, filePath, 1L, replicationFactor, 1L);
ExtendedBlock b = DFSTestUtil.getFirstBlock(fs, filePath);
DFSTestUtil.waitReplication(cluster.getFileSystem(), filePath,
replicationFactor);
// Add a datanode on rack2 and wait for it to be recognized as alive by NN
cluster.startDataNodes(conf, 1, true,
null, new String[]{"/rack2"});
cluster.waitActive();
try {
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
fail("NameNode should not have fixed the mis-replicated blocks" +
" automatically.");
} catch (TimeoutException e) {
//Expected.
}
String fsckOp = DFSTestUtil.runFsck(conf, 0, true, filePath.toString(),
"-replicate");
LOG.info("fsck response {}", fsckOp);
assertTrue(fsckOp.contains(
"/testFile: Replica placement policy is violated"));
assertTrue(fsckOp.contains(" Block should be additionally replicated" +
" on 1 more rack(s). Total number of racks in the cluster: 2"));
try {
DFSTestUtil.waitForReplication(cluster, b, 2, replicationFactor, 0);
} catch (TimeoutException e) {
fail("NameNode should have fixed the mis-replicated blocks as a" +
" result of fsck command.");
}
}
}
/*
* Creates a block with all datanodes on the same rack. Add additional
* datanodes on a different rack and increase the replication factor,