HDFS-13958. Miscellaneous Improvements for FsVolumeSpi. Contributed by BELUGA BEHR.

Committed by Inigo Goiri on 2018-10-05 09:32:19 -07:00
parent f13e231025
commit 73c660b43f
7 changed files with 254 additions and 297 deletions


@ -17,8 +17,6 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -26,7 +24,6 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
@ -46,7 +43,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
@ -657,7 +653,7 @@ public ScanInfoVolumeReport call() throws IOException {
perfTimer.start(); perfTimer.start();
throttleTimer.start(); throttleTimer.start();
for (String bpid : bpList) { for (String bpid : bpList) {
LinkedList<ScanInfo> report = new LinkedList<>(); List<ScanInfo> report = new ArrayList<>(DEFAULT_MAP_SIZE);
perfTimer.reset().start(); perfTimer.reset().start();
throttleTimer.reset().start(); throttleTimer.reset().start();
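
The per-block-pool report above moves from a LinkedList to a pre-sized ArrayList held behind the List interface. The following minimal sketch shows the access pattern that motivates the change; the class name, the DEFAULT_MAP_SIZE value, and buildReport are illustrative placeholders, not the HDFS code.

import java.util.ArrayList;
import java.util.List;

public class ReportListSketch {
  // Stand-in for the DEFAULT_MAP_SIZE constant referenced above; value is assumed.
  private static final int DEFAULT_MAP_SIZE = 32768;

  static List<Long> buildReport(long[] scannedBlockIds) {
    // The report is only appended to and then iterated, so a pre-sized
    // ArrayList avoids LinkedList's per-element node allocations without
    // giving anything up.
    List<Long> report = new ArrayList<>(DEFAULT_MAP_SIZE);
    for (long id : scannedBlockIds) {
      report.add(id);
    }
    return report;
  }

  public static void main(String[] args) {
    System.out.println(buildReport(new long[] {101L, 102L}).size()); // prints 2
  }
}
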
@ -720,16 +716,4 @@ private void accumulateTimeWaiting() {
perfTimer.reset().start(); perfTimer.reset().start();
} }
} }
public enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override
public boolean accept(File dir, String name) {
return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
|| name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
} }


@ -22,7 +22,7 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.LinkedList; import java.util.Collection;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -32,9 +32,9 @@
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.Checkable; import org.apache.hadoop.hdfs.server.datanode.checker.Checkable;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
@ -362,13 +362,13 @@ public long getBlockLength() {
public File getMetaFile() { public File getMetaFile() {
if (metaSuffix == null) { if (metaSuffix == null) {
return null; return null;
} else if (blockSuffix == null) {
return new File(new File(volume.getBaseURI()).getAbsolutePath(),
metaSuffix);
} else {
return new File(new File(volume.getBaseURI()).getAbsolutePath(),
blockSuffix + metaSuffix);
} }
String fileSuffix = metaSuffix;
if (blockSuffix != null) {
fileSuffix = blockSuffix + metaSuffix;
}
return new File(new File(volume.getBaseURI()).getAbsolutePath(),
fileSuffix);
} }
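
The rewritten getMetaFile() above collapses the two construction branches into one path: pick the suffix first, then build the File once. A rough standalone sketch of the same shape, with baseDir and the suffix fields as placeholders for ScanInfo's volume-relative state:

import java.io.File;

class MetaFileSketch {
  private final String baseDir = "/data/current";
  private String blockSuffix;
  private String metaSuffix;

  File getMetaFile() {
    if (metaSuffix == null) {
      return null;
    }
    // Single construction path instead of one new File(...) per branch.
    String fileSuffix = metaSuffix;
    if (blockSuffix != null) {
      fileSuffix = blockSuffix + metaSuffix;
    }
    return new File(baseDir, fileSuffix);
  }
}
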
/** /**
@ -389,18 +389,12 @@ public FsVolumeSpi getVolume() {
return volume; return volume;
} }
@Override // Comparable @Override
public int compareTo(ScanInfo b) { public int compareTo(ScanInfo b) {
if (blockId < b.blockId) { return Long.compare(this.blockId, b.blockId);
return -1;
} else if (blockId == b.blockId) {
return 0;
} else {
return 1;
}
} }
@Override // Object @Override
public boolean equals(Object o) { public boolean equals(Object o) {
if (this == o) { if (this == o) {
return true; return true;
@ -411,9 +405,9 @@ public boolean equals(Object o) {
return blockId == ((ScanInfo) o).blockId; return blockId == ((ScanInfo) o).blockId;
} }
@Override // Object @Override
public int hashCode() { public int hashCode() {
return (int)(blockId^(blockId>>>32)); return Long.hashCode(this.blockId);
} }
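
ScanInfo's ordering and hashing now delegate to Long.compare and Long.hashCode instead of a hand-written if/else chain and manual bit mixing. A small self-contained sketch of the equivalent pattern on an illustrative class (not the HDFS type):

final class BlockKey implements Comparable<BlockKey> {
  private final long blockId;

  BlockKey(long blockId) {
    this.blockId = blockId;
  }

  @Override
  public int compareTo(BlockKey other) {
    // Same ordering as the removed if/else chain, with no branches to get wrong.
    return Long.compare(this.blockId, other.blockId);
  }

  @Override
  public boolean equals(Object o) {
    return this == o
        || (o instanceof BlockKey && blockId == ((BlockKey) o).blockId);
  }

  @Override
  public int hashCode() {
    // Produces the same value as (int) (blockId ^ (blockId >>> 32)).
    return Long.hashCode(blockId);
  }
}
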
public long getGenStamp() { public long getGenStamp() {
@ -447,8 +441,8 @@ byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
* @param reportCompiler * @param reportCompiler
* @throws IOException * @throws IOException
*/ */
LinkedList<ScanInfo> compileReport(String bpid, void compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) Collection<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException; throws InterruptedException, IOException;
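
With this change the interface method fills a caller-supplied Collection<ScanInfo> and returns void, so implementations append into whatever list the scanner already owns rather than allocating and returning their own. A hedged sketch of that out-parameter style, using placeholder types rather than the HDFS ones:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

interface ReportSource {
  // The caller owns the collection; the implementation only appends to it.
  void compileReport(String bpid, Collection<String> report)
      throws InterruptedException;
}

class ExampleSource implements ReportSource {
  @Override
  public void compileReport(String bpid, Collection<String> report) {
    report.add(bpid + ":blk_1");
    report.add(bpid + ":blk_2");
  }
}

class Caller {
  public static void main(String[] args) throws InterruptedException {
    ReportSource source = new ExampleSource();
    List<String> report = new ArrayList<>();
    source.compileReport("BP-1", report);
    System.out.println(report); // [BP-1:blk_1, BP-1:blk_2]
  }
}
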
/** /**


@ -28,8 +28,8 @@
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.file.Paths; import java.nio.file.Paths;
import java.nio.file.StandardCopyOption; import java.nio.file.StandardCopyOption;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
@ -46,38 +46,37 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder; import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.BlockDirFilter;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplica;
import org.apache.hadoop.hdfs.server.datanode.LocalReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount; import org.apache.hadoop.util.CloseableReferenceCount;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException; import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer; import org.apache.hadoop.util.Timer;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -94,7 +93,7 @@
/** /**
* The underlying volume used to store replica. * The underlying volume used to store replica.
* *
* It uses the {@link FsDatasetImpl} object for synchronization. * It uses the {@link FsDatasetImpl} object for synchronization.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -311,11 +310,8 @@ void setClosed() throws IOException {
*/ */
boolean checkClosed() { boolean checkClosed() {
if (this.reference.getReferenceCount() > 0) { if (this.reference.getReferenceCount() > 0) {
if (FsDatasetImpl.LOG.isDebugEnabled()) { FsDatasetImpl.LOG.debug("The reference count for {} is {}, wait to be 0.",
FsDatasetImpl.LOG.debug(String.format( this, reference.getReferenceCount());
"The reference count for %s is %d, wait to be 0.",
this, reference.getReferenceCount()));
}
return false; return false;
} }
return true; return true;
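
The checkClosed() change above drops the isDebugEnabled() guard and the String.format call in favor of SLF4J's parameterized logging, which only renders the message when DEBUG is enabled. A minimal sketch of the pattern:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void reportReferenceCount(Object volume, int count) {
    // The {} placeholders are formatted only if DEBUG is on, so the explicit
    // guard is unnecessary as long as the arguments are cheap to pass.
    LOG.debug("The reference count for {} is {}, wait to be 0.", volume, count);
  }
}
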
@ -325,7 +321,7 @@ boolean checkClosed() {
File getCurrentDir() { File getCurrentDir() {
return currentDir; return currentDir;
} }
protected File getRbwDir(String bpid) throws IOException { protected File getRbwDir(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getRbwDir(); return getBlockPoolSlice(bpid).getRbwDir();
} }
@ -381,7 +377,7 @@ void incDfsUsed(String bpid, long value) {
@VisibleForTesting @VisibleForTesting
public long getDfsUsed() throws IOException { public long getDfsUsed() throws IOException {
long dfsUsed = 0; long dfsUsed = 0;
for(BlockPoolSlice s : bpSlices.values()) { for (BlockPoolSlice s : bpSlices.values()) {
dfsUsed += s.getDfsUsed(); dfsUsed += s.getDfsUsed();
} }
return dfsUsed; return dfsUsed;
@ -390,21 +386,20 @@ public long getDfsUsed() throws IOException {
long getBlockPoolUsed(String bpid) throws IOException { long getBlockPoolUsed(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDfsUsed(); return getBlockPoolSlice(bpid).getDfsUsed();
} }
/** /**
* Return either the configured capacity of the file system if configured; or * Return either the configured capacity of the file system if configured; or
* the capacity of the file system excluding space reserved for non-HDFS. * the capacity of the file system excluding space reserved for non-HDFS.
* *
* @return the unreserved number of bytes left in this filesystem. May be * @return the unreserved number of bytes left in this filesystem. May be
* zero. * zero.
*/ */
@VisibleForTesting @VisibleForTesting
public long getCapacity() { public long getCapacity() {
if (configuredCapacity < 0) { if (configuredCapacity < 0L) {
long remaining = usage.getCapacity() - getReserved(); long remaining = usage.getCapacity() - getReserved();
return remaining > 0 ? remaining : 0; return Math.max(remaining, 0L);
} }
return configuredCapacity; return configuredCapacity;
} }
@ -418,10 +413,10 @@ public void setCapacityForTesting(long capacity) {
this.configuredCapacity = capacity; this.configuredCapacity = capacity;
} }
/* /**
* Calculate the available space of the filesystem, excluding space reserved * Calculate the available space of the filesystem, excluding space reserved
* for non-HDFS and space reserved for RBW * for non-HDFS and space reserved for RBW.
* *
* @return the available number of bytes left in this filesystem. May be zero. * @return the available number of bytes left in this filesystem. May be zero.
*/ */
@Override @Override
@ -432,7 +427,7 @@ public long getAvailable() throws IOException {
if (remaining > available) { if (remaining > available) {
remaining = available; remaining = available;
} }
return (remaining > 0) ? remaining : 0; return Math.max(remaining, 0L);
} }
long getActualNonDfsUsed() throws IOException { long getActualNonDfsUsed() throws IOException {
@ -458,10 +453,8 @@ private long getRemainingReserved() throws IOException {
public long getNonDfsUsed() throws IOException { public long getNonDfsUsed() throws IOException {
long actualNonDfsUsed = getActualNonDfsUsed(); long actualNonDfsUsed = getActualNonDfsUsed();
long actualReserved = getReserved(); long actualReserved = getReserved();
if (actualNonDfsUsed < actualReserved) { long nonDfsUsed = actualNonDfsUsed - actualReserved;
return 0L; return Math.max(nonDfsUsed, 0L);
}
return actualNonDfsUsed - actualReserved;
} }
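
getCapacity(), getAvailable(), and getNonDfsUsed() now clamp to zero with Math.max instead of an explicit comparison. A tiny illustration of the equivalence for the non-DFS-used case; the method is a standalone sketch, not the HDFS code:

class ClampSketch {
  static long nonDfsUsed(long actualNonDfsUsed, long actualReserved) {
    // Before: if (actualNonDfsUsed < actualReserved) return 0L;
    //         return actualNonDfsUsed - actualReserved;
    // After: one expression with the same result for realistic inputs.
    return Math.max(actualNonDfsUsed - actualReserved, 0L);
  }

  public static void main(String[] args) {
    System.out.println(nonDfsUsed(100L, 30L)); // 70
    System.out.println(nonDfsUsed(10L, 30L));  // 0
  }
}
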
@VisibleForTesting @VisibleForTesting
@ -503,7 +496,7 @@ public DF getUsageStats(Configuration conf) {
try { try {
return new DF(new File(currentDir.getParent()), conf); return new DF(new File(currentDir.getParent()), conf);
} catch (IOException e) { } catch (IOException e) {
LOG.error("Unable to get disk statistics for volume " + this); LOG.error("Unable to get disk statistics for volume {}", this, e);
} }
} }
return null; return null;
@ -525,11 +518,11 @@ public File getFinalizedDir(String bpid) throws IOException {
} }
/** /**
* Make a deep copy of the list of currently active BPIDs * Make a deep copy of the list of currently active BPIDs.
*/ */
@Override @Override
public String[] getBlockPoolList() { public String[] getBlockPoolList() {
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]); return bpSlices.keySet().toArray(new String[0]);
} }
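
getBlockPoolList() now passes an empty array to toArray, which avoids calling size() on the key set twice and lets the runtime allocate a correctly typed, correctly sized result in one step. A short sketch of the idiom:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BlockPoolListSketch {
  private final Map<String, Object> bpSlices = new ConcurrentHashMap<>();

  String[] getBlockPoolList() {
    // Preferred toArray idiom: the empty array only conveys the element type.
    return bpSlices.keySet().toArray(new String[0]);
  }
}
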
/** /**
@ -549,7 +542,7 @@ File createTmpFile(String bpid, Block b) throws IOException {
@Override @Override
public void reserveSpaceForReplica(long bytesToReserve) { public void reserveSpaceForReplica(long bytesToReserve) {
if (bytesToReserve != 0) { if (bytesToReserve != 0L) {
reservedForReplicas.addAndGet(bytesToReserve); reservedForReplicas.addAndGet(bytesToReserve);
recentReserved = bytesToReserve; recentReserved = bytesToReserve;
} }
@ -557,17 +550,15 @@ public void reserveSpaceForReplica(long bytesToReserve) {
@Override @Override
public void releaseReservedSpace(long bytesToRelease) { public void releaseReservedSpace(long bytesToRelease) {
if (bytesToRelease != 0) { if (bytesToRelease != 0L) {
long oldReservation, newReservation; long oldReservation, newReservation;
do { do {
oldReservation = reservedForReplicas.get(); oldReservation = reservedForReplicas.get();
newReservation = oldReservation - bytesToRelease; newReservation = oldReservation - bytesToRelease;
if (newReservation < 0) {
// Failsafe, this should never occur in practice, but if it does we // Fail-safe, this should never be less than zero in practice, but if it
// don't want to start advertising more space than we have available. // does, do not advertise more space than is have available.
newReservation = 0; newReservation = Math.max(newReservation, 0L);
}
} while (!reservedForReplicas.compareAndSet(oldReservation, } while (!reservedForReplicas.compareAndSet(oldReservation,
newReservation)); newReservation));
} }
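
releaseReservedSpace() keeps its compare-and-set retry loop but folds the negative check into a Math.max clamp. A condensed, self-contained version of the loop shown above (field and method names are illustrative):

import java.util.concurrent.atomic.AtomicLong;

class ReservationSketch {
  private final AtomicLong reservedForReplicas = new AtomicLong();

  void release(long bytesToRelease) {
    if (bytesToRelease != 0L) {
      long oldReservation;
      long newReservation;
      do {
        oldReservation = reservedForReplicas.get();
        // Clamp at zero so an over-release can never advertise more free
        // space than the volume actually has.
        newReservation = Math.max(oldReservation - bytesToRelease, 0L);
      } while (!reservedForReplicas.compareAndSet(oldReservation, newReservation));
    }
  }
}
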
@ -679,20 +670,15 @@ private String getNextSubDir(String prev, File dir)
FsVolumeImpl.this, dir, SubdirFilter.INSTANCE); FsVolumeImpl.this, dir, SubdirFilter.INSTANCE);
cache = null; cache = null;
cacheMs = 0; cacheMs = 0;
if (children.size() == 0) { if (children.isEmpty()) {
LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}", LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
storageID, bpid, dir.getAbsolutePath()); storageID, bpid, dir.getAbsolutePath());
return null; return null;
} }
Collections.sort(children); Collections.sort(children);
String nextSubDir = nextSorted(children, prev); String nextSubDir = nextSorted(children, prev);
if (nextSubDir == null) { LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} within {}",
LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
storageID, bpid, dir.getAbsolutePath());
} else {
LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
"within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
}
return nextSubDir; return nextSubDir;
} }
@ -731,16 +717,13 @@ private List<String> getSubdirEntries() throws IOException {
state.curFinalizedDir, state.curFinalizedSubDir).toFile(); state.curFinalizedDir, state.curFinalizedSubDir).toFile();
List<String> entries = fileIoProvider.listDirectory( List<String> entries = fileIoProvider.listDirectory(
FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE); FsVolumeImpl.this, dir, BlockFileFilter.INSTANCE);
if (entries.size() == 0) { if (entries.isEmpty()) {
entries = null; entries = null;
LOG.trace("getSubdirEntries({}, {}): no entries found in {}", storageID,
bpid, dir.getAbsolutePath());
} else { } else {
Collections.sort(entries); Collections.sort(entries);
} LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
if (entries == null) {
LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
storageID, bpid, dir.getAbsolutePath());
} else {
LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
storageID, bpid, entries.size(), dir.getAbsolutePath()); storageID, bpid, entries.size(), dir.getAbsolutePath());
} }
cache = entries; cache = entries;
@ -872,9 +855,11 @@ FsVolumeImpl.this, getTempSaveFile()), "UTF-8"))) {
public void load() throws IOException { public void load() throws IOException {
File file = getSaveFile(); File file = getSaveFile();
this.state = READER.readValue(file); this.state = READER.readValue(file);
LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID, if (LOG.isTraceEnabled()) {
bpid, name, file.getAbsoluteFile(), LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
WRITER.writeValueAsString(state)); bpid, name, file.getAbsoluteFile(),
WRITER.writeValueAsString(state));
}
} }
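
In contrast to the checkClosed() change earlier in this file, load() keeps an explicit isTraceEnabled() guard: parameterized logging only defers message formatting, and here one of the arguments (WRITER.writeValueAsString(state)) is expensive to compute even when tracing is off. A hedged sketch of when the guard still earns its keep:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GuardedTraceSketch {
  private static final Logger LOG = LoggerFactory.getLogger(GuardedTraceSketch.class);

  void logState(Object state) {
    // The guard matters because producing the argument (serialization here)
    // is costly; the placeholder mechanism cannot skip argument evaluation.
    if (LOG.isTraceEnabled()) {
      LOG.trace("loaded iterator state: {}", serialize(state));
    }
  }

  private String serialize(Object state) {
    // Stand-in for an expensive JSON serialization step.
    return String.valueOf(state);
  }
}
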
File getSaveFile() { File getSaveFile() {
@ -956,15 +941,21 @@ ReplicaInfo addFinalizedBlock(String bpid, Block b, ReplicaInfo replicaInfo,
long bytesReserved) throws IOException { long bytesReserved) throws IOException {
releaseReservedSpace(bytesReserved); releaseReservedSpace(bytesReserved);
File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo); File dest = getBlockPoolSlice(bpid).addFinalizedBlock(b, replicaInfo);
byte[] checksum = null; final byte[] checksum;
// copy the last partial checksum if the replica is originally // copy the last partial checksum if the replica is originally
// in finalized or rbw state. // in finalized or rbw state.
if (replicaInfo.getState() == ReplicaState.FINALIZED) { switch (replicaInfo.getState()) {
FinalizedReplica finalized = (FinalizedReplica)replicaInfo; case FINALIZED:
FinalizedReplica finalized = (FinalizedReplica) replicaInfo;
checksum = finalized.getLastPartialChunkChecksum(); checksum = finalized.getLastPartialChunkChecksum();
} else if (replicaInfo.getState() == ReplicaState.RBW) { break;
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo; case RBW:
ReplicaBeingWritten rbw = (ReplicaBeingWritten) replicaInfo;
checksum = rbw.getLastChecksumAndDataLen().getChecksum(); checksum = rbw.getLastChecksumAndDataLen().getChecksum();
break;
default:
checksum = null;
break;
} }
return new ReplicaBuilder(ReplicaState.FINALIZED) return new ReplicaBuilder(ReplicaState.FINALIZED)
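
addFinalizedBlock() replaces the if/else-if chain over the replica state with a switch and a single final local for the checksum. A compact sketch of the selection logic, with a placeholder enum and byte arrays standing in for the real replica objects:

class ChecksumSelectionSketch {
  enum ReplicaState { FINALIZED, RBW, TEMPORARY }

  static byte[] selectChecksum(ReplicaState state,
      byte[] finalizedChecksum, byte[] rbwChecksum) {
    final byte[] checksum;
    switch (state) {
    case FINALIZED:
      checksum = finalizedChecksum;
      break;
    case RBW:
      checksum = rbwChecksum;
      break;
    default:
      // Other states carry no partial-chunk checksum to copy.
      checksum = null;
      break;
    }
    return checksum;
  }
}
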
@ -983,28 +974,26 @@ Executor getCacheExecutor() {
public VolumeCheckResult check(VolumeCheckContext ignored) public VolumeCheckResult check(VolumeCheckContext ignored)
throws DiskErrorException { throws DiskErrorException {
// TODO:FEDERATION valid synchronization // TODO:FEDERATION valid synchronization
for(BlockPoolSlice s : bpSlices.values()) { for (BlockPoolSlice s : bpSlices.values()) {
s.checkDirs(); s.checkDirs();
} }
return VolumeCheckResult.HEALTHY; return VolumeCheckResult.HEALTHY;
} }
void getVolumeMap(ReplicaMap volumeMap, void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
throws IOException { for (BlockPoolSlice s : bpSlices.values()) {
for(BlockPoolSlice s : bpSlices.values()) {
s.getVolumeMap(volumeMap, ramDiskReplicaMap); s.getVolumeMap(volumeMap, ramDiskReplicaMap);
} }
} }
void getVolumeMap(String bpid, ReplicaMap volumeMap, void getVolumeMap(String bpid, ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap) throws IOException {
throws IOException {
getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap); getBlockPoolSlice(bpid).getVolumeMap(volumeMap, ramDiskReplicaMap);
} }
long getNumBlocks() { long getNumBlocks() {
long numBlocks = 0; long numBlocks = 0L;
for (BlockPoolSlice s : bpSlices.values()) { for (BlockPoolSlice s : bpSlices.values()) {
numBlocks += s.getNumOfBlocks(); numBlocks += s.getNumOfBlocks();
} }
@ -1038,13 +1027,12 @@ void addBlockPool(String bpid, Configuration c, Timer timer)
File bpdir = new File(currentDir, bpid); File bpdir = new File(currentDir, bpid);
BlockPoolSlice bp; BlockPoolSlice bp;
if (timer == null) { if (timer == null) {
bp = new BlockPoolSlice(bpid, this, bpdir, c, new Timer()); timer = new Timer();
} else {
bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
} }
bp = new BlockPoolSlice(bpid, this, bpdir, c, timer);
bpSlices.put(bpid, bp); bpSlices.put(bpid, bp);
} }
void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) { void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
BlockPoolSlice bp = bpSlices.get(bpid); BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) { if (bp != null) {
@ -1070,7 +1058,7 @@ boolean isBPDirEmpty(String bpid) throws IOException {
} }
return true; return true;
} }
void deleteBPDirectories(String bpid, boolean force) throws IOException { void deleteBPDirectories(String bpid, boolean force) throws IOException {
File volumeCurrentDir = this.getCurrentDir(); File volumeCurrentDir = this.getCurrentDir();
File bpDir = new File(volumeCurrentDir, bpid); File bpDir = new File(volumeCurrentDir, bpid);
@ -1078,7 +1066,7 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
// nothing to be deleted // nothing to be deleted
return; return;
} }
File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP); File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT); File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
File finalizedDir = new File(bpCurrentDir, File finalizedDir = new File(bpCurrentDir,
DataStorage.STORAGE_DIR_FINALIZED); DataStorage.STORAGE_DIR_FINALIZED);
@ -1127,17 +1115,16 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
public String getStorageID() { public String getStorageID() {
return storageID; return storageID;
} }
@Override @Override
public StorageType getStorageType() { public StorageType getStorageType() {
return storageType; return storageType;
} }
DatanodeStorage toDatanodeStorage() { DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType); return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
} }
@Override @Override
public byte[] loadLastPartialChunkChecksum( public byte[] loadLastPartialChunkChecksum(
File blockFile, File metaFile) throws IOException { File blockFile, File metaFile) throws IOException {
@ -1313,11 +1300,10 @@ private File[] copyReplicaWithNewBlockIdAndGS(
} }
@Override @Override
public LinkedList<ScanInfo> compileReport(String bpid, public void compileReport(String bpid, Collection<ScanInfo> report,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) ReportCompiler reportCompiler) throws InterruptedException, IOException {
throws InterruptedException, IOException { compileReport(getFinalizedDir(bpid), getFinalizedDir(bpid), report,
return compileReport(getFinalizedDir(bpid), reportCompiler);
getFinalizedDir(bpid), report, reportCompiler);
} }
@Override @Override
@ -1330,21 +1316,35 @@ public DataNodeVolumeMetrics getMetrics() {
return metrics; return metrics;
} }
private LinkedList<ScanInfo> compileReport(File bpFinalizedDir, /**
File dir, LinkedList<ScanInfo> report, ReportCompiler reportCompiler) * Filter for block file names stored on the file system volumes.
throws InterruptedException { */
public enum BlockDirFilter implements FilenameFilter {
INSTANCE;
@Override
public boolean accept(File dir, String name) {
return name.startsWith(DataStorage.BLOCK_SUBDIR_PREFIX)
|| name.startsWith(DataStorage.STORAGE_DIR_FINALIZED)
|| name.startsWith(Block.BLOCK_FILE_PREFIX);
}
}
private void compileReport(File bpFinalizedDir, File dir,
Collection<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException {
reportCompiler.throttle(); reportCompiler.throttle();
List <String> fileNames; List <String> fileNames;
try { try {
fileNames = fileIoProvider.listDirectory( fileNames =
this, dir, BlockDirFilter.INSTANCE); fileIoProvider.listDirectory(this, dir, BlockDirFilter.INSTANCE);
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.warn("Exception occurred while compiling report: ", ioe); LOG.warn("Exception occurred while compiling report", ioe);
// Volume error check moved to FileIoProvider. // Volume error check moved to FileIoProvider.
// Ignore this directory and proceed. // Ignore this directory and proceed.
return report; return;
} }
Collections.sort(fileNames); Collections.sort(fileNames);
@ -1396,7 +1396,6 @@ private LinkedList<ScanInfo> compileReport(File bpFinalizedDir,
verifyFileLocation(blockFile, bpFinalizedDir, blockId); verifyFileLocation(blockFile, bpFinalizedDir, blockId);
report.add(new ScanInfo(blockId, blockFile, metaFile, this)); report.add(new ScanInfo(blockId, blockFile, metaFile, this));
} }
return report;
} }
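
The private compileReport() above now appends into the supplied Collection and returns void, and the BlockDirFilter enum it relies on has moved here from DirectoryScanner. A simplified, runnable sketch of the same void-recursion-into-a-collection shape; the prefixes in the filter and the report contents are illustrative, not the HDFS block naming rules:

import java.io.File;
import java.io.FilenameFilter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

class ScanSketch {
  enum PrefixFilter implements FilenameFilter {
    INSTANCE;

    @Override
    public boolean accept(File dir, String name) {
      return name.startsWith("blk_") || name.startsWith("subdir");
    }
  }

  // Results accumulate in the caller's collection, so nothing has to be
  // merged and returned back up through the recursion.
  static void scan(File dir, Collection<String> report) {
    File[] entries = dir.listFiles(PrefixFilter.INSTANCE);
    if (entries == null) {
      return; // unreadable or not a directory; skip and proceed
    }
    for (File entry : entries) {
      if (entry.isDirectory()) {
        scan(entry, report);
      } else {
        report.add(entry.getAbsolutePath());
      }
    }
  }

  public static void main(String[] args) {
    List<String> report = new ArrayList<>();
    scan(new File("."), report);
    System.out.println(report.size());
  }
}
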
/** /**


@ -17,15 +17,17 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl; package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -41,21 +43,23 @@
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.FileRegion; import org.apache.hadoop.hdfs.server.common.FileRegion;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap; import org.apache.hadoop.hdfs.server.common.blockaliasmap.BlockAliasMap;
import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap; import org.apache.hadoop.hdfs.server.common.blockaliasmap.impl.TextFileRegionAliasMap;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline; import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo; import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBuilder;
import org.apache.hadoop.util.Timer;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Timer;
import org.codehaus.jackson.annotate.JsonProperty; import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader; import org.codehaus.jackson.map.ObjectReader;
@ -63,11 +67,6 @@
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_PROVIDED_ALIASMAP_LOAD_RETRIES;
/** /**
* This class is used to create provided volumes. * This class is used to create provided volumes.
*/ */
@ -227,9 +226,9 @@ public void shutdown(BlockListAsLongs blocksListsAsLongs) {
// nothing to do! // nothing to do!
} }
public void compileReport(LinkedList<ScanInfo> report, public void compileReport(Collection<ScanInfo> report,
ReportCompiler reportCompiler) ReportCompiler reportCompiler)
throws IOException, InterruptedException { throws IOException, InterruptedException {
/* refresh the aliasMap and return the list of blocks found. /* refresh the aliasMap and return the list of blocks found.
* the assumption here is that the block ids in the external * the assumption here is that the block ids in the external
* block map, after the refresh, are consistent with those * block map, after the refresh, are consistent with those
@ -240,9 +239,8 @@ public void compileReport(LinkedList<ScanInfo> report,
BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null, bpid); BlockAliasMap.Reader<FileRegion> reader = aliasMap.getReader(null, bpid);
for (FileRegion region : reader) { for (FileRegion region : reader) {
reportCompiler.throttle(); reportCompiler.throttle();
report.add(new ScanInfo(region.getBlock().getBlockId(), report.add(new ScanInfo(region.getBlock().getBlockId(), providedVolume,
providedVolume, region, region, region.getProvidedStorageLocation().getLength()));
region.getProvidedStorageLocation().getLength()));
} }
} }
@ -336,7 +334,7 @@ public long getNonDfsUsed() throws IOException {
@Override @Override
long getNumBlocks() { long getNumBlocks() {
long numBlocks = 0; long numBlocks = 0L;
for (ProvidedBlockPoolSlice s : bpSlices.values()) { for (ProvidedBlockPoolSlice s : bpSlices.values()) {
numBlocks += s.getNumOfBlocks(); numBlocks += s.getNumOfBlocks();
} }
@ -381,7 +379,7 @@ private static class ProvidedBlockIteratorState {
iterStartMs = Time.now(); iterStartMs = Time.now();
lastSavedMs = iterStartMs; lastSavedMs = iterStartMs;
atEnd = false; atEnd = false;
lastBlockId = -1; lastBlockId = -1L;
} }
// The wall-clock ms since the epoch at which this iterator was last saved. // The wall-clock ms since the epoch at which this iterator was last saved.
@ -533,7 +531,7 @@ void getVolumeMap(ReplicaMap volumeMap,
final RamDiskReplicaTracker ramDiskReplicaMap) final RamDiskReplicaTracker ramDiskReplicaMap)
throws IOException { throws IOException {
LOG.info("Creating volumemap for provided volume " + this); LOG.info("Creating volumemap for provided volume " + this);
for(ProvidedBlockPoolSlice s : bpSlices.values()) { for (ProvidedBlockPoolSlice s : bpSlices.values()) {
s.fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS); s.fetchVolumeMap(volumeMap, ramDiskReplicaMap, remoteFS);
} }
} }
@ -611,14 +609,12 @@ void deleteBPDirectories(String bpid, boolean force) throws IOException {
} }
@Override @Override
public LinkedList<ScanInfo> compileReport(String bpid, public void compileReport(String bpid, Collection<ScanInfo> report,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) ReportCompiler reportCompiler) throws InterruptedException, IOException {
throws InterruptedException, IOException { LOG.info("Compiling report for volume: {}; bpid: {}", this, bpid);
LOG.info("Compiling report for volume: " + this + " bpid " + bpid); if (bpSlices.containsKey(bpid)) {
if(bpSlices.containsKey(bpid)) {
bpSlices.get(bpid).compileReport(report, reportCompiler); bpSlices.get(bpid).compileReport(report, reportCompiler);
} }
return report;
} }
@Override @Override


@ -25,10 +25,11 @@
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.Set; import java.util.Set;
import java.util.TreeMap; import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -75,18 +76,18 @@
/** /**
* This class implements a simulated FSDataset. * This class implements a simulated FSDataset.
* *
* Blocks that are created are recorded but their data (plus their CRCs) are * Blocks that are created are recorded but their data (plus their CRCs) are
* discarded. * discarded.
* Fixed data is returned when blocks are read; a null CRC meta file is * Fixed data is returned when blocks are read; a null CRC meta file is
* created for such data. * created for such data.
* *
* This FSDataset does not remember any block information across its * This FSDataset does not remember any block information across its
* restarts; it does however offer an operation to inject blocks * restarts; it does however offer an operation to inject blocks
* (See the TestInectionForSImulatedStorage() * (See the TestInectionForSImulatedStorage()
* for a usage example of injection. * for a usage example of injection.
* *
* Note the synchronization is coarse grained - it is at each method. * Note the synchronization is coarse grained - it is at each method.
*/ */
public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> { public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public final static int BYTE_MASK = 0xff; public final static int BYTE_MASK = 0xff;
@ -145,17 +146,17 @@ public static byte simulatedByte(Block b, long offsetInBlk) {
byte firstByte = (byte) (b.getBlockId() & BYTE_MASK); byte firstByte = (byte) (b.getBlockId() & BYTE_MASK);
return (byte) ((firstByte + offsetInBlk % 29) & BYTE_MASK); return (byte) ((firstByte + offsetInBlk % 29) & BYTE_MASK);
} }
public static final String CONFIG_PROPERTY_CAPACITY = public static final String CONFIG_PROPERTY_CAPACITY =
"dfs.datanode.simulateddatastorage.capacity"; "dfs.datanode.simulateddatastorage.capacity";
public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
public static final String CONFIG_PROPERTY_STATE = public static final String CONFIG_PROPERTY_STATE =
"dfs.datanode.simulateddatastorage.state"; "dfs.datanode.simulateddatastorage.state";
private static final DatanodeStorage.State DEFAULT_STATE = private static final DatanodeStorage.State DEFAULT_STATE =
DatanodeStorage.State.NORMAL; DatanodeStorage.State.NORMAL;
static final byte[] nullCrcFileData; static final byte[] nullCrcFileData;
private final AutoCloseableLock datasetLock; private final AutoCloseableLock datasetLock;
@ -183,8 +184,8 @@ private class BInfo implements ReplicaInPipeline {
private boolean pinned = false; private boolean pinned = false;
BInfo(String bpid, Block b, boolean forWriting) throws IOException { BInfo(String bpid, Block b, boolean forWriting) throws IOException {
theBlock = new Block(b); theBlock = new Block(b);
if (theBlock.getNumBytes() < 0) { if (theBlock.getNumBytes() < 0L) {
theBlock.setNumBytes(0); theBlock.setNumBytes(0L);
} }
if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) { if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) {
// expected length - actual length may // expected length - actual length may
@ -201,7 +202,7 @@ private class BInfo implements ReplicaInPipeline {
oStream = null; oStream = null;
} }
} }
@Override @Override
public String getStorageUuid() { public String getStorageUuid() {
return getStorage(theBlock).getStorageUuid(); return getStorage(theBlock).getStorageUuid();
@ -229,7 +230,7 @@ synchronized public void setNumBytes(long length) {
theBlock.setNumBytes(length); theBlock.setNumBytes(length);
} }
} }
synchronized SimulatedInputStream getIStream() { synchronized SimulatedInputStream getIStream() {
if (!finalized) { if (!finalized) {
// throw new IOException("Trying to read an unfinalized block"); // throw new IOException("Trying to read an unfinalized block");
@ -238,12 +239,12 @@ synchronized SimulatedInputStream getIStream() {
return new SimulatedInputStream(theBlock.getNumBytes(), theBlock); return new SimulatedInputStream(theBlock.getNumBytes(), theBlock);
} }
} }
synchronized void finalizeBlock(String bpid, long finalSize) synchronized void finalizeBlock(String bpid, long finalSize)
throws IOException { throws IOException {
if (finalized) { if (finalized) {
throw new IOException( throw new IOException(
"Finalizing a block that has already been finalized" + "Finalizing a block that has already been finalized" +
theBlock.getBlockId()); theBlock.getBlockId());
} }
if (oStream == null) { if (oStream == null) {
@ -257,10 +258,10 @@ synchronized void finalizeBlock(String bpid, long finalSize)
throw new IOException( throw new IOException(
"Size passed to finalize does not match the amount of data written"); "Size passed to finalize does not match the amount of data written");
} }
// We had allocated the expected length when block was created; // We had allocated the expected length when block was created;
// adjust if necessary // adjust if necessary
long extraLen = finalSize - theBlock.getNumBytes(); long extraLen = finalSize - theBlock.getNumBytes();
if (extraLen > 0) { if (extraLen > 0L) {
if (!getStorage(theBlock).alloc(bpid, extraLen)) { if (!getStorage(theBlock).alloc(bpid, extraLen)) {
DataNode.LOG.warn("Lack of free storage on a block alloc"); DataNode.LOG.warn("Lack of free storage on a block alloc");
throw new IOException("Creating block, no free space available"); throw new IOException("Creating block, no free space available");
@ -268,7 +269,7 @@ synchronized void finalizeBlock(String bpid, long finalSize)
} else { } else {
getStorage(theBlock).free(bpid, -extraLen); getStorage(theBlock).free(bpid, -extraLen);
} }
theBlock.setNumBytes(finalSize); theBlock.setNumBytes(finalSize);
finalized = true; finalized = true;
oStream = null; oStream = null;
@ -289,7 +290,7 @@ synchronized void unfinalizeBlock() throws IOException {
} }
SimulatedInputStream getMetaIStream() { SimulatedInputStream getMetaIStream() {
return new SimulatedInputStream(nullCrcFileData); return new SimulatedInputStream(nullCrcFileData);
} }
synchronized boolean isFinalized() { synchronized boolean isFinalized() {
@ -297,7 +298,7 @@ synchronized boolean isFinalized() {
} }
@Override @Override
synchronized public ReplicaOutputStreams createStreams(boolean isCreate, synchronized public ReplicaOutputStreams createStreams(boolean isCreate,
DataChecksum requestedChecksum) DataChecksum requestedChecksum)
throws IOException { throws IOException {
if (finalized) { if (finalized) {
@ -396,36 +397,37 @@ public boolean attemptToSetWriter(Thread prevWriter, Thread newWriter) {
public void stopWriter(long xceiverStopTimeout) throws IOException { public void stopWriter(long xceiverStopTimeout) throws IOException {
} }
} }
/** /**
* Class is used for tracking block pool storage utilization similar * Class is used for tracking block pool storage utilization similar
* to {@link BlockPoolSlice} * to {@link BlockPoolSlice}
*/ */
private static class SimulatedBPStorage { private static class SimulatedBPStorage {
private long used; // in bytes // in bytes
private long used;
private final Map<Block, BInfo> blockMap = new TreeMap<>(); private final Map<Block, BInfo> blockMap = new TreeMap<>();
long getUsed() { long getUsed() {
return used; return used;
} }
void alloc(long amount) { void alloc(long amount) {
used += amount; used += amount;
} }
void free(long amount) { void free(long amount) {
used -= amount; used -= amount;
} }
Map<Block, BInfo> getBlockMap() { Map<Block, BInfo> getBlockMap() {
return blockMap; return blockMap;
} }
SimulatedBPStorage() { SimulatedBPStorage() {
used = 0; used = 0L;
} }
} }
/** /**
* Class used for tracking datanode level storage utilization similar * Class used for tracking datanode level storage utilization similar
* to {@link FSVolumeSet} * to {@link FSVolumeSet}
@ -437,27 +439,27 @@ private static class SimulatedStorage {
private final long capacity; // in bytes private final long capacity; // in bytes
private final DatanodeStorage dnStorage; private final DatanodeStorage dnStorage;
private final SimulatedVolume volume; private final SimulatedVolume volume;
synchronized long getFree() { synchronized long getFree() {
return capacity - getUsed(); return capacity - getUsed();
} }
long getCapacity() { long getCapacity() {
return capacity; return capacity;
} }
synchronized long getUsed() { synchronized long getUsed() {
long used = 0; long used = 0L;
for (SimulatedBPStorage bpStorage : map.values()) { for (SimulatedBPStorage bpStorage : map.values()) {
used += bpStorage.getUsed(); used += bpStorage.getUsed();
} }
return used; return used;
} }
synchronized long getBlockPoolUsed(String bpid) throws IOException { synchronized long getBlockPoolUsed(String bpid) throws IOException {
return getBPStorage(bpid).getUsed(); return getBPStorage(bpid).getUsed();
} }
int getNumFailedVolumes() { int getNumFailedVolumes() {
return 0; return 0;
} }
@ -467,13 +469,13 @@ synchronized boolean alloc(String bpid, long amount) throws IOException {
getBPStorage(bpid).alloc(amount); getBPStorage(bpid).alloc(amount);
return true; return true;
} }
return false; return false;
} }
synchronized void free(String bpid, long amount) throws IOException { synchronized void free(String bpid, long amount) throws IOException {
getBPStorage(bpid).free(amount); getBPStorage(bpid).free(amount);
} }
SimulatedStorage(long cap, DatanodeStorage.State state, SimulatedStorage(long cap, DatanodeStorage.State state,
FileIoProvider fileIoProvider, Configuration conf) { FileIoProvider fileIoProvider, Configuration conf) {
capacity = cap; capacity = cap;
@ -484,7 +486,7 @@ synchronized void free(String bpid, long amount) throws IOException {
DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID()); DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID());
this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics); this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics);
} }
synchronized void addBlockPool(String bpid) { synchronized void addBlockPool(String bpid) {
SimulatedBPStorage bpStorage = map.get(bpid); SimulatedBPStorage bpStorage = map.get(bpid);
if (bpStorage != null) { if (bpStorage != null) {
@ -492,11 +494,11 @@ synchronized void addBlockPool(String bpid) {
} }
map.put(bpid, new SimulatedBPStorage()); map.put(bpid, new SimulatedBPStorage());
} }
synchronized void removeBlockPool(String bpid) { synchronized void removeBlockPool(String bpid) {
map.remove(bpid); map.remove(bpid);
} }
private SimulatedBPStorage getBPStorage(String bpid) throws IOException { private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
SimulatedBPStorage bpStorage = map.get(bpid); SimulatedBPStorage bpStorage = map.get(bpid);
if (bpStorage == null) { if (bpStorage == null) {
@ -508,7 +510,7 @@ private SimulatedBPStorage getBPStorage(String bpid) throws IOException {
String getStorageUuid() { String getStorageUuid() {
return dnStorage.getStorageID(); return dnStorage.getStorageID();
} }
DatanodeStorage getDnStorage() { DatanodeStorage getDnStorage() {
return dnStorage; return dnStorage;
} }
@ -531,7 +533,7 @@ Map<Block, BInfo> getBlockMap(String bpid) throws IOException {
return bpStorage.getBlockMap(); return bpStorage.getBlockMap();
} }
} }
static class SimulatedVolume implements FsVolumeSpi { static class SimulatedVolume implements FsVolumeSpi {
private final SimulatedStorage storage; private final SimulatedStorage storage;
private final FileIoProvider fileIoProvider; private final FileIoProvider fileIoProvider;
@ -635,10 +637,9 @@ public byte[] loadLastPartialChunkChecksum(
} }
@Override @Override
public LinkedList<ScanInfo> compileReport(String bpid, public void compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) Collection<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException { throws InterruptedException, IOException {
return null;
} }
@Override @Override
@ -661,7 +662,6 @@ public VolumeCheckResult check(VolumeCheckContext context)
private final List<SimulatedStorage> storages; private final List<SimulatedStorage> storages;
private final String datanodeUuid; private final String datanodeUuid;
private final DataNode datanode; private final DataNode datanode;
public SimulatedFSDataset(DataStorage storage, Configuration conf) { public SimulatedFSDataset(DataStorage storage, Configuration conf) {
this(null, storage, conf); this(null, storage, conf);
@ -724,7 +724,7 @@ public synchronized void injectBlocks(String bpid,
private SimulatedStorage getStorage(Block b) { private SimulatedStorage getStorage(Block b) {
return storages.get(LongMath.mod(b.getBlockId(), storages.size())); return storages.get(LongMath.mod(b.getBlockId(), storages.size()));
} }
/** /**
* Get the block map that a given block lives within, assuming it is within * Get the block map that a given block lives within, assuming it is within
* block pool bpid. * block pool bpid.
@ -792,12 +792,12 @@ public synchronized Map<DatanodeStorage, BlockListAsLongs> getBlockReports(
@Override // FsDatasetSpi @Override // FsDatasetSpi
public List<Long> getCacheReport(String bpid) { public List<Long> getCacheReport(String bpid) {
return new LinkedList<Long>(); return Collections.emptyList();
} }
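
getCacheReport() now returns Collections.emptyList() instead of allocating a new LinkedList on every call. The shared empty list is immutable, which is fine for a read-only report but would break any caller that tried to add to the result; a quick sketch:

import java.util.Collections;
import java.util.List;

class EmptyListSketch {
  static List<Long> getCacheReport(String bpid) {
    // Shared immutable instance; no allocation per call.
    return Collections.emptyList();
  }

  public static void main(String[] args) {
    System.out.println(getCacheReport("BP-1").isEmpty()); // true
  }
}
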
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getCapacity() { public long getCapacity() {
long total = 0; long total = 0L;
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
total += storage.getCapacity(); total += storage.getCapacity();
} }
@ -806,7 +806,7 @@ public long getCapacity() {
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getDfsUsed() { public long getDfsUsed() {
long total = 0; long total = 0L;
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
total += storage.getUsed(); total += storage.getUsed();
} }
@ -815,17 +815,16 @@ public long getDfsUsed() {
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getBlockPoolUsed(String bpid) throws IOException { public long getBlockPoolUsed(String bpid) throws IOException {
long total = 0; long total = 0L;
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
total += storage.getBlockPoolUsed(bpid); total += storage.getBlockPoolUsed(bpid);
} }
return total; return total;
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getRemaining() { public long getRemaining() {
long total = 0L;
long total = 0;
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
total += storage.getFree(); total += storage.getFree();
} }
@ -834,7 +833,6 @@ public long getRemaining() {
@Override // FSDatasetMBean @Override // FSDatasetMBean
public int getNumFailedVolumes() { public int getNumFailedVolumes() {
int total = 0; int total = 0;
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
total += storage.getNumFailedVolumes(); total += storage.getNumFailedVolumes();
@ -849,12 +847,12 @@ public String[] getFailedStorageLocations() {
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getLastVolumeFailureDate() { public long getLastVolumeFailureDate() {
return 0; return 0L;
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getEstimatedCapacityLostTotal() { public long getEstimatedCapacityLostTotal() {
return 0; return 0L;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -864,27 +862,27 @@ public VolumeFailureSummary getVolumeFailureSummary() {
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getCacheUsed() { public long getCacheUsed() {
return 0l; return 0L;
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getCacheCapacity() { public long getCacheCapacity() {
return 0l; return 0L;
} }
@Override // FSDatasetMBean @Override // FSDatasetMBean
public long getNumBlocksCached() { public long getNumBlocksCached() {
return 0l; return 0L;
} }
@Override @Override
public long getNumBlocksFailedToCache() { public long getNumBlocksFailedToCache() {
return 0l; return 0L;
} }
@Override @Override
public long getNumBlocksFailedToUncache() { public long getNumBlocksFailedToUncache() {
return 0l; return 0L;
} }
/** /**
@ -922,7 +920,7 @@ public Replica getReplica(String bpid, long blockId) {
} }
} }
@Override @Override
public synchronized String getReplicaString(String bpid, long blockId) { public synchronized String getReplicaString(String bpid, long blockId) {
Replica r = null; Replica r = null;
try { try {
@ -931,7 +929,7 @@ public synchronized String getReplicaString(String bpid, long blockId) {
} catch (IOException ioe) { } catch (IOException ioe) {
// Ignore // Ignore
} }
return r == null? "null": r.toString(); return Objects.toString(r);
} }
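
getReplicaString() swaps the explicit null check for Objects.toString, which returns the literal string "null" for a null argument and toString() otherwise, exactly matching the old ternary. A two-line demonstration:

import java.util.Objects;

class ToStringSketch {
  public static void main(String[] args) {
    Object replica = null;
    System.out.println(Objects.toString(replica));   // null
    System.out.println(Objects.toString("blk_123")); // blk_123
  }
}
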
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -1011,21 +1009,21 @@ public boolean contains(ExtendedBlock block) {
/** /**
* Check if a block is valid. * Check if a block is valid.
* *
* @param b The block to check. * @param b The block to check.
* @param minLength The minimum length that the block must have. May be 0. * @param minLength The minimum length that the block must have. May be 0.
* @param state If this is null, it is ignored. If it is non-null, we * @param state If this is null, it is ignored. If it is non-null, we will
* will check that the replica has this state. * check that the replica has this state.
* *
* @throws ReplicaNotFoundException If the replica is not found * @throws ReplicaNotFoundException If the replica is not found
* *
* @throws UnexpectedReplicaStateException If the replica is not in the * @throws UnexpectedReplicaStateException If the replica is not in the
* expected state. * expected state.
*/ */
@Override // {@link FsDatasetSpi} @Override // {@link FsDatasetSpi}
public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state) public void checkBlock(ExtendedBlock b, long minLength, ReplicaState state)
throws ReplicaNotFoundException, UnexpectedReplicaStateException { throws ReplicaNotFoundException, UnexpectedReplicaStateException {
final BInfo binfo = getBInfo(b); final BInfo binfo = getBInfo(b);
if (binfo == null) { if (binfo == null) {
throw new ReplicaNotFoundException(b); throw new ReplicaNotFoundException(b);
} }
@ -1108,7 +1106,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
map.put(binfo.theBlock, binfo); map.put(binfo.theBlock, binfo);
return binfo; return binfo;
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized ReplicaHandler recoverRbw( public synchronized ReplicaHandler recoverRbw(
ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd)
@ -1157,12 +1155,11 @@ protected synchronized InputStream getBlockInputStream(ExtendedBlock b)
throws IOException { throws IOException {
BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) { if (binfo == null) {
throw new IOException("No such Block " + b ); throw new IOException("No such Block " + b);
} }
return binfo.getIStream(); return binfo.getIStream();
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized InputStream getBlockInputStream(ExtendedBlock b, public synchronized InputStream getBlockInputStream(ExtendedBlock b,
long seekOffset) throws IOException { long seekOffset) throws IOException {
@ -1183,10 +1180,10 @@ public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b
) throws IOException { ) throws IOException {
BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) { if (binfo == null) {
throw new IOException("No such Block " + b ); throw new IOException("No such Block " + b);
} }
if (!binfo.finalized) { if (!binfo.finalized) {
throw new IOException("Block " + b + throw new IOException("Block " + b +
" is being written, its meta cannot be read"); " is being written, its meta cannot be read");
} }
final SimulatedInputStream sin = binfo.getMetaIStream(); final SimulatedInputStream sin = binfo.getMetaIStream();
@ -1199,23 +1196,20 @@ public void handleVolumeFailures(Set<FsVolumeSpi> failedVolumes) {
@Override // FsDatasetSpi @Override // FsDatasetSpi
public synchronized void adjustCrcChannelPosition(ExtendedBlock b, public synchronized void adjustCrcChannelPosition(ExtendedBlock b,
ReplicaOutputStreams stream, ReplicaOutputStreams stream, int checksumSize) throws IOException {
int checksumSize)
throws IOException {
} }
/** /**
* Simulated input and output streams * Simulated input and output streams.
*
*/ */
static private class SimulatedInputStream extends java.io.InputStream { static private class SimulatedInputStream extends java.io.InputStream {
final long length; // bytes final long length; // bytes
int currentPos = 0; int currentPos = 0;
byte[] data = null; byte[] data = null;
Block theBlock = null; Block theBlock = null;
/** /**
* An input stream of size l with repeated bytes * An input stream of size l with repeated bytes.
* @param l size of the stream * @param l size of the stream
* @param iRepeatedData byte that is repeated in the stream * @param iRepeatedData byte that is repeated in the stream
*/ */
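The constructor documented above produces a stream of size l whose bytes are a single repeated value. A self-contained sketch of that idea, with names invented here and independent of the block-based variant used by SimulatedFSDataset, is:

import java.io.IOException;
import java.io.InputStream;

// Illustrative only: a content-free stream that repeats one byte for a
// fixed length, tracking nothing but the read position.
class RepeatedByteInputStream extends InputStream {
  private final long length;
  private final int repeatedByte;
  private long pos = 0;

  RepeatedByteInputStream(long length, byte repeatedByte) {
    this.length = length;
    this.repeatedByte = repeatedByte & 0xFF;
  }

  @Override
  public int read() throws IOException {
    if (pos >= length) {
      return -1; // end of stream
    }
    pos++;
    return repeatedByte;
  }
}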
@ -1223,7 +1217,7 @@ static private class SimulatedInputStream extends java.io.InputStream {
length = l; length = l;
theBlock = b; theBlock = b;
} }
/** /**
* An input stream of the supplied data * An input stream of the supplied data
* @param iData data to construct the stream * @param iData data to construct the stream
@ -1232,7 +1226,7 @@ static private class SimulatedInputStream extends java.io.InputStream {
data = iData; data = iData;
length = data.length; length = data.length;
} }
/** /**
* @return the length of the input stream * @return the length of the input stream
*/ */
@ -1251,10 +1245,9 @@ public int read() throws IOException {
return simulatedByte(theBlock, currentPos++) & BYTE_MASK; return simulatedByte(theBlock, currentPos++) & BYTE_MASK;
} }
} }
@Override @Override
public int read(byte[] b) throws IOException { public int read(byte[] b) throws IOException {
if (b == null) { if (b == null) {
throw new NullPointerException(); throw new NullPointerException();
} }
@ -1276,23 +1269,22 @@ public int read(byte[] b) throws IOException {
return bytesRead; return bytesRead;
} }
} }
/** /**
* This class implements an output stream that merely throws its data away, but records its * This class implements an output stream that merely throws its data away, but records its
* length. * length.
*
*/ */
static private class SimulatedOutputStream extends OutputStream { static private class SimulatedOutputStream extends OutputStream {
long length = 0; long length = 0;
/** /**
* constructor for Simulated Output Stream * constructor for Simulated Output Stream
*/ */
SimulatedOutputStream() { SimulatedOutputStream() {
} }
/** /**
* *
* @return the length of the data created so far. * @return the length of the data created so far.
*/ */
long getLength() { long getLength() {
@ -1304,29 +1296,25 @@ long getLength() {
void setLength(long length) { void setLength(long length) {
this.length = length; this.length = length;
} }
@Override @Override
public void write(int arg0) throws IOException { public void write(int arg0) throws IOException {
length++; length++;
} }
@Override @Override
public void write(byte[] b) throws IOException { public void write(byte[] b) throws IOException {
length += b.length; length += b.length;
} }
@Override @Override
public void write(byte[] b, public void write(byte[] b, int off, int len) throws IOException {
int off,
int len) throws IOException {
length += len; length += len;
} }
} }
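The class above discards everything written to it and only remembers how many bytes went by. A self-contained sketch of the same idea, with names invented here, is:

import java.io.IOException;
import java.io.OutputStream;

// Illustrative only: drop all written data but keep a running byte count.
class CountingNullOutputStream extends OutputStream {
  private long length = 0;

  @Override
  public void write(int b) throws IOException {
    length++;
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    length += len; // bytes are discarded; only the count is retained
  }

  long getLength() {
    return length;
  }
}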
private ObjectName mbeanName; private ObjectName mbeanName;
/** /**
* Register the FSDataset MBean using the name * Register the FSDataset MBean using the name
* "hadoop:service=DataNode,name=FSDatasetState-<storageid>" * "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
@ -1335,7 +1323,7 @@ public void write(byte[] b,
*/ */
void registerMBean(final String storageId) { void registerMBean(final String storageId) {
// We wrap to bypass standard mbean naming convention. // We wrap to bypass standard mbean naming convention.
// This wrapping can be removed in java 6 as it is more flexible in // This wrapping can be removed in java 6 as it is more flexible in
// package naming for mbeans and their impl. // package naming for mbeans and their impl.
StandardMBean bean; StandardMBean bean;
@ -1346,7 +1334,7 @@ void registerMBean(final String storageId) {
} catch (NotCompliantMBeanException e) { } catch (NotCompliantMBeanException e) {
DataNode.LOG.warn("Error registering FSDatasetState MBean", e); DataNode.LOG.warn("Error registering FSDatasetState MBean", e);
} }
DataNode.LOG.info("Registered FSDatasetState MBean"); DataNode.LOG.info("Registered FSDatasetState MBean");
} }
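The naming pattern described in the javadoc above can be reproduced with plain JMX. The sketch below uses the platform MBean server directly rather than Hadoop's own registration helper, so it illustrates the name format only; bean and storageId are assumed to be in scope.

// Hedged sketch: publish an MBean under
// "hadoop:service=DataNode,name=FSDatasetState-<storageid>".
try {
  MBeanServer server = ManagementFactory.getPlatformMBeanServer();
  ObjectName name = new ObjectName(
      "hadoop:service=DataNode,name=FSDatasetState-" + storageId);
  server.registerMBean(bean, name);
} catch (Exception e) {
  // Mirrors the warn-and-continue handling used by registerMBean above.
}

(MBeanServer, ObjectName and ManagementFactory come from javax.management and java.lang.management.)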
@ -1359,7 +1347,7 @@ public void shutdown() {
public String getStorageInfo() { public String getStorageInfo() {
return "Simulated FSDataset-" + datanodeUuid; return "Simulated FSDataset-" + datanodeUuid;
} }
@Override @Override
public boolean hasEnoughResource() { public boolean hasEnoughResource() {
return true; return true;
@ -1371,12 +1359,12 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
ExtendedBlock b = rBlock.getBlock(); ExtendedBlock b = rBlock.getBlock();
BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); BInfo binfo = getBlockMap(b).get(b.getLocalBlock());
if (binfo == null) { if (binfo == null) {
throw new IOException("No such Block " + b ); throw new IOException("No such Block " + b);
} }
return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(), return new ReplicaRecoveryInfo(binfo.getBlockId(), binfo.getBytesOnDisk(),
binfo.getGenerationStamp(), binfo.getGenerationStamp(),
binfo.isFinalized()?ReplicaState.FINALIZED : ReplicaState.RBW); binfo.isFinalized() ? ReplicaState.FINALIZED : ReplicaState.RBW);
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
@ -1398,14 +1386,14 @@ public void addBlockPool(String bpid, Configuration conf) {
storage.addBlockPool(bpid); storage.addBlockPool(bpid);
} }
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void shutdownBlockPool(String bpid) { public void shutdownBlockPool(String bpid) {
for (SimulatedStorage storage : storages) { for (SimulatedStorage storage : storages) {
storage.removeBlockPool(bpid); storage.removeBlockPool(bpid);
} }
} }
@Override // FsDatasetSpi @Override // FsDatasetSpi
public void deleteBlockPool(String bpid, boolean force) { public void deleteBlockPool(String bpid, boolean force) {
return; return;
@ -1535,17 +1523,17 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block,
// TODO Auto-generated method stub // TODO Auto-generated method stub
return null; return null;
} }
@Override @Override
public void setPinning(ExtendedBlock b) throws IOException { public void setPinning(ExtendedBlock b) throws IOException {
getBlockMap(b).get(b.getLocalBlock()).pinned = true; getBlockMap(b).get(b.getLocalBlock()).pinned = true;
} }
@Override @Override
public boolean getPinning(ExtendedBlock b) throws IOException { public boolean getPinning(ExtendedBlock b) throws IOException {
return getBlockMap(b).get(b.getLocalBlock()).pinned; return getBlockMap(b).get(b.getLocalBlock()).pinned;
} }
@Override @Override
public boolean isDeletingBlock(String bpid, long blockId) { public boolean isDeletingBlock(String bpid, long blockId) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
View File
@ -35,7 +35,6 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -924,10 +923,9 @@ public byte[] loadLastPartialChunkChecksum(File blockFile, File metaFile)
} }
@Override @Override
public LinkedList<ScanInfo> compileReport(String bpid, public void compileReport(String bpid,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) Collection<ScanInfo> report, ReportCompiler reportCompiler)
throws InterruptedException, IOException { throws InterruptedException, IOException {
return null;
} }
@Override @Override
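The signature change shown here is the core of the patch: compileReport no longer returns a LinkedList, it now fills a caller-supplied Collection of ScanInfo and returns void. A hedged sketch of what an implementation inside a volume class looks like under the new contract (stub body, as in the test volumes touched by this change):

// Sketch of the new FsVolumeSpi.compileReport contract: the caller owns
// the collection and the volume only adds ScanInfo entries to it.
@Override
public void compileReport(String bpid, Collection<ScanInfo> report,
    ReportCompiler reportCompiler) throws InterruptedException, IOException {
  // A real volume would walk its finalized directory for bpid and add one
  // ScanInfo per replica; this stub, like the test volumes in this diff,
  // leaves the caller's collection untouched.
}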
View File
@ -22,14 +22,14 @@
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.LinkedList; import java.util.Collection;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.FileIoProvider; import org.apache.hadoop.hdfs.server.datanode.FileIoProvider;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation; import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner.ReportCompiler;
import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics; import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@ -117,10 +117,8 @@ public byte[] loadLastPartialChunkChecksum(
} }
@Override @Override
public LinkedList<ScanInfo> compileReport(String bpid, public void compileReport(String bpid, Collection<ScanInfo> report,
LinkedList<ScanInfo> report, ReportCompiler reportCompiler) ReportCompiler reportCompiler) throws InterruptedException, IOException {
throws InterruptedException, IOException {
return null;
} }
@Override @Override