HDFS-8563. Erasure Coding: fsck handles file smaller than a full stripe. Contributed by Walter Su.
parent 48f3830f21
commit 2470a7bf88
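Before this change, fsck computed the expected number of internal blocks for a striped file from the schema alone (numDataUnits + numParityUnits), so any file too small to fill a complete stripe was flagged as missing blocks. The patch instead asks the stored BlockInfoStriped for its real block count. A minimal standalone sketch of the arithmetic, assuming the system default RS-6-3 schema and a 64 KiB striping cell (the class name and constants here are illustrative, not code from the patch):

// Illustrates why a sub-stripe file has fewer internal blocks than the
// schema's full count. Assumes the default RS-6-3 schema and 64 KiB cells.
public class SubStripeMath {
  public static void main(String[] args) {
    final int dataUnits = 6;          // RS-6-3: data blocks per group
    final int parityUnits = 3;        // RS-6-3: parity blocks per group
    final long cellSize = 64 * 1024;  // bytes per striping cell (assumed)

    long fileSize = "hello world!".length(); // 12 bytes, as in the test below

    // Only as many data blocks exist as there are occupied cells:
    long realDataBlocks = Math.min(dataUnits, (fileSize - 1) / cellSize + 1);
    long realTotalBlocks = realDataBlocks + parityUnits;

    System.out.println(dataUnits + parityUnits); // 9 -- the old expected count
    System.out.println(realTotalBlocks);         // 4 -- what fsck should expect
  }
}
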
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt

@@ -335,3 +335,6 @@
     HDFS-8719. Erasure Coding: client generates too many small packets when
     writing parity data. (Li Bo via waltersu4549)
+
+    HDFS-8563. Erasure Coding: fsck handles file smaller than a full stripe.
+    (Walter Su via jing9)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java

@@ -65,6 +65,7 @@
 import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicies;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
@@ -74,7 +75,6 @@
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
-import org.apache.hadoop.io.erasurecode.ECSchema;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.NodeBase;
@@ -247,7 +247,7 @@ public void blockIdCK(String blockId) {
     //get blockInfo
     Block block = new Block(Block.getBlockId(blockId));
     //find which file this block belongs to
-    BlockInfo blockInfo = namenode.getNamesystem().getStoredBlock(block);
+    BlockInfo blockInfo = bm.getStoredBlock(block);
     if(blockInfo == null) {
       out.println("Block "+ blockId +" " + NONEXISTENT_STATUS);
       LOG.warn("Block "+ blockId + " " + NONEXISTENT_STATUS);
@@ -556,6 +556,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res

       final BlockInfo storedBlock = bm.getStoredBlock(
           block.getLocalBlock());
+      final int minReplication = bm.getMinStorageNum(storedBlock);
       // count decommissionedReplicas / decommissioningReplicas
       NumberReplicas numberReplicas = bm.countNodes(storedBlock);
       int decommissionedReplicas = numberReplicas.decommissioned();
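The per-block minimum now comes from the BlockManager rather than from a single configured value: a striped block group needs at least its data-block count of internal blocks to remain recoverable, while a contiguous block needs only dfs.namenode.replication.min replicas. A sketch of that distinction, under the assumption that BlockManager#getMinStorageNum has roughly this shape (minReplication being the configured minimum held by the BlockManager):

// Sketch of the per-block minimum fsck now consults (assumed shape,
// not quoted from the patch).
public short getMinStorageNum(BlockInfo block) {
  if (block.isStriped()) {
    // Fewer than getDataBlockNum() internal blocks means data loss.
    return ((BlockInfoStriped) block).getDataBlockNum();
  } else {
    // Contiguous blocks fall back to dfs.namenode.replication.min.
    return minReplication;
  }
}
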
@@ -571,26 +572,17 @@

       // count expected replicas
       short targetFileReplication;
-      if(file.getReplication() == 0) {
-        final FSNamesystem fsn = namenode.getNamesystem();
-        final ECSchema ecSchema;
-        fsn.readLock();
-        try {
-          INode inode = namenode.getNamesystem().getFSDirectory()
-              .getINode(path);
-          INodesInPath iip = INodesInPath.fromINode(inode);
-          ecSchema = FSDirErasureCodingOp.getErasureCodingSchema(fsn, iip);
-        } finally {
-          fsn.readUnlock();
-        }
-        targetFileReplication = (short) (ecSchema.getNumDataUnits() + ecSchema.getNumParityUnits());
+      if (file.getECSchema() != null) {
+        assert storedBlock instanceof BlockInfoStriped;
+        targetFileReplication = ((BlockInfoStriped) storedBlock)
+            .getRealTotalBlockNum();
       } else {
         targetFileReplication = file.getReplication();
       }
       res.numExpectedReplicas += targetFileReplication;

       // count under min repl'd blocks
-      if(totalReplicasPerBlock < res.minReplication){
+      if(totalReplicasPerBlock < minReplication){
         res.numUnderMinReplicatedBlocks++;
       }
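getRealTotalBlockNum() sizes the group by the file's actual bytes instead of the schema's full width, which is exactly what the old schema lookup could not do. A sketch of the logic in BlockInfoStriped, assuming the branch's fixed BLOCK_STRIPED_CELL_SIZE of 64 KiB and the usual accessor names (this is the editor's reconstruction, not a quotation):

// Sketch of BlockInfoStriped's real block count (assumed shape).
public short getRealDataBlockNum() {
  if (isComplete() || getBlockUCState() == BlockUCState.COMMITTED) {
    // Occupied cells determine how many data blocks actually exist.
    return (short) Math.min(getDataBlockNum(),
        (getNumBytes() - 1) / BLOCK_STRIPED_CELL_SIZE + 1);
  } else {
    // Under construction: fall back to the schema's full data count.
    return getDataBlockNum();
  }
}

public short getRealTotalBlockNum() {
  return (short) (getRealDataBlockNum() + getParityBlockNum());
}
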
@@ -611,7 +603,7 @@ private void collectBlocksSummary(String parent, HdfsFileStatus file, Result res
       }

       // count minimally replicated blocks
-      if (totalReplicasPerBlock >= res.minReplication)
+      if (totalReplicasPerBlock >= minReplication)
         res.numMinReplicatedBlocks++;

       // count missing replicas / under replicated blocks
@@ -1026,12 +1018,6 @@ static class Result {
     long totalOpenFilesSize = 0L;
     long totalReplicas = 0L;

-    final int minReplication;
-
-    Result(int minReplication) {
-      this.minReplication = minReplication;
-    }
-
     /**
      * DFS is considered healthy if there are no missing blocks.
      */
@@ -1062,12 +1048,13 @@ float getReplicationFactor() {
   @VisibleForTesting
   static class ReplicationResult extends Result {
     final short replication;
+    final short minReplication;

     ReplicationResult(Configuration conf) {
-      super(conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT));
       this.replication = (short)conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
           DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+      this.minReplication = (short)conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
     }

     @Override
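With the minimum now computed per block, the shared Result class no longer needs a constructor parameter: ReplicationResult keeps its own minReplication, read from dfs.namenode.replication.min purely for the report text, and ErasureCodingResult (next hunk) stops deriving one from the schema's data-unit count.
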
@@ -1171,15 +1158,11 @@ public String toString() {

   @VisibleForTesting
   static class ErasureCodingResult extends Result {
-    final String ecSchema;
+    final String defaultSchema;

     ErasureCodingResult(Configuration conf) {
-      this(ErasureCodingSchemaManager.getSystemDefaultSchema());
-    }
-
-    ErasureCodingResult(ECSchema ecSchema) {
-      super(ecSchema.getNumDataUnits());
-      this.ecSchema = ecSchema.getSchemaName();
+      defaultSchema = ErasureCodingSchemaManager.getSystemDefaultSchema()
+          .getSchemaName();
     }

     @Override
@@ -1213,8 +1196,6 @@ public String toString() {
             ((float) (numUnderMinReplicatedBlocks * 100) / (float) totalBlocks))
             .append(" %)");
       }
-      res.append("\n ").append("MIN REQUIRED EC BLOCK:\t")
-          .append(minReplication);
     }
     if(corruptFiles>0) {
       res.append(
@@ -1251,18 +1232,18 @@ public String toString() {
             ((float) (numUnderReplicatedBlocks * 100) / (float) totalBlocks))
             .append(" %)");
       }
-      res.append("\n Unsatisfactory placement block groups:\t\t")
+      res.append("\n Unsatisfactory placement block groups:\t")
          .append(numMisReplicatedBlocks);
       if (totalBlocks > 0) {
         res.append(" (").append(
             ((float) (numMisReplicatedBlocks * 100) / (float) totalBlocks))
             .append(" %)");
       }
-      res.append("\n Default schema:\t").append(ecSchema)
+      res.append("\n Default schema:\t\t").append(defaultSchema)
          .append("\n Average block group size:\t").append(
          getReplicationFactor()).append("\n Missing block groups:\t\t").append(
          missingIds.size()).append("\n Corrupt block groups:\t\t").append(
-         corruptBlocks).append("\n Missing ec-blocks:\t\t").append(
+         corruptBlocks).append("\n Missing internal blocks:\t").append(
          missingReplicas);
       if (totalReplicas > 0) {
         res.append(" (").append(
@@ -1270,11 +1251,11 @@ public String toString() {
             " %)");
       }
       if (decommissionedReplicas > 0) {
-        res.append("\n Decommissioned ec-blocks:\t").append(
+        res.append("\n Decommissioned internal blocks:\t").append(
            decommissionedReplicas);
       }
       if (decommissioningReplicas > 0) {
-        res.append("\n Decommissioning ec-blocks:\t").append(
+        res.append("\n Decommissioning internal blocks:\t").append(
            decommissioningReplicas);
       }
       return res.toString();
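Taken together, the toString() changes relabel the EC report around block groups and internal blocks. An illustrative excerpt of the section fsck would print for a healthy striped directory, assembled from the labels above (schema name and all values are invented for illustration):

 Unsatisfactory placement block groups:	0 (0.0 %)
 Default schema:		RS-6-3
 Average block group size:	4.0
 Missing block groups:		0
 Corrupt block groups:		0
 Missing internal blocks:	0 (0.0 %)
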
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -1648,23 +1648,31 @@ public void testECFsck() throws Exception {
         + ErasureCodingSchemaManager.getSystemDefaultSchema().getNumParityUnits();
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(totalSize).build();
     fs = cluster.getFileSystem();

     // create a contiguous file
     Path replDirPath = new Path("/replicated");
     Path replFilePath = new Path(replDirPath, "replfile");
     final short factor = 3;
     DFSTestUtil.createFile(fs, replFilePath, 1024, factor, 0);
     DFSTestUtil.waitReplication(fs, replFilePath, factor);

     // create a large striped file
     Path ecDirPath = new Path("/striped");
-    Path ecFilePath = new Path(ecDirPath, "ecfile");
-    final int numBlocks = 4;
-    DFSTestUtil.createStripedFile(cluster, ecFilePath, ecDirPath, numBlocks, 2, true);
+    Path largeFilePath = new Path(ecDirPath, "largeFile");
+    DFSTestUtil.createStripedFile(cluster, largeFilePath, ecDirPath, 1, 2, true);
+
+    // create a small striped file
+    Path smallFilePath = new Path(ecDirPath, "smallFile");
+    DFSTestUtil.writeFile(fs, smallFilePath, "hello world!");

     long replTime = fs.getFileStatus(replFilePath).getAccessTime();
-    long ecTime = fs.getFileStatus(ecFilePath).getAccessTime();
+    long ecTime = fs.getFileStatus(largeFilePath).getAccessTime();
     Thread.sleep(precision);
     setupAuditLogs();
     String outStr = runFsck(conf, 0, true, "/");
     verifyAuditLogs();
     assertEquals(replTime, fs.getFileStatus(replFilePath).getAccessTime());
-    assertEquals(ecTime, fs.getFileStatus(ecFilePath).getAccessTime());
+    assertEquals(ecTime, fs.getFileStatus(largeFilePath).getAccessTime());
     System.out.println(outStr);
     assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
     if (fs != null) {try{fs.close();} catch(Exception e){}}
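The reworked test exercises both ends of the fix: largeFile spans complete stripes, while smallFile's 12-byte payload occupies a single striping cell, so its block group carries one data block plus the parity blocks. Under the old schema-based expectation of numDataUnits + numParityUnits internal blocks, fsck would have reported smallFile as missing internal blocks and the HEALTHY_STATUS assertion would fail.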