diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index 80b8ba800d..cd139d4845 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -45,3 +45,5 @@ IMPROVEMENTS:
 
     HDFS-5390. Send one incremental block report per storage directory.
     (Arpit Agarwal)
+
+    HDFS-5401. Fix NPE in Directory Scanner. (Arpit Agarwal)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index c9486e9e4b..5d584616df 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -180,10 +181,11 @@ public String toString() {
     }
   }
 
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+                       String storageUuid, StorageType storageType) {
     checkBlock(block);
     for (BPServiceActor actor : bpServices) {
-      actor.reportBadBlocks(block);
+      actor.reportBadBlocks(block, storageUuid, storageType);
     }
   }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 4eb843d247..172fb0fc30 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -28,6 +28,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -237,12 +238,18 @@ void scheduleBlockReport(long delay) {
     resetBlockReportTime = true; // reset future BRs for randomness
   }
 
-  void reportBadBlocks(ExtendedBlock block) {
+  void reportBadBlocks(ExtendedBlock block,
+      String storageUuid, StorageType storageType) {
     if (bpRegistration == null) {
       return;
     }
     DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
-    LocatedBlock[] blocks = { new LocatedBlock(block, dnArr) };
+    String[] uuids = { storageUuid };
+    StorageType[] types = { storageType };
+    // TODO: Corrupt flag is set to false for compatibility. We can probably
+    // set it to true here.
+    LocatedBlock[] blocks = {
+        new LocatedBlock(block, dnArr, uuids, types, -1, false) };
 
     try {
       bpNamenode.reportBadBlocks(blocks);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 526c89e7a4..318d2f3705 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -559,7 +559,9 @@ public void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid)
    */
   public void reportBadBlocks(ExtendedBlock block) throws IOException{
     BPOfferService bpos = getBPOSForBlock(block);
-    bpos.reportBadBlocks(block);
+    FsVolumeSpi volume = getFSDataset().getVolume(block);
+    bpos.reportBadBlocks(
+        block, volume.getStorageID(), volume.getStorageType());
   }
 
   /**
@@ -1265,8 +1267,10 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
       // Check if NN recorded length matches on-disk length
       long onDiskLength = data.getLength(block);
       if (block.getNumBytes() > onDiskLength) {
+        FsVolumeSpi volume = getFSDataset().getVolume(block);
         // Shorter on-disk len indicates corruption so report NN the corrupt block
-        bpos.reportBadBlocks(block);
+        bpos.reportBadBlocks(
+            block, volume.getStorageID(), volume.getStorageType());
         LOG.warn("Can't replicate block " + block
             + " because on-disk length " + onDiskLength
             + " is shorter than NameNode recorded length " + block.getNumBytes());
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index f5928e47cd..9077c40a83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -198,7 +198,9 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
   // two maps. This might require some refactoring
   // rewrite of FsDatasetImpl.
   final ReplicaMap volumeMap;
-  final Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap;
+
+  // Map from StorageID to ReplicaMap.
+  final Map<String, ReplicaMap> perVolumeReplicaMap;
 
   // Used for synchronizing access to usage stats
@@ -249,7 +251,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
       LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
     }
     volumeMap = new ReplicaMap(this);
-    perVolumeReplicaMap = new HashMap<FsVolumeImpl, ReplicaMap>();
+    perVolumeReplicaMap = new HashMap<String, ReplicaMap>();
 
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -628,7 +630,7 @@ private synchronized ReplicaBeingWritten append(String bpid,
 
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    perVolumeReplicaMap.get(v).add(bpid, newReplicaInfo);
+    perVolumeReplicaMap.get(v.getStorageID()).add(bpid, newReplicaInfo);
 
     return newReplicaInfo;
   }
@@ -759,7 +761,7 @@ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
+    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -878,7 +880,7 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
     rbw.setBytesAcked(visible);
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
-    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), rbw);
+    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), rbw);
     return rbw;
   }
 
@@ -898,7 +900,7 @@ public synchronized ReplicaInPipeline createTemporary(ExtendedBlock b)
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    perVolumeReplicaMap.get(v).add(b.getBlockPoolId(), newReplicaInfo);
+    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -967,7 +969,8 @@ private synchronized FinalizedReplica finalizeReplica(String bpid,
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(bpid, newReplicaInfo);
-    perVolumeReplicaMap.get(newReplicaInfo.getVolume()).add(bpid, newReplicaInfo);
+    perVolumeReplicaMap.get(newReplicaInfo.getVolume().getStorageID())
+        .add(bpid, newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -981,7 +984,7 @@ public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException {
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-      perVolumeReplicaMap.get((FsVolumeImpl) replicaInfo.getVolume())
+      perVolumeReplicaMap.get(replicaInfo.getVolume().getStorageID())
           .remove(b.getBlockPoolId(), b.getLocalBlock());
 
       // delete the on-disk temp file
@@ -1064,7 +1067,7 @@ public Map<String, BlockListAsLongs> getBlockReports(String bpid) {
         new HashMap<String, BlockListAsLongs>();
 
     for (FsVolumeImpl v : getVolumes()) {
-      ReplicaMap rMap = perVolumeReplicaMap.get(v);
+      ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
       BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
       blockReportMap.put(v.getStorageID(), blockList);
     }
@@ -1212,7 +1215,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
         v.clearPath(bpid, parent);
       }
       volumeMap.remove(bpid, invalidBlks[i]);
-      perVolumeReplicaMap.get(v).remove(bpid, invalidBlks[i]);
+      perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
     }
 
     // Delete the block asynchronously to make sure we can do it fast enough
@@ -1274,7 +1277,8 @@ public void checkDataDir() throws DiskErrorException {
             LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
                 + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
             ib.remove();
-            perVolumeReplicaMap.get(fv).remove(bpid, b.getBlockId());
+            perVolumeReplicaMap.get(fv.getStorageID())
+                .remove(bpid, b.getBlockId());
             removedBlocks++;
           }
         }
@@ -1391,8 +1395,7 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
         // Block is in memory and not on the disk
         // Remove the block from volumeMap
         volumeMap.remove(bpid, blockId);
-        perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume())
-            .remove(bpid, blockId);
+        perVolumeReplicaMap.get(vol.getStorageID()).remove(bpid, blockId);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
         if (blockScanner != null) {
           blockScanner.deleteBlock(bpid, new Block(blockId));
@@ -1416,8 +1419,8 @@ public void checkAndUpdate(String bpid, long blockId, File diskFile,
         ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
             diskFile.length(), diskGS, vol, diskFile.getParentFile());
         volumeMap.add(bpid, diskBlockInfo);
-        perVolumeReplicaMap.get((FsVolumeImpl) memBlockInfo.getVolume()).
-            remove(bpid, diskBlockInfo);
+        perVolumeReplicaMap.get(vol.getStorageID())
+            .remove(bpid, diskBlockInfo);
         final DataBlockScanner blockScanner = datanode.getBlockScanner();
         if (blockScanner != null) {
           blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
@@ -1695,7 +1698,7 @@ public synchronized void addBlockPool(String bpid, Configuration conf)
 
     // TODO: Avoid the double scan.
     for (FsVolumeImpl v : getVolumes()) {
-      ReplicaMap rMap = perVolumeReplicaMap.get(v);
+      ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
       rMap.initBlockPool(bpid);
       volumes.getVolumeMap(bpid, v, rMap);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a0e65db364..671996718b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -90,13 +90,13 @@ long getRemaining() throws IOException {
     return remaining;
   }
 
-  void initializeReplicaMaps(Map<FsVolumeImpl, ReplicaMap> perVolumeReplicaMap,
+  void initializeReplicaMaps(Map<String, ReplicaMap> perVolumeReplicaMap,
       ReplicaMap globalReplicaMap, Object mutex) throws IOException {
     for (FsVolumeImpl v : volumes) {
       ReplicaMap rMap = new ReplicaMap(mutex);
       v.getVolumeMap(rMap);
-      perVolumeReplicaMap.put(v, rMap);
+      perVolumeReplicaMap.put(v.getStorageID(), rMap);
       globalReplicaMap.addAll(rMap);
     }
   }
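
Editor's note (not part of the patch): the FsDatasetImpl hunks re-key perVolumeReplicaMap by the volume's storage ID (a String) instead of by the FsVolumeImpl instance, and the checkAndUpdate hunks derive that key from the scanned volume "vol" rather than from memBlockInfo, which can be null on the Directory Scanner path (the NPE that HDFS-5401 fixes). The standalone Java sketch below is illustrative only; Volume and the nested maps are simplified stand-ins, not the real HDFS classes. It shows why a String key behaves predictably when the same storage is later represented by a distinct object:

    import java.util.HashMap;
    import java.util.Map;

    public class PerVolumeMapSketch {
      // Simplified stand-in for FsVolumeImpl. It deliberately does not
      // override equals()/hashCode(), so object-keyed lookups succeed only
      // with the identical instance.
      static class Volume {
        private final String storageID;
        Volume(String storageID) { this.storageID = storageID; }
        String getStorageID() { return storageID; }
      }

      public static void main(String[] args) {
        // Keyed by storage ID, mirroring the patched perVolumeReplicaMap.
        Map<String, Map<Long, String>> perVolumeReplicaMap =
            new HashMap<String, Map<Long, String>>();

        Volume v = new Volume("DS-1234");
        perVolumeReplicaMap.put(v.getStorageID(), new HashMap<Long, String>());
        perVolumeReplicaMap.get(v.getStorageID()).put(1001L, "replica-1001");

        // A second object describing the same storage directory.
        Volume sameStorage = new Volume("DS-1234");

        // String key: the lookup succeeds no matter which instance supplies it.
        System.out.println(perVolumeReplicaMap.get(sameStorage.getStorageID()));

        // Object key: the lookup misses and returns null; chaining a call
        // onto that null is the kind of NPE the storage-ID keying avoids.
        Map<Volume, String> byInstance = new HashMap<Volume, String>();
        byInstance.put(v, "replica map for DS-1234");
        System.out.println(byInstance.get(sameStorage)); // prints null
      }
    }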