HDFS-12685. [READ] FsVolumeImpl exception when scanning Provided storage volume

This commit is contained in:
Virajith Jalaparti 2017-11-30 10:11:12 -08:00 committed by Chris Douglas
parent 4d59dabb7f
commit cc933cba77
5 changed files with 38 additions and 33 deletions

View File

@ -439,7 +439,8 @@ public void close() throws IOException {
@Override
public void refresh() throws IOException {
//nothing to do;
throw new UnsupportedOperationException(
"Refresh not supported by " + getClass());
}
}

View File

@ -515,7 +515,8 @@ private void addDifference(LinkedList<ScanInfo> diffRecord,
*
* @return a map of sorted arrays of block information
*/
private Map<String, ScanInfo[]> getDiskReport() {
@VisibleForTesting
public Map<String, ScanInfo[]> getDiskReport() {
ScanInfoPerBlockPool list = new ScanInfoPerBlockPool();
ScanInfoPerBlockPool[] dirReports = null;
// First get list of data directories

View File

@ -296,30 +296,13 @@ private static String getSuffix(File f, String prefix) {
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol) {
this(blockId, blockFile, metaFile, vol, null,
(blockFile != null) ? blockFile.length() : 0);
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param blockFile the path to the block data file
* @param metaFile the path to the block meta-data file
* @param vol the volume that contains the block
* @param fileRegion the file region (for provided blocks)
* @param length the length of the block data
*/
public ScanInfo(long blockId, File blockFile, File metaFile,
FsVolumeSpi vol, FileRegion fileRegion, long length) {
this.blockId = blockId;
String condensedVolPath =
(vol == null || vol.getBaseURI() == null) ? null :
getCondensedPath(new File(vol.getBaseURI()).getAbsolutePath());
this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath);
this.blockLength = length;
this.blockLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) {
this.metaSuffix = null;
} else if (blockFile == null) {
@ -329,7 +312,26 @@ public ScanInfo(long blockId, File blockFile, File metaFile,
condensedVolPath + blockSuffix);
}
this.volume = vol;
this.fileRegion = null;
}
/**
* Create a ScanInfo object for a block. This constructor will examine
* the block data and meta-data files.
*
* @param blockId the block ID
* @param vol the volume that contains the block
* @param fileRegion the file region (for provided blocks)
* @param length the length of the block data
*/
public ScanInfo(long blockId, FsVolumeSpi vol, FileRegion fileRegion,
long length) {
this.blockId = blockId;
this.blockLength = length;
this.volume = vol;
this.fileRegion = fileRegion;
this.blockSuffix = null;
this.metaSuffix = null;
}
/**

View File

@ -226,9 +226,7 @@ public void compileReport(LinkedList<ScanInfo> report,
reportCompiler.throttle();
FileRegion region = iter.next();
if (region.getBlockPoolId().equals(bpid)) {
LOG.info("Adding ScanInfo for blkid " +
region.getBlock().getBlockId());
report.add(new ScanInfo(region.getBlock().getBlockId(), null, null,
report.add(new ScanInfo(region.getBlock().getBlockId(),
providedVolume, region, region.getLength()));
}
}

View File

@ -61,6 +61,7 @@
import org.apache.hadoop.hdfs.server.datanode.DNConf;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.datanode.ProvidedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
@ -231,14 +232,6 @@ public Writer<FileRegion> getWriter(Writer.Options opts)
public void refresh() throws IOException {
// do nothing!
}
public void setMinBlkId(int minId) {
this.minId = minId;
}
public void setBlockCount(int numBlocks) {
this.numBlocks = numBlocks;
}
}
private static Storage.StorageDirectory createLocalStorageDirectory(
@ -606,4 +599,14 @@ public void testProvidedReplicaPrefix() throws Exception {
}
}
}
@Test
public void testScannerWithProvidedVolumes() throws Exception {
DirectoryScanner scanner = new DirectoryScanner(datanode, dataset, conf);
Map<String, FsVolumeSpi.ScanInfo[]> report = scanner.getDiskReport();
// no blocks should be reported for the Provided volume as long as
// the directoryScanner is disabled.
assertEquals(0, report.get(BLOCK_POOL_IDS[CHOSEN_BP_ID]).length);
}
}