HDFS-15200. Delete Corrupt Replica Immediately Irrespective of Replicas On Stale Storage. Contributed by Ayush Saxena.
This commit is contained in:
parent
bb41ddaf1e
commit
f9bb2a8cc5
@ -304,6 +304,13 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||
= "dfs.namenode.blockreport.max.lock.hold.time";
|
||||
public static final long
|
||||
DFS_NAMENODE_BLOCKREPORT_MAX_LOCK_HOLD_TIME_DEFAULT = 4;
|
||||
|
||||
public static final String
|
||||
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED =
|
||||
"dfs.namenode.corrupt.block.delete.immediately.enabled";
|
||||
public static final boolean
|
||||
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT = true;
|
||||
|
||||
@Deprecated
|
||||
public static final String DFS_WEBHDFS_USER_PATTERN_KEY =
|
||||
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY;
|
||||
|
@ -459,6 +459,11 @@ public long getTotalECBlockGroups() {
|
||||
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
|
||||
*/
|
||||
private final short minReplicationToBeInMaintenance;
|
||||
/**
|
||||
* Whether to delete corrupt replica immediately irrespective of other
|
||||
* replicas available on stale storages.
|
||||
*/
|
||||
private final boolean deleteCorruptReplicaImmediately;
|
||||
|
||||
/** Storages accessible from multiple DNs. */
|
||||
private final ProvidedStorageMap providedStorageMap;
|
||||
@ -615,6 +620,10 @@ public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
||||
DFSConfigKeys.DFS_NAMENODE_BLOCKREPORT_QUEUE_SIZE_DEFAULT);
|
||||
blockReportThread = new BlockReportProcessingThread(queueSize);
|
||||
|
||||
this.deleteCorruptReplicaImmediately =
|
||||
conf.getBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||
DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED_DEFAULT);
|
||||
|
||||
LOG.info("defaultReplication = {}", defaultReplication);
|
||||
LOG.info("maxReplication = {}", maxReplication);
|
||||
LOG.info("minReplication = {}", minReplication);
|
||||
@ -1870,7 +1879,7 @@ private boolean invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn,
|
||||
}
|
||||
|
||||
// Check how many copies we have of the block
|
||||
if (nr.replicasOnStaleNodes() > 0) {
|
||||
if (nr.replicasOnStaleNodes() > 0 && !deleteCorruptReplicaImmediately) {
|
||||
blockLog.debug("BLOCK* invalidateBlocks: postponing " +
|
||||
"invalidation of {} on {} because {} replica(s) are located on " +
|
||||
"nodes with potentially out-of-date block reports", b, dn,
|
||||
|
@ -5777,6 +5777,16 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
|
||||
<property>
|
||||
<name>dfs.namenode.corrupt.block.delete.immediately.enabled</name>
|
||||
<value>true</value>
|
||||
<description>
|
||||
Whether the corrupt replicas should be deleted immediately, irrespective
|
||||
of other replicas on stale storages..
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>dfs.journalnode.edits.dir.perm</name>
|
||||
<value>700</value>
|
||||
|
@ -21,7 +21,9 @@
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.LinkedListMultimap;
|
||||
import com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -503,6 +505,40 @@ public void testNeededReconstructionWhileAppending() throws IOException {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDeleteCorruptReplicaWithStatleStorages() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
|
||||
MIN_REPLICATION, 2);
|
||||
Path file = new Path("/test-file");
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
BlockManager blockManager = cluster.getNamesystem().getBlockManager();
|
||||
blockManager.getDatanodeManager().markAllDatanodesStale();
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
FSDataOutputStream out = fs.create(file);
|
||||
for (int i = 0; i < 1024 * 1024 * 1; i++) {
|
||||
out.write(i);
|
||||
}
|
||||
out.hflush();
|
||||
MiniDFSCluster.DataNodeProperties datanode = cluster.stopDataNode(0);
|
||||
for (int i = 0; i < 1024 * 1024 * 1; i++) {
|
||||
out.write(i);
|
||||
}
|
||||
out.close();
|
||||
cluster.restartDataNode(datanode);
|
||||
cluster.triggerBlockReports();
|
||||
DataNodeTestUtils.triggerBlockReport(datanode.getDatanode());
|
||||
assertEquals(0, blockManager.getCorruptBlocks());
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tell the block manager that replication is completed for the given
|
||||
* pipeline.
|
||||
|
@ -27,6 +27,8 @@
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
|
||||
|
||||
/**
|
||||
* Tests corruption of replicas in case of failover.
|
||||
*/
|
||||
@ -35,6 +37,8 @@ public class TestCorruptionWithFailover {
|
||||
@Test
|
||||
public void testCorruptReplicaAfterFailover() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||
false);
|
||||
// Enable data to be written, to less replicas in case of pipeline failure.
|
||||
conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
|
||||
MIN_REPLICATION, 2);
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED;
|
||||
import static org.apache.hadoop.hdfs.MiniDFSCluster.HDFS_MINIDFS_BASEDIR;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
@ -187,6 +188,8 @@ public static String runFsck(Configuration conf, int expectedErrCode,
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new Configuration();
|
||||
conf.setBoolean(DFS_NAMENODE_CORRUPT_BLOCK_DELETE_IMMEDIATELY_ENABLED,
|
||||
false);
|
||||
}
|
||||
|
||||
@After
|
||||
|
Loading…
Reference in New Issue
Block a user