HDFS-5214. Fix NPEs in BlockManager and DirectoryScanner.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1536179 13f79535-47bb-0310-9956-ffa450edef68
parent 9043a92219
commit b9d561c548
@@ -49,3 +49,5 @@ IMPROVEMENTS:
     HDFS-5401. Fix NPE in Directory Scanner. (Arpit Agarwal)
 
     HDFS-5417. Fix storage IDs in PBHelper and UpgradeUtilities. (szetszwo)
+
+    HDFS-5214. Fix NPEs in BlockManager and DirectoryScanner. (Arpit Agarwal)
@@ -1833,7 +1833,10 @@ private void reportDiff(DatanodeDescriptor dn, DatanodeStorage storage,
       ReplicaState iState = itBR.getCurrentReplicaState();
       BlockInfo storedBlock = processReportedBlock(dn, storage.getStorageID(),
           iblk, iState, toAdd, toInvalidate, toCorrupt, toUC);
-      toRemove.remove(storedBlock);
+
+      if (storedBlock != null) {
+        toRemove.remove(storedBlock);
+      }
     }
   }
 
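For context on the guard above: processReportedBlock can return null (for instance when the reported block is unknown to the namenode), and the old code passed that null straight into toRemove.remove(). If toRemove is a sorted set with natural ordering (a TreeSet, for instance; an assumption here, since the diff shows only the call site), remove(null) throws NullPointerException. A self-contained sketch of that failure mode in plain Java:

    import java.util.TreeSet;

    // Standalone demonstration, not Hadoop code: TreeSet.remove(null)
    // throws NullPointerException under natural ordering, which is the
    // NPE the storedBlock null check above prevents.
    public class NullRemoveDemo {
      public static void main(String[] args) {
        TreeSet<Long> toRemove = new TreeSet<Long>();
        toRemove.add(42L);
        try {
          toRemove.remove(null); // natural ordering cannot compare null
        } catch (NullPointerException e) {
          System.out.println("remove(null) threw NPE, as the guard anticipates");
        }
      }
    }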
@@ -187,7 +187,7 @@ public LinkedElement getNext() {
           + hours + " hours for block pool " + bpid);
 
       // get the list of blocks and arrange them in random order
-      List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
+      List<FinalizedReplica> arr = dataset.getFinalizedBlocks(blockPoolId);
       Collections.shuffle(arr);
 
       long scanTime = -1;
@@ -230,10 +230,6 @@ private static String getSuffix(File f, String prefix) {
       throw new RuntimeException(prefix + " is not a prefix of " + fullPath);
     }
 
-    ScanInfo(long blockId) {
-      this(blockId, null, null, null);
-    }
-
     ScanInfo(long blockId, File blockFile, File metaFile, FsVolumeSpi vol) {
       this.blockId = blockId;
       String condensedVolPath = vol == null ? null :
@@ -439,8 +435,8 @@ void scan() {
         diffs.put(bpid, diffRecord);
 
         statsRecord.totalBlocks = blockpoolReport.length;
-        List<Block> bl = dataset.getFinalizedBlocks(bpid);
-        Block[] memReport = bl.toArray(new Block[bl.size()]);
+        List<FinalizedReplica> bl = dataset.getFinalizedBlocks(bpid);
+        FinalizedReplica[] memReport = bl.toArray(new FinalizedReplica[bl.size()]);
         Arrays.sort(memReport); // Sort based on blockId
 
         int d = 0; // index for blockpoolReport
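Note that the Arrays.sort call survives the type change unchanged: FinalizedReplica inherits Block's Comparable implementation, which (as the comment notes) orders by block ID. A small self-contained sketch of why sorting a subclass array through the base class's compareTo just works:

    import java.util.Arrays;

    // Self-contained sketch, not Hadoop code: a subclass array sorts
    // through the base class's compareTo, so moving from Block[] to
    // FinalizedReplica[] does not change the ordering Arrays.sort applies.
    class Base implements Comparable<Base> {
      final long id;
      Base(long id) { this.id = id; }
      @Override
      public int compareTo(Base other) { return Long.compare(id, other.id); }
    }

    class Derived extends Base {
      Derived(long id) { super(id); }
    }

    public class SortDemo {
      public static void main(String[] args) {
        Derived[] arr = { new Derived(3), new Derived(1), new Derived(2) };
        Arrays.sort(arr); // dispatches to Base.compareTo, ordering by id
        for (Derived d : arr) {
          System.out.println(d.id); // prints 1, 2, 3
        }
      }
    }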
@@ -458,7 +454,8 @@ void scan() {
           }
           if (info.getBlockId() > memBlock.getBlockId()) {
             // Block is missing on the disk
-            addDifference(diffRecord, statsRecord, memBlock.getBlockId());
+            addDifference(diffRecord, statsRecord,
+                memBlock.getBlockId(), info.getVolume());
             m++;
             continue;
           }
@@ -478,7 +475,9 @@ void scan() {
           m++;
         }
         while (m < memReport.length) {
-          addDifference(diffRecord, statsRecord, memReport[m++].getBlockId());
+          FinalizedReplica current = memReport[m++];
+          addDifference(diffRecord, statsRecord,
+              current.getBlockId(), current.getVolume());
         }
         while (d < blockpoolReport.length) {
           statsRecord.missingMemoryBlocks++;
@@ -502,10 +501,11 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
 
     /** Block is not found on the disk */
     private void addDifference(LinkedList<ScanInfo> diffRecord,
-        Stats statsRecord, long blockId) {
+        Stats statsRecord, long blockId,
+        FsVolumeSpi vol) {
       statsRecord.missingBlockFile++;
       statsRecord.missingMetaFile++;
-      diffRecord.add(new ScanInfo(blockId));
+      diffRecord.add(new ScanInfo(blockId, null, null, vol));
     }
 
     /** Is the given volume still valid in the dataset? */
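With the one-argument shortcut constructor gone, every ScanInfo is built through the four-argument form, and the missing-block path in addDifference now passes the volume instead of dropping it. A rough, hypothetical mirror of the pattern (VolumeRecord is not a Hadoop class):

    // Hypothetical sketch of the idea: a record that receives its volume
    // at construction can report it later, even when the block and meta
    // files are absent, so downstream code is not surprised by a null
    // that only a convenience constructor introduced.
    final class VolumeRecord {
      private final long blockId;
      private final Object volume; // stands in for FsVolumeSpi

      VolumeRecord(long blockId, Object volume) {
        this.blockId = blockId;
        this.volume = volume; // no shortcut path that silently drops it
      }

      long getBlockId() { return blockId; }
      Object getVolume() { return volume; }
    }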
@@ -61,6 +61,10 @@ public FinalizedReplica(FinalizedReplica from) {
     this.unlinked = from.isUnlinked();
   }
 
+  public FinalizedReplica(ReplicaInfo replicaInfo) {
+    super(replicaInfo);
+  }
+
   @Override  // ReplicaInfo
   public ReplicaState getState() {
     return ReplicaState.FINALIZED;
@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
@@ -98,7 +99,7 @@ public StorageReport[] getStorageReports(String bpid)
   public Map<String, Object> getVolumeInfoMap();
 
   /** @return a list of finalized blocks for the given block pool. */
-  public List<Block> getFinalizedBlocks(String bpid);
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid);
 
   /**
    * Check whether the in-memory block record matches the block on the disk,
@@ -1079,11 +1079,12 @@ public Map<String, BlockListAsLongs> getBlockReports(String bpid) {
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
   @Override
-  public synchronized List<Block> getFinalizedBlocks(String bpid) {
-    ArrayList<Block> finalized = new ArrayList<Block>(volumeMap.size(bpid));
+  public synchronized List<FinalizedReplica> getFinalizedBlocks(String bpid) {
+    ArrayList<FinalizedReplica> finalized =
+        new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
     for (ReplicaInfo b : volumeMap.replicas(bpid)) {
       if(b.getState() == ReplicaState.FINALIZED) {
-        finalized.add(new Block(b));
+        finalized.add(new FinalizedReplica(b));
       }
     }
     return finalized;
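Returning FinalizedReplica rather than plain Block preserves replica metadata for callers: FinalizedReplica extends ReplicaInfo (itself a Block subclass), so the volume is reachable from each element, which the scan() changes above depend on. An illustrative caller, not from this commit, assuming the post-patch FsDatasetSpi:

    import java.util.List;
    import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;

    // Illustrative caller sketch: with FinalizedReplica the volume is
    // available per replica; with plain Block it was not.
    class FinalizedBlockLister {
      static void logVolumes(FsDatasetSpi<?> dataset, String bpid) {
        List<FinalizedReplica> replicas = dataset.getFinalizedBlocks(bpid);
        for (FinalizedReplica replica : replicas) {
          long id = replica.getBlockId();        // Block API, via inheritance
          FsVolumeSpi vol = replica.getVolume(); // ReplicaInfo API, absent on Block
          System.out.println("block " + id + " on volume " + vol);
        }
      }
    }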
@@ -1006,7 +1006,7 @@ public StorageReport[] getStorageReports(String bpid) {
   }
 
   @Override
-  public List<Block> getFinalizedBlocks(String bpid) {
+  public List<FinalizedReplica> getFinalizedBlocks(String bpid) {
     throw new UnsupportedOperationException();
   }
 
@@ -17,7 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
@@ -25,6 +27,7 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeoutException;
@@ -89,7 +92,7 @@ public class TestBlockReport {
   private MiniDFSCluster cluster;
   private DistributedFileSystem fs;
 
-  Random rand = new Random(RAND_LIMIT);
+  private static Random rand = new Random(RAND_LIMIT);
 
   private static Configuration conf;
 
@@ -113,6 +116,57 @@ public void shutDownCluster() throws IOException {
     cluster.shutdown();
   }
 
+  private static StorageBlockReport[] getBlockReports(DataNode dn, String bpid) {
+    Map<String, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpid);
+
+    // Send block report
+    StorageBlockReport[] reports =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    int i = 0;
+    for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      String storageID = kvPair.getKey();
+      long[] blockList = kvPair.getValue().getBlockListAsLongs();
+
+      // Dummy DatanodeStorage object just for sending the block report.
+      DatanodeStorage dnStorage = new DatanodeStorage(storageID);
+      reports[i++] = new StorageBlockReport(dnStorage, blockList);
+    }
+
+    return reports;
+  }
+
+  // Get block reports but modify the GS of one of the blocks.
+  private static StorageBlockReport[] getBlockReportsCorruptSingleBlockGS(
+      DataNode dn, String bpid) {
+    Map<String, BlockListAsLongs> perVolumeBlockLists =
+        dn.getFSDataset().getBlockReports(bpid);
+
+    // Send block report
+    StorageBlockReport[] reports =
+        new StorageBlockReport[perVolumeBlockLists.size()];
+
+    boolean corruptedBlock = false;
+
+    int i = 0;
+    for(Map.Entry<String, BlockListAsLongs> kvPair : perVolumeBlockLists.entrySet()) {
+      String storageID = kvPair.getKey();
+      long[] blockList = kvPair.getValue().getBlockListAsLongs();
+
+      if (!corruptedBlock) {
+        blockList[4] = rand.nextInt(); // Bad GS.
+        corruptedBlock = true;
+      }
+
+      // Dummy DatanodeStorage object just for sending the block report.
+      DatanodeStorage dnStorage = new DatanodeStorage(storageID);
+      reports[i++] = new StorageBlockReport(dnStorage, blockList);
+    }
+
+    return reports;
+  }
+
   /**
    * Test write a file, verifies and closes it. Then the length of the blocks
    * are messed up and BlockReport is forced.
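These helpers replace the hand-rolled single-element report arrays used throughout the tests below: one StorageBlockReport per storage, keyed by the real storage ID, rather than a single report built against the datanode UUID. The resulting call pattern, as it appears inside the updated test methods:

    // Typical use in the tests that follow (dn and cluster as set up above):
    String poolId = cluster.getNamesystem().getBlockPoolId();
    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
    StorageBlockReport[] reports = getBlockReports(dn, poolId);
    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);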
@@ -153,10 +207,8 @@ public void blockReport_01() throws IOException {
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
 
     List<LocatedBlock> blocksAfterReport =
         DFSTestUtil.getAllBlocks(fs.open(filePath));
@@ -211,7 +263,6 @@ public void blockReport_02() throws IOException {
     for (Integer aRemovedIndex : removedIndex) {
       blocks2Remove.add(lBlocks.get(aRemovedIndex).getBlock());
     }
-    ArrayList<Block> blocks = locatedToBlocks(lBlocks, removedIndex);
 
     if(LOG.isDebugEnabled()) {
       LOG.debug("Number of blocks allocated " + lBlocks.size());
@@ -225,8 +276,11 @@ public void blockReport_02() throws IOException {
       for (File f : findAllFiles(dataDir,
           new MyFileFilter(b.getBlockName(), true))) {
         DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b);
-        if (!f.delete())
+        if (!f.delete()) {
           LOG.warn("Couldn't delete " + b.getBlockName());
+        } else {
+          LOG.debug("Deleted file " + f.toString());
+        }
       }
     }
 
@@ -235,10 +289,8 @@ public void blockReport_02() throws IOException {
     // all blocks belong to the same file, hence same BP
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+    StorageBlockReport[] reports = getBlockReports(dn0, poolId);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
 
     BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem()
         .getBlockManager());
@@ -253,9 +305,8 @@ public void blockReport_02() throws IOException {
 
 
   /**
-   * Test writes a file and closes it. Then test finds a block
-   * and changes its GS to be < of original one.
-   * New empty block is added to the list of blocks.
+   * Test writes a file and closes it.
+   * Block reported is generated with a bad GS for a single block.
    * Block report is forced and the check for # of corrupted blocks is performed.
    *
    * @throws IOException in case of an error
@@ -264,41 +315,65 @@ public void blockReport_02() throws IOException {
   public void blockReport_03() throws IOException {
     final String METHOD_NAME = GenericTestUtils.getMethodName();
     Path filePath = new Path("/" + METHOD_NAME + ".dat");
-
-    ArrayList<Block> blocks =
-      prepareForRide(filePath, METHOD_NAME, FILE_SIZE);
-
-    // The block with modified GS won't be found. Has to be deleted
-    blocks.get(0).setGenerationStamp(rand.nextLong());
-    // This new block is unknown to NN and will be mark for deletion.
-    blocks.add(new Block());
+    DFSTestUtil.createFile(fs, filePath,
+                           FILE_SIZE, REPL_FACTOR, rand.nextLong());
 
     // all blocks belong to the same file, hence same BP
     DataNode dn = cluster.getDataNodes().get(DN_N0);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+    StorageBlockReport[] reports = getBlockReportsCorruptSingleBlockGS(dn, poolId);
     DatanodeCommand dnCmd =
-      cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+      cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
     if(LOG.isDebugEnabled()) {
       LOG.debug("Got the command: " + dnCmd);
     }
     printStats();
 
-    assertEquals("Wrong number of CorruptedReplica+PendingDeletion " +
-        "blocks is found", 2,
-        cluster.getNamesystem().getCorruptReplicaBlocks() +
-        cluster.getNamesystem().getPendingDeletionBlocks());
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));
   }
 
   /**
-   * This test isn't a representative case for BlockReport
-   * The empty method is going to be left here to keep the naming
-   * of the test plan in synch with the actual implementation
+   * Test writes a file and closes it.
+   * Block reported is generated with an extra block.
+   * Block report is forced and the check for # of pendingdeletion
+   * blocks is performed.
+   *
+   * @throws IOException in case of an error
    */
-  public void blockReport_04() {
+  @Test
+  public void blockReport_04() throws IOException {
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+    DFSTestUtil.createFile(fs, filePath,
+                           FILE_SIZE, REPL_FACTOR, rand.nextLong());
+
+    DataNode dn = cluster.getDataNodes().get(DN_N0);
+    // all blocks belong to the same file, hence same BP
+    String poolId = cluster.getNamesystem().getBlockPoolId();
+
+    // Create a bogus new block which will not be present on the namenode.
+    ExtendedBlock b = new ExtendedBlock(
+        poolId, rand.nextLong(), 1024L, rand.nextLong());
+    dn.getFSDataset().createRbw(b);
+
+    DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId);
+    DatanodeCommand dnCmd =
+        cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Got the command: " + dnCmd);
+    }
+    printStats();
+
+    assertThat("Wrong number of corrupt blocks",
+               cluster.getNamesystem().getCorruptReplicaBlocks(), is(0L));
+    assertThat("Wrong number of PendingDeletion blocks",
+               cluster.getNamesystem().getPendingDeletionBlocks(), is(1L));
   }
 
   // Client requests new block from NN. The test corrupts this very block
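The rewritten assertions split the old summed assertEquals into per-counter hamcrest checks, so a failure names the exact counter that diverged instead of reporting only that the total was off. The pattern, as used in both tests above:

    // Each namesystem counter is asserted independently (hamcrest's is()):
    assertThat("Wrong number of corrupt blocks",
        cluster.getNamesystem().getCorruptReplicaBlocks(), is(1L));
    assertThat("Wrong number of PendingDeletion blocks",
        cluster.getNamesystem().getPendingDeletionBlocks(), is(0L));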
@@ -331,10 +406,8 @@ public void blockReport_06() throws Exception {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
-    cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
+    StorageBlockReport[] reports = getBlockReports(dn, poolId);
+    cluster.getNameNodeRpc().blockReport(dnR, poolId, reports);
     printStats();
     assertEquals("Wrong number of PendingReplication Blocks",
         0, cluster.getNamesystem().getUnderReplicatedBlocks());
@@ -382,9 +455,7 @@ public void blockReport_07() throws Exception {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+    StorageBlockReport[] report = getBlockReports(dn, poolId);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
     printStats();
     assertEquals("Wrong number of Corrupted blocks",
@@ -407,7 +478,7 @@ public void blockReport_07() throws Exception {
     }
 
     report[0] = new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
+        report[0].getStorage(),
         new BlockListAsLongs(blocks, null).getBlockListAsLongs());
     cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
     printStats();
@@ -458,9 +529,7 @@ public void blockReport_08() throws IOException {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+    StorageBlockReport[] report = getBlockReports(dn, poolId);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
     printStats();
     assertEquals("Wrong number of PendingReplication blocks",
@@ -506,9 +575,7 @@ public void blockReport_09() throws IOException {
     DataNode dn = cluster.getDataNodes().get(DN_N1);
     String poolId = cluster.getNamesystem().getBlockPoolId();
     DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
-    StorageBlockReport[] report = { new StorageBlockReport(
-        new DatanodeStorage(dnR.getDatanodeUuid()),
-        new BlockListAsLongs(blocks, null).getBlockListAsLongs()) };
+    StorageBlockReport[] report = getBlockReports(dn, poolId);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, report);
     printStats();
     assertEquals("Wrong number of PendingReplication blocks",
@@ -447,7 +447,7 @@ void testScanInfoObject(long blockId, File blockFile, File metaFile)
 
   void testScanInfoObject(long blockId) throws Exception {
     DirectoryScanner.ScanInfo scanInfo =
-        new DirectoryScanner.ScanInfo(blockId);
+        new DirectoryScanner.ScanInfo(blockId, null, null, null);
     assertEquals(blockId, scanInfo.getBlockId());
     assertNull(scanInfo.getBlockFile());
     assertNull(scanInfo.getMetaFile());