From 94576b17fbc19c440efafb6c3322f53ec78a5b55 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 18 Dec 2017 11:36:22 -0800 Subject: [PATCH] HDFS-12818. Support multiple storages in DataNodeCluster / SimulatedFSDataset. Contributed by Erik Krogen. --- .../server/datanode/SimulatedFSDataset.java | 306 ++++++++++++------ .../datanode/TestSimulatedFSDataset.java | 147 +++++---- ...imulatedFSDatasetWithMultipleStorages.java | 50 +++ 3 files changed, 351 insertions(+), 152 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index c31df4c033..987ba97d64 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -23,8 +23,8 @@ import java.io.OutputStream; import java.net.URI; import java.nio.channels.ClosedChannelException; +import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; import java.util.List; @@ -37,11 +37,13 @@ import javax.management.ObjectName; import javax.management.StandardMBean; +import com.google.common.math.LongMath; import org.apache.commons.lang.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.DF; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.server.datanode.checker.VolumeCheckResult; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImplTestUtils; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.protocol.Block; @@ -88,6 +90,7 @@ */ public class SimulatedFSDataset implements FsDatasetSpi { public final static int BYTE_MASK = 0xff; + private final static int DEFAULT_NUM_SIMULATED_DATA_DIRS = 1; static class Factory extends FsDatasetSpi.Factory { @Override public SimulatedFSDataset newInstance(DataNode datanode, @@ -100,10 +103,42 @@ public boolean isSimulated() { return true; } } - + + /** + * Used to change the default number of data storages and to mark the + * FSDataset as simulated. + */ + static class TestUtilsFactory + extends FsDatasetTestUtils.Factory { + @Override + public FsDatasetTestUtils newInstance(DataNode datanode) { + return new FsDatasetImplTestUtils(datanode) { + @Override + public int getDefaultNumOfDataDirs() { + return DEFAULT_NUM_SIMULATED_DATA_DIRS; + } + }; + } + + @Override + public boolean isSimulated() { + return true; + } + + @Override + public int getDefaultNumOfDataDirs() { + return DEFAULT_NUM_SIMULATED_DATA_DIRS; + } + + } + public static void setFactory(Configuration conf) { conf.set(DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, Factory.class.getName()); + conf.setClass("org.apache.hadoop.hdfs.server.datanode." + + "SimulatedFSDatasetTestUtilsFactory", + TestUtilsFactory.class, FsDatasetTestUtils.Factory.class + ); } public static byte simulatedByte(Block b, long offsetInBlk) { @@ -151,7 +186,7 @@ private class BInfo implements ReplicaInPipeline { if (theBlock.getNumBytes() < 0) { theBlock.setNumBytes(0); } - if (!storage.alloc(bpid, theBlock.getNumBytes())) { + if (!getStorage(theBlock).alloc(bpid, theBlock.getNumBytes())) { // expected length - actual length may // be more - we find out at finalize DataNode.LOG.warn("Lack of free storage on a block alloc"); @@ -169,7 +204,7 @@ private class BInfo implements ReplicaInPipeline { @Override public String getStorageUuid() { - return storage.getStorageUuid(); + return getStorage(theBlock).getStorageUuid(); } @Override @@ -226,12 +261,12 @@ synchronized void finalizeBlock(String bpid, long finalSize) // adjust if necessary long extraLen = finalSize - theBlock.getNumBytes(); if (extraLen > 0) { - if (!storage.alloc(bpid,extraLen)) { + if (!getStorage(theBlock).alloc(bpid, extraLen)) { DataNode.LOG.warn("Lack of free storage on a block alloc"); throw new IOException("Creating block, no free space available"); } } else { - storage.free(bpid, -extraLen); + getStorage(theBlock).free(bpid, -extraLen); } theBlock.setNumBytes(finalSize); @@ -271,7 +306,7 @@ synchronized public ReplicaOutputStreams createStreams(boolean isCreate, } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); return new ReplicaOutputStreams(oStream, crcStream, requestedChecksum, - volume, fileIoProvider); + getStorage(theBlock).getVolume(), fileIoProvider); } } @@ -368,6 +403,7 @@ public void stopWriter(long xceiverStopTimeout) throws IOException { */ private static class SimulatedBPStorage { private long used; // in bytes + private final Map blockMap = new TreeMap<>(); long getUsed() { return used; @@ -381,6 +417,10 @@ void free(long amount) { used -= amount; } + Map getBlockMap() { + return blockMap; + } + SimulatedBPStorage() { used = 0; } @@ -392,10 +432,11 @@ void free(long amount) { */ private static class SimulatedStorage { private final Map map = - new HashMap(); + new ConcurrentHashMap<>(); private final long capacity; // in bytes private final DatanodeStorage dnStorage; + private final SimulatedVolume volume; synchronized long getFree() { return capacity - getUsed(); @@ -433,11 +474,15 @@ synchronized void free(String bpid, long amount) throws IOException { getBPStorage(bpid).free(amount); } - SimulatedStorage(long cap, DatanodeStorage.State state) { + SimulatedStorage(long cap, DatanodeStorage.State state, + FileIoProvider fileIoProvider, Configuration conf) { capacity = cap; dnStorage = new DatanodeStorage( "SimulatedStorage-" + DatanodeStorage.generateUuid(), state, StorageType.DEFAULT); + DataNodeVolumeMetrics volumeMetrics = + DataNodeVolumeMetrics.create(conf, dnStorage.getStorageID()); + this.volume = new SimulatedVolume(this, fileIoProvider, volumeMetrics); } synchronized void addBlockPool(String bpid) { @@ -473,6 +518,18 @@ synchronized StorageReport getStorageReport(String bpid) { false, getCapacity(), getUsed(), getFree(), map.get(bpid).getUsed(), 0L); } + + SimulatedVolume getVolume() { + return volume; + } + + Map getBlockMap(String bpid) throws IOException { + SimulatedBPStorage bpStorage = map.get(bpid); + if (bpStorage == null) { + throw new IOException("Nonexistent block pool: " + bpid); + } + return bpStorage.getBlockMap(); + } } static class SimulatedVolume implements FsVolumeSpi { @@ -601,10 +658,7 @@ public VolumeCheckResult check(VolumeCheckContext context) } } - private final Map> blockMap - = new ConcurrentHashMap>(); - private final SimulatedStorage storage; - private final SimulatedVolume volume; + private final List storages; private final String datanodeUuid; private final DataNode datanode; @@ -615,27 +669,30 @@ public SimulatedFSDataset(DataStorage storage, Configuration conf) { public SimulatedFSDataset(DataNode datanode, DataStorage storage, Configuration conf) { this.datanode = datanode; - if (storage != null) { + int storageCount; + if (storage != null && storage.getNumStorageDirs() > 0) { + storageCount = storage.getNumStorageDirs(); for (int i = 0; i < storage.getNumStorageDirs(); ++i) { DataStorage.createStorageID(storage.getStorageDir(i), false, conf); } this.datanodeUuid = storage.getDatanodeUuid(); } else { + storageCount = DataNode.getStorageLocations(conf).size(); this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid(); } registerMBean(datanodeUuid); this.fileIoProvider = new FileIoProvider(conf, datanode); - this.storage = new SimulatedStorage( - conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), - conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE)); - // TODO: per volume id or path - DataNodeVolumeMetrics volumeMetrics = DataNodeVolumeMetrics.create(conf, - datanodeUuid); - this.volume = new SimulatedVolume(this.storage, this.fileIoProvider, - volumeMetrics); this.datasetLock = new AutoCloseableLock(); + + this.storages = new ArrayList<>(); + for (int i = 0; i < storageCount; i++) { + this.storages.add(new SimulatedStorage( + conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY), + conf.getEnum(CONFIG_PROPERTY_STATE, DEFAULT_STATE), + fileIoProvider, conf)); + } } public synchronized void injectBlocks(String bpid, @@ -651,33 +708,50 @@ public synchronized void injectBlocks(String bpid, throw new IOException("Block already exists in block list"); } } - Map map = blockMap.get(bpid); - if (map == null) { - map = new TreeMap<>(); - blockMap.put(bpid, map); + + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); } - + for (Block b: injectBlocks) { BInfo binfo = new BInfo(bpid, b, false); - map.put(binfo.theBlock, binfo); + getBlockMap(b, bpid).put(binfo.theBlock, binfo); } } } + + /** Get the storage that a given block lives within. */ + private SimulatedStorage getStorage(Block b) { + return storages.get(LongMath.mod(b.getBlockId(), storages.size())); + } - /** Get a map for a given block pool Id */ - private Map getMap(String bpid) throws IOException { - final Map map = blockMap.get(bpid); - if (map == null) { - throw new IOException("Non existent blockpool " + bpid); - } - return map; + /** + * Get the block map that a given block lives within, assuming it is within + * block pool bpid. + * @param b The block to look for + * @param bpid The block pool that contains b + * @return The block map (non-null) + * @throws IOException if bpid does not exist + */ + private Map getBlockMap(Block b, String bpid) + throws IOException { + return getStorage(b).getBlockMap(bpid); + } + + /** + * Get the block map that a given block lives within. + * @param b The extended block to look for + * @return The block map (non-null) + * @throws IOException if b is in a nonexistent block pool + */ + private Map getBlockMap(ExtendedBlock b) throws IOException { + return getBlockMap(b.getLocalBlock(), b.getBlockPoolId()); } @Override // FsDatasetSpi public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir) throws IOException { - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } @@ -687,20 +761,21 @@ public synchronized void finalizeBlock(ExtendedBlock b, boolean fsyncDir) @Override // FsDatasetSpi public synchronized void unfinalizeBlock(ExtendedBlock b) throws IOException{ if (isValidRbw(b)) { - final Map map = getMap(b.getBlockPoolId()); - map.remove(b.getLocalBlock()); + getBlockMap(b).remove(b.getLocalBlock()); } } - synchronized BlockListAsLongs getBlockReport(String bpid) { + synchronized BlockListAsLongs getBlockReport(String bpid, + SimulatedStorage storage) { BlockListAsLongs.Builder report = BlockListAsLongs.builder(); - final Map map = blockMap.get(bpid); - if (map != null) { - for (BInfo b : map.values()) { + try { + for (BInfo b : storage.getBlockMap(bpid).values()) { if (b.isFinalized()) { report.add(b); } } + } catch (IOException ioe) { + DataNode.LOG.error("Exception while getting block reports", ioe); } return report.build(); } @@ -708,7 +783,11 @@ synchronized BlockListAsLongs getBlockReport(String bpid) { @Override public synchronized Map getBlockReports( String bpid) { - return Collections.singletonMap(storage.getDnStorage(), getBlockReport(bpid)); + Map blockReports = new HashMap<>(); + for (SimulatedStorage storage : storages) { + blockReports.put(storage.getDnStorage(), getBlockReport(bpid, storage)); + } + return blockReports; } @Override // FsDatasetSpi @@ -718,27 +797,49 @@ public List getCacheReport(String bpid) { @Override // FSDatasetMBean public long getCapacity() { - return storage.getCapacity(); + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getCapacity(); + } + return total; } @Override // FSDatasetMBean public long getDfsUsed() { - return storage.getUsed(); + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getUsed(); + } + return total; } @Override // FSDatasetMBean public long getBlockPoolUsed(String bpid) throws IOException { - return storage.getBlockPoolUsed(bpid); + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getBlockPoolUsed(bpid); + } + return total; } @Override // FSDatasetMBean public long getRemaining() { - return storage.getFree(); + + long total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getFree(); + } + return total; } @Override // FSDatasetMBean public int getNumFailedVolumes() { - return storage.getNumFailedVolumes(); + + int total = 0; + for (SimulatedStorage storage : storages) { + total += storage.getNumFailedVolumes(); + } + return total; } @Override // FSDatasetMBean @@ -803,8 +904,7 @@ public void getMetrics(MetricsCollector collector, boolean all) { @Override // FsDatasetSpi public synchronized long getLength(ExtendedBlock b) throws IOException { - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { throw new IOException("Finalizing a non existing block " + b); } @@ -814,34 +914,38 @@ public synchronized long getLength(ExtendedBlock b) throws IOException { @Override @Deprecated public Replica getReplica(String bpid, long blockId) { - final Map map = blockMap.get(bpid); - if (map != null) { - return map.get(new Block(blockId)); + Block b = new Block(blockId); + try { + return getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + return null; } - return null; } @Override public synchronized String getReplicaString(String bpid, long blockId) { Replica r = null; - final Map map = blockMap.get(bpid); - if (map != null) { - r = map.get(new Block(blockId)); + try { + Block b = new Block(blockId); + r = getBlockMap(b, bpid).get(b); + } catch (IOException ioe) { + // Ignore } return r == null? "null": r.toString(); } @Override // FsDatasetSpi public Block getStoredBlock(String bpid, long blkid) throws IOException { - final Map map = blockMap.get(bpid); - if (map != null) { - BInfo binfo = map.get(new Block(blkid)); + Block b = new Block(blkid); + try { + BInfo binfo = getBlockMap(b, bpid).get(b); if (binfo == null) { return null; } return new Block(blkid, binfo.getGenerationStamp(), binfo.getNumBytes()); + } catch (IOException ioe) { + return null; } - return null; } @Override // FsDatasetSpi @@ -851,18 +955,18 @@ public synchronized void invalidate(String bpid, Block[] invalidBlks) if (invalidBlks == null) { return; } - final Map map = getMap(bpid); for (Block b: invalidBlks) { if (b == null) { continue; } + Map map = getBlockMap(b, bpid); BInfo binfo = map.get(b); if (binfo == null) { error = true; DataNode.LOG.warn("Invalidate: Missing block"); continue; } - storage.free(bpid, binfo.getNumBytes()); + getStorage(b).free(bpid, binfo.getNumBytes()); map.remove(b); if (datanode != null) { datanode.notifyNamenodeDeletedBlock(new ExtendedBlock(bpid, b), @@ -892,8 +996,11 @@ public boolean isCached(String bpid, long blockId) { } private BInfo getBInfo(final ExtendedBlock b) { - final Map map = blockMap.get(b.getBlockPoolId()); - return map == null? null: map.get(b.getLocalBlock()); + try { + return getBlockMap(b).get(b.getLocalBlock()); + } catch (IOException ioe) { + return null; + } } @Override // {@link FsDatasetSpi} @@ -957,8 +1064,7 @@ public String toString() { @Override // FsDatasetSpi public synchronized ReplicaHandler append( ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null || !binfo.isFinalized()) { throw new ReplicaNotFoundException("Block " + b + " is not valid, and cannot be appended to."); @@ -970,7 +1076,7 @@ public synchronized ReplicaHandler append( @Override // FsDatasetSpi public synchronized ReplicaHandler recoverAppend( ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - final Map map = getMap(b.getBlockPoolId()); + final Map map = getBlockMap(b); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b @@ -988,7 +1094,7 @@ public synchronized ReplicaHandler recoverAppend( @Override // FsDatasetSpi public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) throws IOException { - final Map map = getMap(b.getBlockPoolId()); + final Map map = getBlockMap(b); BInfo binfo = map.get(b.getLocalBlock()); if (binfo == null) { throw new ReplicaNotFoundException("Block " + b @@ -1007,7 +1113,7 @@ public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen) public synchronized ReplicaHandler recoverRbw( ExtendedBlock b, long newGS, long minBytesRcvd, long maxBytesRcvd) throws IOException { - final Map map = getMap(b.getBlockPoolId()); + final Map map = getBlockMap(b); BInfo binfo = map.get(b.getLocalBlock()); if ( binfo == null) { throw new ReplicaNotFoundException("Block " + b @@ -1042,16 +1148,14 @@ public synchronized ReplicaHandler createTemporary(StorageType storageType, throw new ReplicaAlreadyExistsException("Block " + b + " is being written, and cannot be written to."); } - final Map map = getMap(b.getBlockPoolId()); BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); - map.put(binfo.theBlock, binfo); + getBlockMap(b).put(binfo.theBlock, binfo); return new ReplicaHandler(binfo, null); } protected synchronized InputStream getBlockInputStream(ExtendedBlock b) throws IOException { - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -1077,8 +1181,7 @@ public ReplicaInputStreams getTmpInputStreams(ExtendedBlock b, long blkoff, @Override // FsDatasetSpi public synchronized LengthInputStream getMetaDataInputStream(ExtendedBlock b ) throws IOException { - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -1266,8 +1369,7 @@ public boolean hasEnoughResource() { public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock) throws IOException { ExtendedBlock b = rBlock.getBlock(); - final Map map = getMap(b.getBlockPoolId()); - BInfo binfo = map.get(b.getLocalBlock()); + BInfo binfo = getBlockMap(b).get(b.getLocalBlock()); if (binfo == null) { throw new IOException("No such Block " + b ); } @@ -1282,7 +1384,7 @@ public Replica updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId, long newBlockId, long newlength) throws IOException { - return getMap(oldBlock.getBlockPoolId()).get(oldBlock.getLocalBlock()); + return getBInfo(oldBlock); } @Override // FsDatasetSpi @@ -1292,15 +1394,16 @@ public long getReplicaVisibleLength(ExtendedBlock block) { @Override // FsDatasetSpi public void addBlockPool(String bpid, Configuration conf) { - Map map = new TreeMap<>(); - blockMap.put(bpid, map); - storage.addBlockPool(bpid); + for (SimulatedStorage storage : storages) { + storage.addBlockPool(bpid); + } } @Override // FsDatasetSpi public void shutdownBlockPool(String bpid) { - blockMap.remove(bpid); - storage.removeBlockPool(bpid); + for (SimulatedStorage storage : storages) { + storage.removeBlockPool(bpid); + } } @Override // FsDatasetSpi @@ -1311,11 +1414,7 @@ public void deleteBlockPool(String bpid, boolean force) { @Override public ReplicaInPipeline convertTemporaryToRbw(ExtendedBlock temporary) throws IOException { - final Map map = blockMap.get(temporary.getBlockPoolId()); - if (map == null) { - throw new IOException("Block pool not found, temporary=" + temporary); - } - final BInfo r = map.get(temporary.getLocalBlock()); + final BInfo r = getBlockMap(temporary).get(temporary.getLocalBlock()); if (r == null) { throw new IOException("Block not found, temporary=" + temporary); } else if (r.isFinalized()) { @@ -1359,7 +1458,11 @@ public void checkAndUpdate(String bpid, ScanInfo info) throws IOException { @Override public FsVolumeReferences getFsVolumeReferences() { - return new FsVolumeReferences(Collections.singletonList(volume)); + List volumes = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + volumes.add(storage.getVolume()); + } + return new FsVolumeReferences(volumes); } @Override @@ -1371,14 +1474,21 @@ public void addVolume( @Override public DatanodeStorage getStorage(final String storageUuid) { - return storageUuid.equals(storage.getStorageUuid()) ? - storage.dnStorage : - null; + for (SimulatedStorage storage : storages) { + if (storageUuid.equals(storage.getStorageUuid())) { + return storage.getDnStorage(); + } + } + return null; } @Override public StorageReport[] getStorageReports(String bpid) { - return new StorageReport[] {storage.getStorageReport(bpid)}; + List reports = new ArrayList<>(); + for (SimulatedStorage storage : storages) { + reports.add(storage.getStorageReport(bpid)); + } + return reports.toArray(new StorageReport[0]); } @Override @@ -1393,7 +1503,7 @@ public Map getVolumeInfoMap() { @Override public FsVolumeSpi getVolume(ExtendedBlock b) { - return volume; + return getStorage(b.getLocalBlock()).getVolume(); } @Override @@ -1428,12 +1538,12 @@ public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, @Override public void setPinning(ExtendedBlock b) throws IOException { - blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned = true; + getBlockMap(b).get(b.getLocalBlock()).pinned = true; } @Override public boolean getPinning(ExtendedBlock b) throws IOException { - return blockMap.get(b.getBlockPoolId()).get(b.getLocalBlock()).pinned; + return getBlockMap(b).get(b.getLocalBlock()).pinned; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 4775fc7ac4..dde9ad50fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -26,20 +26,19 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Map; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.blockmanagement.SequentialBlockIdGenerator; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.util.DataChecksum; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -53,6 +52,16 @@ public class TestSimulatedFSDataset { static final int BLOCK_LENGTH_MULTIPLIER = 79; static final long FIRST_BLK_ID = 1; + private final int storageCount; + + public TestSimulatedFSDataset() { + this(1); + } + + protected TestSimulatedFSDataset(int storageCount) { + this.storageCount = storageCount; + } + @Before public void setUp() throws Exception { conf = new HdfsConfiguration(); @@ -187,43 +196,28 @@ private void testWriteRead(boolean negativeBlkID) throws IOException { @Test public void testGetBlockReport() throws IOException { - SimulatedFSDataset fsdataset = getSimulatedFSDataset(); - BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid); - assertEquals(0, blockReport.getNumberOfBlocks()); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); + assertBlockReportCountAndSize(fsdataset, 0); addSomeBlocks(fsdataset); - blockReport = fsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - for (Block b: blockReport) { - assertNotNull(b); - assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - } + assertBlockReportCountAndSize(fsdataset, NUMBLOCKS); + assertBlockLengthInBlockReports(fsdataset); } @Test public void testInjectionEmpty() throws IOException { SimulatedFSDataset fsdataset = getSimulatedFSDataset(); - BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid); - assertEquals(0, blockReport.getNumberOfBlocks()); + assertBlockReportCountAndSize(fsdataset, 0); int bytesAdded = addSomeBlocks(fsdataset); - blockReport = fsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - for (Block b: blockReport) { - assertNotNull(b); - assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - } + assertBlockReportCountAndSize(fsdataset, NUMBLOCKS); + assertBlockLengthInBlockReports(fsdataset); // Inject blocks into an empty fsdataset // - injecting the blocks we got above. SimulatedFSDataset sfsdataset = getSimulatedFSDataset(); - sfsdataset.injectBlocks(bpid, blockReport); - blockReport = sfsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - for (Block b: blockReport) { - assertNotNull(b); - assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - assertEquals(blockIdToLen(b.getBlockId()), sfsdataset - .getLength(new ExtendedBlock(bpid, b))); - } + injectBlocksFromBlockReport(fsdataset, sfsdataset); + assertBlockReportCountAndSize(fsdataset, NUMBLOCKS); + assertBlockLengthInBlockReports(fsdataset, sfsdataset); + assertEquals(bytesAdded, sfsdataset.getDfsUsed()); assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining()); } @@ -231,16 +225,10 @@ public void testInjectionEmpty() throws IOException { @Test public void testInjectionNonEmpty() throws IOException { SimulatedFSDataset fsdataset = getSimulatedFSDataset(); - BlockListAsLongs blockReport = fsdataset.getBlockReport(bpid); - assertEquals(0, blockReport.getNumberOfBlocks()); + assertBlockReportCountAndSize(fsdataset, 0); int bytesAdded = addSomeBlocks(fsdataset); - blockReport = fsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - for (Block b: blockReport) { - assertNotNull(b); - assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - } - fsdataset = null; + assertBlockReportCountAndSize(fsdataset, NUMBLOCKS); + assertBlockLengthInBlockReports(fsdataset); // Inject blocks into an non-empty fsdataset // - injecting the blocks we got above. @@ -248,19 +236,10 @@ public void testInjectionNonEmpty() throws IOException { // Add come blocks whose block ids do not conflict with // the ones we are going to inject. bytesAdded += addSomeBlocks(sfsdataset, NUMBLOCKS+1, false); - sfsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - sfsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS, blockReport.getNumberOfBlocks()); - sfsdataset.injectBlocks(bpid, blockReport); - blockReport = sfsdataset.getBlockReport(bpid); - assertEquals(NUMBLOCKS*2, blockReport.getNumberOfBlocks()); - for (Block b: blockReport) { - assertNotNull(b); - assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); - assertEquals(blockIdToLen(b.getBlockId()), sfsdataset - .getLength(new ExtendedBlock(bpid, b))); - } + assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS); + injectBlocksFromBlockReport(fsdataset, sfsdataset); + assertBlockReportCountAndSize(sfsdataset, NUMBLOCKS * 2); + assertBlockLengthInBlockReports(fsdataset, sfsdataset); assertEquals(bytesAdded, sfsdataset.getDfsUsed()); assertEquals(sfsdataset.getCapacity()-bytesAdded, sfsdataset.getRemaining()); @@ -270,7 +249,7 @@ public void testInjectionNonEmpty() throws IOException { try { sfsdataset = getSimulatedFSDataset(); sfsdataset.addBlockPool(bpid, conf); - sfsdataset.injectBlocks(bpid, blockReport); + injectBlocksFromBlockReport(fsdataset, sfsdataset); assertTrue("Expected an IO exception", false); } catch (IOException e) { // ok - as expected @@ -334,8 +313,68 @@ public void testInvalidate() throws IOException { assertTrue(fsdataset.isValidBlock(new ExtendedBlock(bpid, b))); } } - - private SimulatedFSDataset getSimulatedFSDataset() { + + /** + * Inject all of the blocks returned from sourceFSDataset's block reports + * into destinationFSDataset. + */ + private void injectBlocksFromBlockReport(SimulatedFSDataset sourceFSDataset, + SimulatedFSDataset destinationFSDataset) throws IOException { + for (Map.Entry ent : + sourceFSDataset.getBlockReports(bpid).entrySet()) { + destinationFSDataset.injectBlocks(bpid, ent.getValue()); + } + } + + /** + * Assert that the number of block reports returned from fsdataset matches + * {@code storageCount}, and that the total number of blocks is equal to + * expectedBlockCount. + */ + private void assertBlockReportCountAndSize(SimulatedFSDataset fsdataset, + int expectedBlockCount) { + Map blockReportMap = + fsdataset.getBlockReports(bpid); + assertEquals(storageCount, blockReportMap.size()); + int totalCount = 0; + for (Map.Entry ent : + blockReportMap.entrySet()) { + totalCount += ent.getValue().getNumberOfBlocks(); + } + assertEquals(expectedBlockCount, totalCount); + } + + /** + * Convenience method to call {@link #assertBlockLengthInBlockReports( + * SimulatedFSDataset,SimulatedFSDataset)} with a null second parameter. + */ + private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset) + throws IOException { + assertBlockLengthInBlockReports(fsdataset, null); + } + + /** + * Assert that, for all of the blocks in the block report(s) returned from + * fsdataset, they are not null and their length matches the expectation. + * If otherFSDataset is non-null, additionally confirm that its idea of the + * length of the block matches as well. + */ + private void assertBlockLengthInBlockReports(SimulatedFSDataset fsdataset, + SimulatedFSDataset otherFSDataset) throws IOException { + for (Map.Entry ent : + fsdataset.getBlockReports(bpid).entrySet()) { + for (Block b : ent.getValue()) { + assertNotNull(b); + assertEquals(blockIdToLen(b.getBlockId()), b.getNumBytes()); + if (otherFSDataset != null) { + assertEquals(blockIdToLen(b.getBlockId()), otherFSDataset + .getLength(new ExtendedBlock(bpid, b))); + } + } + } + } + + protected SimulatedFSDataset getSimulatedFSDataset() { SimulatedFSDataset fsdataset = new SimulatedFSDataset(null, conf); fsdataset.addBlockPool(bpid, conf); return fsdataset; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java new file mode 100644 index 0000000000..b31ae98571 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDatasetWithMultipleStorages.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; +import static org.junit.Assert.assertEquals; + + +/** + * Test that the {@link SimulatedFSDataset} works correctly when configured + * with multiple storages. + */ +public class TestSimulatedFSDatasetWithMultipleStorages + extends TestSimulatedFSDataset { + + public TestSimulatedFSDatasetWithMultipleStorages() { + super(2); + } + + @Before + public void setUp() throws Exception { + super.setUp(); + conf.set(DFS_DATANODE_DATA_DIR_KEY, "data1,data2"); + } + + @Test + public void testMultipleStoragesConfigured() { + SimulatedFSDataset fsDataset = getSimulatedFSDataset(); + assertEquals(2, fsDataset.getStorageReports(bpid).length); + } + +}