HDFS-9958. BlockManager#createLocatedBlocks can throw NPE for corruptBlocks on failed storages. Contributed by Kuhu Shukla
This commit is contained in:
parent
cf2ee45f71
commit
6243eabb48
@ -1038,9 +1038,9 @@ private LocatedBlock createLocatedBlock(final BlockInfo blk, final long pos)
|
|||||||
}
|
}
|
||||||
|
|
||||||
final int numNodes = blocksMap.numNodes(blk);
|
final int numNodes = blocksMap.numNodes(blk);
|
||||||
final boolean isCorrupt = numCorruptNodes != 0 &&
|
final boolean isCorrupt = numCorruptReplicas != 0 &&
|
||||||
numCorruptNodes == numNodes;
|
numCorruptReplicas == numNodes;
|
||||||
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
|
final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
|
||||||
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
|
final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
|
||||||
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
|
final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
|
||||||
int j = 0, i = 0;
|
int j = 0, i = 0;
|
||||||
@ -1366,11 +1366,22 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|||||||
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
|
+ " as corrupt because datanode " + dn + " (" + dn.getDatanodeUuid()
|
||||||
+ ") does not exist");
|
+ ") does not exist");
|
||||||
}
|
}
|
||||||
|
DatanodeStorageInfo storage = null;
|
||||||
|
if (storageID != null) {
|
||||||
|
storage = node.getStorageInfo(storageID);
|
||||||
|
}
|
||||||
|
if (storage == null) {
|
||||||
|
storage = storedBlock.findStorageInfo(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (storage == null) {
|
||||||
|
blockLog.debug("BLOCK* findAndMarkBlockAsCorrupt: {} not found on {}",
|
||||||
|
blk, dn);
|
||||||
|
return;
|
||||||
|
}
|
||||||
markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
|
markBlockAsCorrupt(new BlockToMarkCorrupt(reportedBlock, storedBlock,
|
||||||
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
blk.getGenerationStamp(), reason, Reason.CORRUPTION_REPORTED),
|
||||||
storageID == null ? null : node.getStorageInfo(storageID),
|
storage, node);
|
||||||
node);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -18,15 +18,22 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
@ -36,6 +43,8 @@
|
|||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
@ -168,6 +177,82 @@ public void testArrayOutOfBoundsException() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCorruptionWithDiskFailure() throws Exception {
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
Configuration conf = new HdfsConfiguration();
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
BlockManager bm = cluster.getNamesystem().getBlockManager();
|
||||||
|
FileSystem fs = cluster.getFileSystem();
|
||||||
|
final Path FILE_PATH = new Path("/tmp.txt");
|
||||||
|
final long FILE_LEN = 1L;
|
||||||
|
DFSTestUtil.createFile(fs, FILE_PATH, FILE_LEN, (short) 3, 1L);
|
||||||
|
|
||||||
|
// get the block
|
||||||
|
final String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||||
|
File storageDir = cluster.getInstanceStorageDir(0, 0);
|
||||||
|
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
|
||||||
|
assertTrue("Data directory does not exist", dataDir.exists());
|
||||||
|
ExtendedBlock blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
|
||||||
|
if (blk == null) {
|
||||||
|
blk = getFirstBlock(cluster.getDataNodes().get(0), bpid);
|
||||||
|
}
|
||||||
|
assertFalse("Data directory does not contain any blocks or there was an" +
|
||||||
|
" " +
|
||||||
|
"IO error", blk == null);
|
||||||
|
ArrayList<DataNode> datanodes = cluster.getDataNodes();
|
||||||
|
assertEquals(datanodes.size(), 3);
|
||||||
|
FSNamesystem ns = cluster.getNamesystem();
|
||||||
|
//fail the storage on that node which has the block
|
||||||
|
try {
|
||||||
|
ns.writeLock();
|
||||||
|
updateAllStorages(bm);
|
||||||
|
} finally {
|
||||||
|
ns.writeUnlock();
|
||||||
|
}
|
||||||
|
ns.writeLock();
|
||||||
|
try {
|
||||||
|
markAllBlocksAsCorrupt(bm, blk);
|
||||||
|
} finally {
|
||||||
|
ns.writeUnlock();
|
||||||
|
}
|
||||||
|
|
||||||
|
// open the file
|
||||||
|
fs.open(FILE_PATH);
|
||||||
|
|
||||||
|
//clean up
|
||||||
|
fs.delete(FILE_PATH, false);
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) { cluster.shutdown(); }
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markAllBlocksAsCorrupt(BlockManager bm,
|
||||||
|
ExtendedBlock blk) throws IOException {
|
||||||
|
for (DatanodeStorageInfo info : bm.getStorages(blk.getLocalBlock())) {
|
||||||
|
bm.findAndMarkBlockAsCorrupt(
|
||||||
|
blk, info.getDatanodeDescriptor(), info.getStorageID(), "STORAGE_ID");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void updateAllStorages(BlockManager bm) {
|
||||||
|
for (DatanodeDescriptor dd : bm.getDatanodeManager().getDatanodes()) {
|
||||||
|
Set<DatanodeStorageInfo> setInfos = new HashSet<DatanodeStorageInfo>();
|
||||||
|
DatanodeStorageInfo[] infos = dd.getStorageInfos();
|
||||||
|
Random random = new Random();
|
||||||
|
for (int i = 0; i < infos.length; i++) {
|
||||||
|
int blkId = random.nextInt(101);
|
||||||
|
DatanodeStorage storage = new DatanodeStorage(Integer.toString(blkId),
|
||||||
|
DatanodeStorage.State.FAILED, StorageType.DISK);
|
||||||
|
infos[i].updateFromStorage(storage);
|
||||||
|
setInfos.add(infos[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
|
private static ExtendedBlock getFirstBlock(DataNode dn, String bpid) {
|
||||||
Map<DatanodeStorage, BlockListAsLongs> blockReports =
|
Map<DatanodeStorage, BlockListAsLongs> blockReports =
|
||||||
dn.getFSDataset().getBlockReports(bpid);
|
dn.getFSDataset().getBlockReports(bpid);
|
||||||
|
Loading…
Reference in New Issue
Block a user