HDFS-3493. Invalidate corrupted blocks as long as minimum replication is satisfied. Contributed by Juan Yu and Vinayakumar B.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1602291 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
befa4bb1ed
commit
be01103af7
@ -432,6 +432,9 @@ Release 2.5.0 - UNRELEASED
|
|||||||
HDFS-6395. Skip checking xattr limits for non-user-visible namespaces.
|
HDFS-6395. Skip checking xattr limits for non-user-visible namespaces.
|
||||||
(Yi Liu via wang).
|
(Yi Liu via wang).
|
||||||
|
|
||||||
|
HDFS-3493. Invalidate corrupted blocks as long as minimum replication is
|
||||||
|
satisfied. (Juan Yu and Vinayakumar B via wang)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
@ -1096,8 +1096,9 @@ public class BlockManager {
|
|||||||
+ blk + " not found");
|
+ blk + " not found");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
|
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock,
|
||||||
Reason.CORRUPTION_REPORTED), dn, storageID);
|
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
||||||
|
dn, storageID);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
|
||||||
@ -1123,7 +1124,25 @@ public class BlockManager {
|
|||||||
// Add this replica to corruptReplicas Map
|
// Add this replica to corruptReplicas Map
|
||||||
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
|
||||||
b.reasonCode);
|
b.reasonCode);
|
||||||
if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
|
|
||||||
|
NumberReplicas numberOfReplicas = countNodes(b.stored);
|
||||||
|
boolean hasEnoughLiveReplicas = numberOfReplicas.liveReplicas() >= bc
|
||||||
|
.getBlockReplication();
|
||||||
|
boolean minReplicationSatisfied =
|
||||||
|
numberOfReplicas.liveReplicas() >= minReplication;
|
||||||
|
boolean hasMoreCorruptReplicas = minReplicationSatisfied &&
|
||||||
|
(numberOfReplicas.liveReplicas() + numberOfReplicas.corruptReplicas()) >
|
||||||
|
bc.getBlockReplication();
|
||||||
|
boolean corruptedDuringWrite = minReplicationSatisfied &&
|
||||||
|
(b.stored.getGenerationStamp() > b.corrupted.getGenerationStamp());
|
||||||
|
// case 1: have enough number of live replicas
|
||||||
|
// case 2: corrupted replicas + live replicas > Replication factor
|
||||||
|
// case 3: Block is marked corrupt due to failure while writing. In this
|
||||||
|
// case genstamp will be different than that of valid block.
|
||||||
|
// In all these cases we can delete the replica.
|
||||||
|
// In case 3, the rbw block will be deleted and the valid block can be replicated
|
||||||
|
if (hasEnoughLiveReplicas || hasMoreCorruptReplicas
|
||||||
|
|| corruptedDuringWrite) {
|
||||||
// the block is over-replicated so invalidate the replicas immediately
|
// the block is over-replicated so invalidate the replicas immediately
|
||||||
invalidateBlock(b, node);
|
invalidateBlock(b, node);
|
||||||
} else if (namesystem.isPopulatingReplQueues()) {
|
} else if (namesystem.isPopulatingReplQueues()) {
|
||||||
|
@ -25,13 +25,16 @@ import java.io.IOException;
|
|||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
import java.io.RandomAccessFile;
|
import java.io.RandomAccessFile;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
@ -453,4 +456,66 @@ public class TestReplication {
|
|||||||
}
|
}
|
||||||
fs.delete(fileName, true);
|
fs.delete(fileName, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that blocks should get replicated if we have corrupted blocks and
|
||||||
|
* the number of good replicas is at least minReplication.
|
||||||
|
*
|
||||||
|
* Simulate rbw blocks by creating dummy copies, then a DN restart to detect
|
||||||
|
* those corrupted blocks asap.
|
||||||
|
*/
|
||||||
|
@Test(timeout=30000)
|
||||||
|
public void testReplicationWhenBlockCorruption() throws Exception {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 1);
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
FSDataOutputStream create = fs.create(new Path("/test"));
|
||||||
|
fs.setReplication(new Path("/test"), (short) 1);
|
||||||
|
create.write(new byte[1024]);
|
||||||
|
create.close();
|
||||||
|
|
||||||
|
List<File> nonParticipatedNodeDirs = new ArrayList<File>();
|
||||||
|
File participatedNodeDirs = null;
|
||||||
|
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
||||||
|
File storageDir = cluster.getInstanceStorageDir(i, 0);
|
||||||
|
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
File data_dir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
||||||
|
if (data_dir.listFiles().length == 0) {
|
||||||
|
nonParticipatedNodeDirs.add(data_dir);
|
||||||
|
} else {
|
||||||
|
participatedNodeDirs = data_dir;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
String blockFile = null;
|
||||||
|
File[] listFiles = participatedNodeDirs.listFiles();
|
||||||
|
for (File file : listFiles) {
|
||||||
|
if (file.getName().startsWith("blk_")
|
||||||
|
&& !file.getName().endsWith("meta")) {
|
||||||
|
blockFile = file.getName();
|
||||||
|
for (File file1 : nonParticipatedNodeDirs) {
|
||||||
|
file1.mkdirs();
|
||||||
|
new File(file1, blockFile).createNewFile();
|
||||||
|
new File(file1, blockFile + "_1000.meta").createNewFile();
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fs.setReplication(new Path("/test"), (short) 3);
|
||||||
|
cluster.restartDataNodes(); // Restart so that all DNs detect the dummy copied
|
||||||
|
// blocks
|
||||||
|
cluster.waitActive();
|
||||||
|
cluster.triggerBlockReports();
|
||||||
|
DFSTestUtil.waitReplication(fs, new Path("/test"), (short) 3);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -410,6 +410,7 @@ public abstract class BlockReportTestBase {
|
|||||||
* The second datanode is started in the cluster.
|
* The second datanode is started in the cluster.
|
||||||
* As soon as the replication process is completed test finds a block from
|
* As soon as the replication process is completed test finds a block from
|
||||||
* the second DN and sets its GS to be < of original one.
|
* the second DN and sets its GS to be < of original one.
|
||||||
|
* This is markBlockAsCorrupt case 3, so we expect one pending deletion.
|
||||||
* Block report is forced and the check for # of corrupted blocks is performed.
|
* Block report is forced and the check for # of corrupted blocks is performed.
|
||||||
* Another block is chosen and its length is set to a lesser than original.
|
* Another block is chosen and its length is set to a lesser than original.
|
||||||
* A check for another corrupted block is performed after yet another
|
* A check for another corrupted block is performed after yet another
|
||||||
@ -436,20 +437,20 @@ public abstract class BlockReportTestBase {
|
|||||||
printStats();
|
printStats();
|
||||||
|
|
||||||
assertThat("Wrong number of corrupt blocks",
|
assertThat("Wrong number of corrupt blocks",
|
||||||
cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
|
cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
|
||||||
assertThat("Wrong number of PendingDeletion blocks",
|
assertThat("Wrong number of PendingDeletion blocks",
|
||||||
cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
|
cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
|
||||||
assertThat("Wrong number of PendingReplication blocks",
|
assertThat("Wrong number of PendingReplication blocks",
|
||||||
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
|
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
|
||||||
|
|
||||||
reports = getBlockReports(dn, poolId, true, true);
|
reports = getBlockReports(dn, poolId, false, true);
|
||||||
sendBlockReports(dnR, poolId, reports);
|
sendBlockReports(dnR, poolId, reports);
|
||||||
printStats();
|
printStats();
|
||||||
|
|
||||||
assertThat("Wrong number of corrupt blocks",
|
assertThat("Wrong number of corrupt blocks",
|
||||||
cluster.getNamesystem().getCorruptReplicaBlocks(), is(2L));
|
cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
|
||||||
assertThat("Wrong number of PendingDeletion blocks",
|
assertThat("Wrong number of PendingDeletion blocks",
|
||||||
cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
|
cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
|
||||||
assertThat("Wrong number of PendingReplication blocks",
|
assertThat("Wrong number of PendingReplication blocks",
|
||||||
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
|
cluster.getNamesystem().getPendingReplicationBlocks(), is(0L));
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user