HDFS-5401. Fix NPE in Directory Scanner.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1535158 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b442fe92fb
commit
f39f8c5734
@ -45,3 +45,5 @@ IMPROVEMENTS:
|
||||
|
||||
HDFS-5390. Send one incremental block report per storage directory.
|
||||
(Arpit Agarwal)
|
||||
|
||||
HDFS-5401. Fix NPE in Directory Scanner. (Arpit Agarwal)
|
||||
|
@ -27,6 +27,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -180,10 +181,11 @@ public String toString() {
|
||||
}
|
||||
}
|
||||
|
||||
void reportBadBlocks(ExtendedBlock block) {
|
||||
void reportBadBlocks(ExtendedBlock block,
|
||||
String storageUuid, StorageType storageType) {
|
||||
checkBlock(block);
|
||||
for (BPServiceActor actor : bpServices) {
|
||||
actor.reportBadBlocks(block);
|
||||
actor.reportBadBlocks(block, storageUuid, storageType);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
@ -237,12 +238,18 @@ void scheduleBlockReport(long delay) {
|
||||
resetBlockReportTime = true; // reset future BRs for randomness
|
||||
}
|
||||
|
||||
void reportBadBlocks(ExtendedBlock block) {
|
||||
void reportBadBlocks(ExtendedBlock block,
|
||||
String storageUuid, StorageType storageType) {
|
||||
if (bpRegistration == null) {
|
||||
return;
|
||||
}
|
||||
DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
|
||||
LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
|
||||
String[] uuids = { storageUuid };
|
||||
StorageType[] types = { storageType };
|
||||
// TODO: Corrupt flag is set to false for compatibility. We can probably
|
||||
// set it to true here.
|
||||
LocatedBlock[] blocks = {
|
||||
new LocatedBlock(block, dnArr, uuids, types, -1, false) };
|
||||
|
||||
try {
|
||||
bpNamenode.reportBadBlocks(blocks);
|
||||
|
@ -559,7 +559,9 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid)
|
||||
*/
|
||||
public void reportBadBlocks(ExtendedBlock block) throws IOException{
|
||||
BPOfferService bpos = getBPOSForBlock(block);
|
||||
bpos.reportBadBlocks(block);
|
||||
FsVolumeSpi volume = getFSDataset().getVolume(block);
|
||||
bpos.reportBadBlocks(
|
||||
block, volume.getStorageID(), volume.getStorageType());
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1265,8 +1267,10 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
|
||||
// Check if NN recorded length matches on-disk length
|
||||
long onDiskLength = data.getLength(block);
|
||||
if (block.getNumBytes() > onDiskLength) {
|
||||
FsVolumeSpi volume = getFSDataset().getVolume(block);
|
||||
// Shorter on-disk len indicates corruption so report NN the corrupt block
|
||||
bpos.reportBadBlocks(block);
|
||||
bpos.reportBadBlocks(
|
||||
block, volume.getStorageID(), volume.getStorageType());
|
||||
LOG.warn("Can't replicate block " + block
|
||||
+ " because on-disk length " + onDiskLength
|
||||
+ " is shorter than NameNode recorded length " + block.getNumBytes());
|
||||
|
@ -198,7 +198,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
// two maps. This might require some refactoring
|
||||
// rewrite of FsDatasetImpl.
|
||||
final ReplicaMap volumeMap;
|
||||
final Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap;
|
||||
|
||||
// Map from StorageID to ReplicaMap.
|
||||
final Map<String, ReplicaMap> perVolumeReplicaMap;
|
||||
|
||||
|
||||
// Used for synchronizing access to usage stats
|
||||
@ -249,7 +251,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
|
||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||
}
|
||||
volumeMap = new ReplicaMap(this);
|
||||
perVolumeReplicaMap = new HashMap<FsVolumeImpl, ReplicaMap>();
|
||||
perVolumeReplicaMap = new HashMap<String, ReplicaMap>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
||||
@ -628,7 +630,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
|
||||
|
||||
// Replace finalized replica by a RBW replica in replicas map
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v).add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(bpid, newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
@ -759,7 +761,7 @@ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
@ -878,7 +880,7 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
|
||||
rbw.setBytesAcked(visible);
|
||||
// overwrite the RBW in the volume map
|
||||
volumeMap.add(b.getBlockPoolId(), rbw);
|
||||
perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), rbw);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), rbw);
|
||||
return rbw;
|
||||
}
|
||||
|
||||
@ -898,7 +900,7 @@ public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
|
||||
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
@ -967,7 +969,8 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
|
||||
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
}
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(newReplicaInfo.getVolume()).add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(newReplicaInfo.getVolume().getStorageID())
|
||||
.add(bpid, newReplicaInfo);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
@ -981,7 +984,7 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
|
||||
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
||||
// remove from volumeMap
|
||||
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
perVolumeReplicaMap.get((FsVolumeImpl) replicaInfo.getVolume())
|
||||
perVolumeReplicaMap.get(replicaInfo.getVolume().getStorageID())
|
||||
.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
|
||||
// delete the on-disk temp file
|
||||
@ -1064,7 +1067,7 @@ public Map<String, BlockListAsLongs> getBlockReports(String bpid) {
|
||||
new HashMap<String, BlockListAsLongs>();
|
||||
|
||||
for (FsVolumeImpl v : getVolumes()) {
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v);
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
|
||||
BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
|
||||
blockReportMap.put(v.getStorageID(), blockList);
|
||||
}
|
||||
@ -1212,7 +1215,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
|
||||
v.clearPath(bpid, parent);
|
||||
}
|
||||
volumeMap.remove(bpid, invalidBlks[i]);
|
||||
perVolumeReplicaMap.get(v).remove(bpid, invalidBlks[i]);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
|
||||
}
|
||||
|
||||
// Delete the block asynchronously to make sure we can do it fast enough
|
||||
@ -1274,7 +1277,8 @@ public void checkDataDir() throws DiskErrorException {
|
||||
LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
|
||||
+ " on failed volume " + fv.getCurrentDir().getAbsolutePath());
|
||||
ib.remove();
|
||||
perVolumeReplicaMap.get(fv).remove(bpid, b.getBlockId());
|
||||
perVolumeReplicaMap.get(fv.getStorageID())
|
||||
.remove(bpid, b.getBlockId());
|
||||
removedBlocks++;
|
||||
}
|
||||
}
|
||||
@ -1391,8 +1395,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
// Block is in memory and not on the disk
|
||||
// Remove the block from volumeMap
|
||||
volumeMap.remove(bpid, blockId);
|
||||
perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume())
|
||||
.remove(bpid, blockId);
|
||||
perVolumeReplicaMap.get(vol.getStorageID()).remove(bpid, blockId);
|
||||
final DataBlockScanner blockScanner = datanode.getBlockScanner();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.deleteBlock(bpid, new Block(blockId));
|
||||
@ -1416,8 +1419,8 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
|
||||
diskFile.length(), diskGS, vol, diskFile.getParentFile());
|
||||
volumeMap.add(bpid, diskBlockInfo);
|
||||
perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume()).
|
||||
remove(bpid, diskBlockInfo);
|
||||
perVolumeReplicaMap.get(vol.getStorageID())
|
||||
.remove(bpid, diskBlockInfo);
|
||||
final DataBlockScanner blockScanner = datanode.getBlockScanner();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
|
||||
@ -1695,7 +1698,7 @@ public synchronized void addBlockPool(String bpid, Configuration conf)
|
||||
|
||||
// TODO: Avoid the double scan.
|
||||
for (FsVolumeImpl v : getVolumes()) {
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v);
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
|
||||
rMap.initBlockPool(bpid);
|
||||
volumes.getVolumeMap(bpid, v, rMap);
|
||||
}
|
||||
|
@ -90,13 +90,13 @@ long getRemaining() throws IOException {
|
||||
return remaining;
|
||||
}
|
||||
|
||||
void initializeReplicaMaps(Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap,
|
||||
void initializeReplicaMaps(Map<String, ReplicaMap> perVolumeReplicaMap,
|
||||
ReplicaMap globalReplicaMap,
|
||||
Object mutex) throws IOException {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
ReplicaMap rMap = new ReplicaMap(mutex);
|
||||
v.getVolumeMap(rMap);
|
||||
perVolumeReplicaMap.put(v, rMap);
|
||||
perVolumeReplicaMap.put(v.getStorageID(), rMap);
|
||||
globalReplicaMap.addAll(rMap);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user