HDFS-5648. Get rid of FsDatasetImpl#perVolumeReplicaMap.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1550357 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c656562556
commit
fba994ffe2
@ -123,3 +123,5 @@ IMPROVEMENTS:
|
||||
HDFS-5484. StorageType and State in DatanodeStorageInfo in NameNode is
|
||||
not accurate. (Eric Sirianni via Arpit Agarwal)
|
||||
|
||||
HDFS-5648. Get rid of FsDatasetImpl#perVolumeReplicaMap. (Arpit Agarwal)
|
||||
|
||||
|
@ -38,6 +38,7 @@ import javax.management.NotCompliantMBeanException;
|
||||
import javax.management.ObjectName;
|
||||
import javax.management.StandardMBean;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
@ -197,15 +198,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
final FsDatasetCache cacheManager;
|
||||
private final int validVolsRequired;
|
||||
|
||||
// TODO HDFS-2832: Consider removing duplicated block info from these
|
||||
// two maps and move the perVolumeReplicaMap to FsVolumeImpl.
|
||||
// This might require some refactoring.
|
||||
final ReplicaMap volumeMap;
|
||||
|
||||
// Map from StorageID to ReplicaMap.
|
||||
final Map<String, ReplicaMap> perVolumeReplicaMap;
|
||||
|
||||
|
||||
// Used for synchronizing access to usage stats
|
||||
private final Object statsLock = new Object();
|
||||
|
||||
@ -254,7 +248,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
|
||||
}
|
||||
volumeMap = new ReplicaMap(this);
|
||||
perVolumeReplicaMap = new HashMap<String, ReplicaMap>();
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
|
||||
@ -263,7 +256,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
RoundRobinVolumeChoosingPolicy.class,
|
||||
VolumeChoosingPolicy.class), conf);
|
||||
volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
|
||||
volumes.initializeReplicaMaps(perVolumeReplicaMap, volumeMap, this);
|
||||
volumes.initializeReplicaMaps(volumeMap);
|
||||
|
||||
File[] roots = new File[storage.getNumStorageDirs()];
|
||||
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
|
||||
@ -661,7 +654,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
|
||||
// Replace finalized replica by a RBW replica in replicas map
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(bpid, newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
@ -792,7 +784,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
@ -911,7 +902,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
rbw.setBytesAcked(visible);
|
||||
// overwrite the RBW in the volume map
|
||||
volumeMap.add(b.getBlockPoolId(), rbw);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), rbw);
|
||||
return rbw;
|
||||
}
|
||||
|
||||
@ -931,7 +921,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile());
|
||||
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).add(b.getBlockPoolId(), newReplicaInfo);
|
||||
|
||||
return newReplicaInfo;
|
||||
}
|
||||
@ -1000,8 +989,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
|
||||
}
|
||||
volumeMap.add(bpid, newReplicaInfo);
|
||||
perVolumeReplicaMap.get(newReplicaInfo.getVolume().getStorageID())
|
||||
.add(bpid, newReplicaInfo);
|
||||
return newReplicaInfo;
|
||||
}
|
||||
|
||||
@ -1015,8 +1002,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
if (replicaInfo != null && replicaInfo.getState() == ReplicaState.TEMPORARY) {
|
||||
// remove from volumeMap
|
||||
volumeMap.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
perVolumeReplicaMap.get(replicaInfo.getVolume().getStorageID())
|
||||
.remove(b.getBlockPoolId(), b.getLocalBlock());
|
||||
|
||||
// delete the on-disk temp file
|
||||
if (delBlockFromDisk(replicaInfo.getBlockFile(),
|
||||
@ -1051,39 +1036,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
return true;
|
||||
}
|
||||
|
||||
private BlockListAsLongs getBlockReportWithReplicaMap(
|
||||
String bpid, ReplicaMap rMap) {
|
||||
int size = rMap.size(bpid);
|
||||
ArrayList<ReplicaInfo> finalized = new ArrayList<ReplicaInfo>(size);
|
||||
ArrayList<ReplicaInfo> uc = new ArrayList<ReplicaInfo>();
|
||||
if (size == 0) {
|
||||
return new BlockListAsLongs(finalized, uc);
|
||||
}
|
||||
|
||||
synchronized(this) {
|
||||
for (ReplicaInfo b : rMap.replicas(bpid)) {
|
||||
switch(b.getState()) {
|
||||
case FINALIZED:
|
||||
finalized.add(b);
|
||||
break;
|
||||
case RBW:
|
||||
case RWR:
|
||||
uc.add(b);
|
||||
break;
|
||||
case RUR:
|
||||
ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
|
||||
uc.add(rur.getOriginalReplica());
|
||||
break;
|
||||
case TEMPORARY:
|
||||
break;
|
||||
default:
|
||||
assert false : "Illegal ReplicaInfo state.";
|
||||
}
|
||||
}
|
||||
return new BlockListAsLongs(finalized, uc);
|
||||
}
|
||||
}
|
||||
|
||||
@Override // FsDatasetSpi
|
||||
public List<Long> getCacheReport(String bpid) {
|
||||
return cacheManager.getCachedBlocks(bpid);
|
||||
@ -1091,16 +1043,49 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
|
||||
@Override
|
||||
public Map<DatanodeStorage, BlockListAsLongs> getBlockReports(String bpid) {
|
||||
Map<DatanodeStorage, BlockListAsLongs> blockReportMap =
|
||||
Map<DatanodeStorage, BlockListAsLongs> blockReportsMap =
|
||||
new HashMap<DatanodeStorage, BlockListAsLongs>();
|
||||
|
||||
for (FsVolumeImpl v : getVolumes()) {
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
|
||||
BlockListAsLongs blockList = getBlockReportWithReplicaMap(bpid, rMap);
|
||||
blockReportMap.put(v.toDatanodeStorage(), blockList);
|
||||
Map<String, ArrayList<ReplicaInfo>> finalized =
|
||||
new HashMap<String, ArrayList<ReplicaInfo>>();
|
||||
Map<String, ArrayList<ReplicaInfo>> uc =
|
||||
new HashMap<String, ArrayList<ReplicaInfo>>();
|
||||
|
||||
for (FsVolumeSpi v : volumes.volumes) {
|
||||
finalized.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
|
||||
uc.put(v.getStorageID(), new ArrayList<ReplicaInfo>());
|
||||
}
|
||||
|
||||
return blockReportMap;
|
||||
synchronized(this) {
|
||||
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
|
||||
switch(b.getState()) {
|
||||
case FINALIZED:
|
||||
finalized.get(b.getVolume().getStorageID()).add(b);
|
||||
break;
|
||||
case RBW:
|
||||
case RWR:
|
||||
uc.get(b.getVolume().getStorageID()).add(b);
|
||||
break;
|
||||
case RUR:
|
||||
ReplicaUnderRecovery rur = (ReplicaUnderRecovery)b;
|
||||
uc.get(rur.getVolume().getStorageID()).add(rur.getOriginalReplica());
|
||||
break;
|
||||
case TEMPORARY:
|
||||
break;
|
||||
default:
|
||||
assert false : "Illegal ReplicaInfo state.";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (FsVolumeImpl v : volumes.volumes) {
|
||||
ArrayList<ReplicaInfo> finalizedList = finalized.get(v.getStorageID());
|
||||
ArrayList<ReplicaInfo> ucList = uc.get(v.getStorageID());
|
||||
blockReportsMap.put(v.toDatanodeStorage(),
|
||||
new BlockListAsLongs(finalizedList, ucList));
|
||||
}
|
||||
|
||||
return blockReportsMap;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1244,7 +1229,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
v.clearPath(bpid, parent);
|
||||
}
|
||||
volumeMap.remove(bpid, invalidBlks[i]);
|
||||
perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
|
||||
}
|
||||
// If the block is cached, start uncaching it.
|
||||
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
|
||||
@ -1374,8 +1358,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
LOG.warn("Removing replica " + bpid + ":" + b.getBlockId()
|
||||
+ " on failed volume " + fv.getCurrentDir().getAbsolutePath());
|
||||
ib.remove();
|
||||
perVolumeReplicaMap.get(fv.getStorageID())
|
||||
.remove(bpid, b.getBlockId());
|
||||
removedBlocks++;
|
||||
}
|
||||
}
|
||||
@ -1492,7 +1474,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
// Block is in memory and not on the disk
|
||||
// Remove the block from volumeMap
|
||||
volumeMap.remove(bpid, blockId);
|
||||
perVolumeReplicaMap.get(vol.getStorageID()).remove(bpid, blockId);
|
||||
final DataBlockScanner blockScanner = datanode.getBlockScanner();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.deleteBlock(bpid, new Block(blockId));
|
||||
@ -1516,8 +1497,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
|
||||
diskFile.length(), diskGS, vol, diskFile.getParentFile());
|
||||
volumeMap.add(bpid, diskBlockInfo);
|
||||
perVolumeReplicaMap.get(vol.getStorageID())
|
||||
.remove(bpid, diskBlockInfo);
|
||||
final DataBlockScanner blockScanner = datanode.getBlockScanner();
|
||||
if (blockScanner != null) {
|
||||
blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo));
|
||||
@ -1792,13 +1771,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
||||
volumes.addBlockPool(bpid, conf);
|
||||
volumeMap.initBlockPool(bpid);
|
||||
volumes.getAllVolumesMap(bpid, volumeMap);
|
||||
|
||||
// TODO: Avoid the double scan.
|
||||
for (FsVolumeImpl v : getVolumes()) {
|
||||
ReplicaMap rMap = perVolumeReplicaMap.get(v.getStorageID());
|
||||
rMap.initBlockPool(bpid);
|
||||
volumes.getVolumeMap(bpid, v, rMap);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -90,14 +90,9 @@ class FsVolumeList {
|
||||
return remaining;
|
||||
}
|
||||
|
||||
void initializeReplicaMaps(Map<String, ReplicaMap> perVolumeReplicaMap,
|
||||
ReplicaMap globalReplicaMap,
|
||||
Object mutex) throws IOException {
|
||||
void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
ReplicaMap rMap = new ReplicaMap(mutex);
|
||||
v.getVolumeMap(rMap);
|
||||
perVolumeReplicaMap.put(v.getStorageID(), rMap);
|
||||
globalReplicaMap.addAll(rMap);
|
||||
v.getVolumeMap(globalReplicaMap);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -71,4 +71,23 @@ public class DatanodeStorage {
|
||||
public static String generateUuid() {
|
||||
return "DS-" + UUID.randomUUID();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object other){
|
||||
if (other == this) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if ((other == null) ||
|
||||
!(other instanceof DatanodeStorage)) {
|
||||
return false;
|
||||
}
|
||||
DatanodeStorage otherStorage = (DatanodeStorage) other;
|
||||
return otherStorage.getStorageID().compareTo(getStorageID()) == 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return getStorageID().hashCode();
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user