HDFS-7993. Provide each Replica details in fsck (Contributed by J.Andreina)
This commit is contained in:
parent
d52de61544
commit
8ddbb8dd43
@ -526,6 +526,8 @@ Release 2.8.0 - UNRELEASED
|
||||
HDFS-8173. NPE thrown at DataNode shutdown when HTTP server was not able to
|
||||
create (surendra singh lilhore via vinayakumarb)
|
||||
|
||||
HDFS-7993. Provide each Replica details in fsck (J.Andreina via vinayakumarb)
|
||||
|
||||
Release 2.7.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -155,7 +155,7 @@ void setBlockReportCount(int blockReportCount) {
|
||||
this.blockReportCount = blockReportCount;
|
||||
}
|
||||
|
||||
boolean areBlockContentsStale() {
|
||||
public boolean areBlockContentsStale() {
|
||||
return blockContentsStale;
|
||||
}
|
||||
|
||||
@ -205,11 +205,11 @@ boolean areBlocksOnFailedStorage() {
|
||||
return getState() == State.FAILED && numBlocks != 0;
|
||||
}
|
||||
|
||||
String getStorageID() {
|
||||
public String getStorageID() {
|
||||
return storageID;
|
||||
}
|
||||
|
||||
StorageType getStorageType() {
|
||||
public StorageType getStorageType() {
|
||||
return storageType;
|
||||
}
|
||||
|
||||
|
@ -52,6 +52,7 @@
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
@ -68,9 +69,11 @@
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.net.NodeBase;
|
||||
@ -133,6 +136,8 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||
private boolean showprogress = false;
|
||||
private boolean showCorruptFileBlocks = false;
|
||||
|
||||
private boolean showReplicaDetails = false;
|
||||
private long staleInterval;
|
||||
/**
|
||||
* True if we encountered an internal error during FSCK, such as not being
|
||||
* able to delete a corrupt file.
|
||||
@ -194,6 +199,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||
networktopology,
|
||||
namenode.getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.getHost2DatanodeMap());
|
||||
this.staleInterval =
|
||||
conf.getLong(DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
|
||||
|
||||
for (Iterator<String> it = pmap.keySet().iterator(); it.hasNext();) {
|
||||
String key = it.next();
|
||||
@ -204,6 +212,9 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
|
||||
else if (key.equals("blocks")) { this.showBlocks = true; }
|
||||
else if (key.equals("locations")) { this.showLocations = true; }
|
||||
else if (key.equals("racks")) { this.showRacks = true; }
|
||||
else if (key.equals("replicadetails")) {
|
||||
this.showReplicaDetails = true;
|
||||
}
|
||||
else if (key.equals("storagepolicies")) { this.showStoragePolcies = true; }
|
||||
else if (key.equals("showprogress")) { this.showprogress = true; }
|
||||
else if (key.equals("openforwrite")) {this.showOpenFiles = true; }
|
||||
@ -507,9 +518,8 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
|
||||
ExtendedBlock block = lBlk.getBlock();
|
||||
boolean isCorrupt = lBlk.isCorrupt();
|
||||
String blkName = block.toString();
|
||||
DatanodeInfo[] locs = lBlk.getLocations();
|
||||
NumberReplicas numberReplicas =
|
||||
namenode.getNamesystem().getBlockManager().countNodes(block.getLocalBlock());
|
||||
BlockManager bm = namenode.getNamesystem().getBlockManager();
|
||||
NumberReplicas numberReplicas = bm.countNodes(block.getLocalBlock());
|
||||
int liveReplicas = numberReplicas.liveReplicas();
|
||||
int decommissionedReplicas = numberReplicas.decommissioned();;
|
||||
int decommissioningReplicas = numberReplicas.decommissioning();
|
||||
@ -518,6 +528,10 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
|
||||
int totalReplicas = liveReplicas + decommissionedReplicas +
|
||||
decommissioningReplicas;
|
||||
res.totalReplicas += totalReplicas;
|
||||
Collection<DatanodeDescriptor> corruptReplicas = null;
|
||||
if (showReplicaDetails) {
|
||||
corruptReplicas = bm.getCorruptReplicas(block.getLocalBlock());
|
||||
}
|
||||
short targetFileReplication = file.getReplication();
|
||||
res.numExpectedReplicas += targetFileReplication;
|
||||
if(totalReplicas < minReplication){
|
||||
@ -578,14 +592,41 @@ void check(String parent, HdfsFileStatus file, Result res) throws IOException {
|
||||
missize += block.getNumBytes();
|
||||
} else {
|
||||
report.append(" repl=" + liveReplicas);
|
||||
if (showLocations || showRacks) {
|
||||
if (showLocations || showRacks || showReplicaDetails) {
|
||||
StringBuilder sb = new StringBuilder("[");
|
||||
for (int j = 0; j < locs.length; j++) {
|
||||
if (j > 0) { sb.append(", "); }
|
||||
if (showRacks)
|
||||
sb.append(NodeBase.getPath(locs[j]));
|
||||
else
|
||||
sb.append(locs[j]);
|
||||
Iterable<DatanodeStorageInfo> storages = bm.getStorages(block.getLocalBlock());
|
||||
for (Iterator<DatanodeStorageInfo> iterator = storages.iterator(); iterator.hasNext();) {
|
||||
DatanodeStorageInfo storage = iterator.next();
|
||||
DatanodeDescriptor dnDesc = storage.getDatanodeDescriptor();
|
||||
if (showRacks) {
|
||||
sb.append(NodeBase.getPath(dnDesc));
|
||||
} else {
|
||||
sb.append(new DatanodeInfoWithStorage(dnDesc, storage.getStorageID(), storage
|
||||
.getStorageType()));
|
||||
}
|
||||
if (showReplicaDetails) {
|
||||
LightWeightLinkedSet<Block> blocksExcess =
|
||||
bm.excessReplicateMap.get(dnDesc.getDatanodeUuid());
|
||||
sb.append("(");
|
||||
if (dnDesc.isDecommissioned()) {
|
||||
sb.append("DECOMMISSIONED)");
|
||||
} else if (dnDesc.isDecommissionInProgress()) {
|
||||
sb.append("DECOMMISSIONING)");
|
||||
} else if (corruptReplicas != null && corruptReplicas.contains(dnDesc)) {
|
||||
sb.append("CORRUPT)");
|
||||
} else if (blocksExcess != null && blocksExcess.contains(block.getLocalBlock())) {
|
||||
sb.append("EXCESS)");
|
||||
} else if (dnDesc.isStale(this.staleInterval)) {
|
||||
sb.append("STALE_NODE)");
|
||||
} else if (storage.areBlockContentsStale()) {
|
||||
sb.append("STALE_BLOCK_CONTENT)");
|
||||
} else {
|
||||
sb.append("LIVE)");
|
||||
}
|
||||
}
|
||||
if (iterator.hasNext()) {
|
||||
sb.append(", ");
|
||||
}
|
||||
}
|
||||
sb.append(']');
|
||||
report.append(" " + sb.toString());
|
||||
|
@ -97,7 +97,8 @@ public class DFSck extends Configured implements Tool {
|
||||
+ "\t-showprogress\tshow progress in output. Default is OFF (no progress)\n"
|
||||
+ "\t-blockId\tprint out which file this blockId belongs to, locations"
|
||||
+ " (nodes, racks) of this block, and other diagnostics info"
|
||||
+ " (under replicated, corrupted or not, etc)\n\n"
|
||||
+ " (under replicated, corrupted or not, etc)\n"
|
||||
+ "\t-replicaDetails\tprint out each replica details \n\n"
|
||||
+ "Please Note:\n"
|
||||
+ "\t1. By default fsck ignores files opened for write, "
|
||||
+ "use -openforwrite to report such files. They are usually "
|
||||
@ -268,6 +269,9 @@ private int doWork(final String[] args) throws IOException {
|
||||
else if (args[idx].equals("-blocks")) { url.append("&blocks=1"); }
|
||||
else if (args[idx].equals("-locations")) { url.append("&locations=1"); }
|
||||
else if (args[idx].equals("-racks")) { url.append("&racks=1"); }
|
||||
else if (args[idx].equals("-replicaDetails")) {
|
||||
url.append("&replicadetails=1");
|
||||
}
|
||||
else if (args[idx].equals("-storagepolicies")) { url.append("&storagepolicies=1"); }
|
||||
else if (args[idx].equals("-showprogress")) { url.append("&showprogress=1"); }
|
||||
else if (args[idx].equals("-list-corruptfileblocks")) {
|
||||
|
@ -779,6 +779,77 @@ public void testUnderMinReplicatedBlock() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testFsckReplicaDetails() throws Exception {
|
||||
|
||||
final short REPL_FACTOR = 1;
|
||||
short NUM_DN = 1;
|
||||
final long blockSize = 512;
|
||||
final long fileSize = 1024;
|
||||
boolean checkDecommissionInProgress = false;
|
||||
String[] racks = { "/rack1" };
|
||||
String[] hosts = { "host1" };
|
||||
|
||||
Configuration conf = new Configuration();
|
||||
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||
|
||||
MiniDFSCluster cluster;
|
||||
DistributedFileSystem dfs;
|
||||
cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DN).hosts(hosts).racks(racks).build();
|
||||
cluster.waitClusterUp();
|
||||
dfs = cluster.getFileSystem();
|
||||
|
||||
// create files
|
||||
final String testFile = new String("/testfile");
|
||||
final Path path = new Path(testFile);
|
||||
DFSTestUtil.createFile(dfs, path, fileSize, REPL_FACTOR, 1000L);
|
||||
DFSTestUtil.waitReplication(dfs, path, REPL_FACTOR);
|
||||
try {
|
||||
// make sure datanode that has replica is fine before decommission
|
||||
String fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails");
|
||||
assertTrue(fsckOut.contains(NamenodeFsck.HEALTHY_STATUS));
|
||||
assertTrue(fsckOut.contains("(LIVE)"));
|
||||
|
||||
// decommission datanode
|
||||
ExtendedBlock eb = DFSTestUtil.getFirstBlock(dfs, path);
|
||||
DatanodeDescriptor dn =
|
||||
cluster.getNameNode().getNamesystem().getBlockManager()
|
||||
.getBlockCollection(eb.getLocalBlock()).getBlocks()[0].getDatanode(0);
|
||||
cluster.getNameNode().getNamesystem().getBlockManager().getDatanodeManager()
|
||||
.getDecomManager().startDecommission(dn);
|
||||
String dnName = dn.getXferAddr();
|
||||
|
||||
// check the replica status while decommissioning
|
||||
fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails");
|
||||
assertTrue(fsckOut.contains("(DECOMMISSIONING)"));
|
||||
|
||||
// Start 2nd Datanode and wait for decommission to start
|
||||
cluster.startDataNodes(conf, 1, true, null, null, null);
|
||||
DatanodeInfo datanodeInfo = null;
|
||||
do {
|
||||
Thread.sleep(2000);
|
||||
for (DatanodeInfo info : dfs.getDataNodeStats()) {
|
||||
if (dnName.equals(info.getXferAddr())) {
|
||||
datanodeInfo = info;
|
||||
}
|
||||
}
|
||||
if (!checkDecommissionInProgress && datanodeInfo != null
|
||||
&& datanodeInfo.isDecommissionInProgress()) {
|
||||
checkDecommissionInProgress = true;
|
||||
}
|
||||
} while (datanodeInfo != null && !datanodeInfo.isDecommissioned());
|
||||
|
||||
// check the replica status after decommission is done
|
||||
fsckOut = runFsck(conf, 0, true, testFile, "-files", "-blocks", "-replicaDetails");
|
||||
assertTrue(fsckOut.contains("(DECOMMISSIONED)"));
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Test if fsck can return -1 in case of failure
|
||||
*
|
||||
|
Loading…
Reference in New Issue
Block a user