diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
index f822f9cf33..1391c14f82 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES_HDFS-2832.txt
@@ -123,3 +123,5 @@ IMPROVEMENTS:
 
     HDFS-5484. StorageType and State in DatanodeStorageInfo in NameNode is
     not accurate. (Eric Sirianni via Arpit Agarwal)
+
+    HDFS-5648. Get rid of FsDatasetImpl#perVolumeReplicaMap. (Arpit Agarwal)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 806e9f7040..b81235625e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -38,6 +38,7 @@ import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -197,15 +198,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   final FsDatasetCache cacheManager;
   private final int validVolsRequired;
 
-  // TODO HDFS-2832: Consider removing duplicated block info from these
-  // two maps and move the perVolumeReplicaMap to FsVolumeImpl.
-  // This might require some refactoring.
   final ReplicaMap volumeMap;
 
-  // Map from StorageID to ReplicaMap.
-  final Map<String, ReplicaMap> perVolumeReplicaMap;
-
   // Used for synchronizing access to usage stats
   private final Object statsLock = new Object();
@@ -254,7 +248,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
     }
     volumeMap = new ReplicaMap(this);
-    perVolumeReplicaMap = new HashMap<String, ReplicaMap>();
 
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
@@ -263,7 +256,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         RoundRobinVolumeChoosingPolicy.class,
         VolumeChoosingPolicy.class), conf);
     volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.initializeReplicaMaps(perVolumeReplicaMap, volumeMap, this);
+    volumes.initializeReplicaMaps(volumeMap);
 
     File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
@@ -661,7 +654,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
     // Replace finalized replica by a RBW replica in replicas map
     volumeMap.add(bpid, newReplicaInfo);
-    perVolumeReplicaMap.get(v.getStorageID()).add(bpid, newReplicaInfo);
 
     return newReplicaInfo;
   }
@@ -792,7 +784,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -911,7 +902,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     rbw.setBytesAcked(visible);
 
     // overwrite the RBW in the volume map
     volumeMap.add(b.getBlockPoolId(), rbw);
-    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), rbw);
     return rbw;
   }
 
@@ -931,7 +921,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
         b.getGenerationStamp(), v, f.getParentFile());
     volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
-    perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
     return newReplicaInfo;
   }
 
@@ -1000,8 +989,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
     }
     volumeMap.add(bpid, newReplicaInfo);
-    perVolumeReplicaMap.get(newReplicaInfo.getVolume().getStorageID())
-        .add(bpid, newReplicaInfo);
 
     return newReplicaInfo;
   }
@@ -1015,8 +1002,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
       // remove from volumeMap
       volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
-      perVolumeReplicaMap.get(replicaInfo.getVolume().getStorageID())
-          .remove(b.getBlockPoolId(), b.getLocalBlock());
 
       // delete the on-disk temp file
       if (delBlockFromDisk(replicaInfo.getBlockFile(),
@@ -1051,39 +1036,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     return true;
   }
 
-  private BlockListAsLongs getBlockReportWithReplicaMap(
-      String bpid, ReplicaMap rMap) {
-    int size = rMap.size(bpid);
-    ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
-    ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
-    if (size == 0) {
-      return new BlockListAsLongs(finalized, uc);
-    }
-
-    synchronized(this) {
-      for (ReplicaInfo b : rMap.replicas(bpid)) {
-        switch(b.getState()) {
-        case FINALIZED:
-          finalized.add(b);
-          break;
-        case RBW:
-        case RWR:
-          uc.add(b);
-          break;
-        case RUR:
-          ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
-          uc.add(rur.getOriginalReplica());
-          break;
-        case TEMPORARY:
-          break;
-        default:
-          assert false : "Illegal ReplicaInfo state.";
-        }
-      }
-      return new BlockListAsLongs(finalized, uc);
-    }
-  }
-
   @Override // FsDatasetSpi
   public List<Long> getCacheReport(String bpid) {
     return cacheManager.getCachedBlocks(bpid);
@@ -1091,16 +1043,49 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
 
   @Override
   public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
-    Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
+    Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
         new HashMap<DatanodeStorage, BlockListAsLongs>();
 
-    for (FsVolumeImpl v : getVolumes()) {
-      ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
-      BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
-      blockReportMap.put(v.toDatanodeStorage(), blockList);
+    Map<String, ArrayList<ReplicaInfo>> finalized =
+        new HashMap<String, ArrayList<ReplicaInfo>>();
+    Map<String, ArrayList<ReplicaInfo>> uc =
+        new HashMap<String, ArrayList<ReplicaInfo>>();
+
+    for (FsVolumeSpi v : volumes.volumes) {
+      finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
+      uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
     }
 
-    return blockReportMap;
+    synchronized(this) {
+      for (ReplicaInfo b : volumeMap.replicas(bpid)) {
+        switch(b.getState()) {
+          case FINALIZED:
+            finalized.get(b.getVolume().getStorageID()).add(b);
+            break;
+          case RBW:
+          case RWR:
+            uc.get(b.getVolume().getStorageID()).add(b);
+            break;
+          case RUR:
+            ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
+            uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
+            break;
+          case TEMPORARY:
+            break;
+          default:
+            assert false : "Illegal ReplicaInfo state.";
+        }
+      }
+    }
+
+    for (FsVolumeImpl v : volumes.volumes) {
+      ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
+      ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
+      blockReportsMap.put(v.toDatanodeStorage(),
+                          new BlockListAsLongs(finalizedList, ucList));
+    }
+
+    return blockReportsMap;
   }
 
   /**
@@ -1244,7 +1229,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
           v.clearPath(bpid, parent);
         }
         volumeMap.remove(bpid, invalidBlks[i]);
-        perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
       }
       // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
@@ -1374,8 +1358,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
             LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
                 + " on failed volume " + fv.getCurrentDir().getAbsolutePath());
             ib.remove();
-            perVolumeReplicaMap.get(fv.getStorageID())
-                .remove(bpid, b.getBlockId());
             removedBlocks++;
           }
         }
@@ -1492,7 +1474,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       // Block is in memory and not on the disk
      // Remove the block from volumeMap
       volumeMap.remove(bpid, blockId);
-      perVolumeReplicaMap.get(vol.getStorageID()).remove(bpid, blockId);
       final DataBlockScanner blockScanner = datanode.getBlockScanner();
       if (blockScanner != null) {
         blockScanner.deleteBlock(bpid, new Block(blockId));
@@ -1516,8 +1497,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
           diskFile.length(), diskGS, vol, diskFile.getParentFile());
       volumeMap.add(bpid, diskBlockInfo);
-      perVolumeReplicaMap.get(vol.getStorageID())
-          .remove(bpid, diskBlockInfo);
       final DataBlockScanner blockScanner = datanode.getBlockScanner();
       if (blockScanner != null) {
         blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
@@ -1792,13 +1771,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     volumes.addBlockPool(bpid, conf);
     volumeMap.initBlockPool(bpid);
     volumes.getAllVolumesMap(bpid, volumeMap);
-
-    // TODO: Avoid the double scan.
-    for (FsVolumeImpl v : getVolumes()) {
-      ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
-      rMap.initBlockPool(bpid);
-      volumes.getVolumeMap(bpid, v, rMap);
-    }
   }
 
   @Override
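Editor's note: the hunks above replace the per-volume `ReplicaMap` lookup with a single pass over the global `volumeMap`, bucketing each replica by the storage ID of the volume it lives on. A minimal sketch of that bucketing pattern, using hypothetical stand-in types (`Replica`, `storageId`) rather than the real HDFS classes:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the single-pass bucketing that replaces perVolumeReplicaMap.
// Replica and its fields are simplified stand-ins for the HDFS types.
class BlockReportBucketing {
  static class Replica {
    final long blockId;
    final String storageId; // ID of the volume holding this replica

    Replica(long blockId, String storageId) {
      this.blockId = blockId;
      this.storageId = storageId;
    }
  }

  static Map<String, List<Replica>> bucketByStorage(Iterable<Replica> globalMap,
                                                    Iterable<String> storageIds) {
    Map<String, List<Replica>> buckets = new HashMap<String, List<Replica>>();
    // Pre-populate one bucket per volume so a storage with no replicas
    // still yields an (empty) block report.
    for (String id : storageIds) {
      buckets.put(id, new ArrayList<Replica>());
    }
    // One pass over the global map; each replica already knows its volume,
    // so there is no second per-volume map to keep in sync.
    for (Replica r : globalMap) {
      buckets.get(r.storageId).add(r);
    }
    return buckets;
  }
}
```

The pre-population step mirrors the patch's first loop over `volumes.volumes`: every volume gets a report entry even when it holds no replicas for the block pool.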
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index 708ffaf06e..89830fcf24 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -90,14 +90,9 @@ class FsVolumeList {
     return remaining;
   }
 
-  void initializeReplicaMaps(Map<String, ReplicaMap> perVolumeReplicaMap,
-                             ReplicaMap globalReplicaMap,
-                             Object mutex) throws IOException {
+  void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
     for (FsVolumeImpl v : volumes) {
-      ReplicaMap rMap = new ReplicaMap(mutex);
-      v.getVolumeMap(rMap);
-      perVolumeReplicaMap.put(v.getStorageID(), rMap);
-      globalReplicaMap.addAll(rMap);
+      v.getVolumeMap(globalReplicaMap);
     }
   }
 
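Editor's note: the `initializeReplicaMaps` change drops the intermediate per-volume maps entirely; each volume now scans its block files directly into the shared global map. A before/after sketch under stand-in types (`Volume#scanInto` approximates `FsVolumeImpl#getVolumeMap`, and a plain `Map` approximates `ReplicaMap`):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Before/after shape of initializeReplicaMaps. Volume#scanInto and
// Map<Long, String> are illustrative, not the real HDFS signatures.
class ReplicaMapInit {
  interface Volume {
    void scanInto(Map<Long, String> replicaMap);
  }

  // Old shape: a private map per volume, copied into the global map
  // afterwards, leaving two structures that must stay consistent.
  static void initOld(List<Volume> volumes, Map<Long, String> global) {
    for (Volume v : volumes) {
      Map<Long, String> perVolume = new HashMap<Long, String>();
      v.scanInto(perVolume);     // scan the volume's block files
      global.putAll(perVolume);  // duplicate bookkeeping
    }
  }

  // New shape: each volume scans straight into the shared global map.
  static void initNew(List<Volume> volumes, Map<Long, String> global) {
    for (Volume v : volumes) {
      v.scanInto(global);
    }
  }
}
```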
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
index da80f21e98..271f71091d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
@@ -71,4 +71,23 @@ public class DatanodeStorage {
   public static String generateUuid() {
     return "DS-" + UUID.randomUUID();
   }
+
+  @Override
+  public boolean equals(Object other){
+    if (other == this) {
+      return true;
+    }
+
+    if ((other == null) ||
+        !(other instanceof DatanodeStorage)) {
+      return false;
+    }
+
+    DatanodeStorage otherStorage = (DatanodeStorage) other;
+    return otherStorage.getStorageID().compareTo(getStorageID()) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    return getStorageID().hashCode();
+  }
 }
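Editor's note: `DatanodeStorage` gains `equals`/`hashCode` keyed on the storage ID because `getBlockReports` now uses it as a `HashMap` key. A self-contained sketch of why the pair matters, using a hypothetical `StorageKey` class rather than the HDFS type:

```java
import java.util.HashMap;
import java.util.Map;

// Two keys wrapping the same storage ID must collapse to one HashMap entry.
// StorageKey is a hypothetical stand-in, not the HDFS class.
class StorageKey {
  private final String storageId;

  StorageKey(String storageId) {
    this.storageId = storageId;
  }

  @Override
  public boolean equals(Object other) {
    if (other == this) {
      return true;
    }
    if (!(other instanceof StorageKey)) {
      return false;
    }
    return ((StorageKey) other).storageId.equals(storageId);
  }

  @Override
  public int hashCode() {
    return storageId.hashCode();
  }

  public static void main(String[] args) {
    Map<StorageKey, String> reports = new HashMap<StorageKey, String>();
    reports.put(new StorageKey("DS-1"), "report A");
    // Same storage ID: the second put replaces the first entry rather
    // than creating a duplicate key.
    reports.put(new StorageKey("DS-1"), "report B");
    System.out.println(reports.size()); // prints 1
  }
}
```

Without the override, `DatanodeStorage` would fall back to identity equality, so two instances carrying the same ID would land in different buckets and the block-report map could accumulate duplicate per-storage entries.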