HDFS-6991. Notify NN of evicted block before deleting it from RAM disk. (Arpit Agarwal)

This commit is contained in:
arp 2014-09-08 14:29:30 -07:00
parent 6e5f780d1d
commit a18caf7753
8 changed files with 129 additions and 144 deletions

View File

@ -38,4 +38,6 @@
HDFS-6977. Delete all copies when a block is deleted from the block space. HDFS-6977. Delete all copies when a block is deleted from the block space.
(Arpit Agarwal) (Arpit Agarwal)
HDFS-6991. Notify NN of evicted block before deleting it from RAM disk.
(Arpit Agarwal)

View File

@ -129,6 +129,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4; public static final int DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec"; public static final String DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC = "dfs.datanode.lazywriter.interval.sec";
public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60; public static final int DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC = 60;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT = "dfs.datanode.ram.disk.low.watermark.percent";
public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT = 10;
public static final String DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS = "dfs.datanode.ram.disk.low.watermark.replicas";
public static final int DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT = 3;
public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT = public static final String DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
"dfs.namenode.path.based.cache.block.map.allocation.percent"; "dfs.namenode.path.based.cache.block.map.allocation.percent";
public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f; public static final float DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;

View File

@ -2085,8 +2085,8 @@ private BlockInfo processReportedBlock(
// Add replica if appropriate. If the replica was previously corrupt // Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated. // but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED if (reportedState == ReplicaState.FINALIZED
&& (!storedBlock.findDatanode(dn) && (storedBlock.findStorageInfo(storageInfo) == -1 ||
|| corruptReplicas.isReplicaCorrupt(storedBlock, dn))) { corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock); toAdd.add(storedBlock);
} }
return storedBlock; return storedBlock;

View File

@ -230,7 +230,6 @@ void reportBadBlocks(ExtendedBlock block,
void notifyNamenodeReceivedBlock( void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) { ExtendedBlock block, String delHint, String storageUuid) {
checkBlock(block); checkBlock(block);
checkDelHint(delHint);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(
block.getLocalBlock(), block.getLocalBlock(),
ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK,
@ -249,11 +248,6 @@ private void checkBlock(ExtendedBlock block) {
block.getBlockPoolId(), getBlockPoolId()); block.getBlockPoolId(), getBlockPoolId());
} }
private void checkDelHint(String delHint) {
Preconditions.checkArgument(delHint != null,
"delHint is null");
}
void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) { void notifyNamenodeDeletedBlock(ExtendedBlock block, String storageUuid) {
checkBlock(block); checkBlock(block);
ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo( ReceivedDeletedBlockInfo bInfo = new ReceivedDeletedBlockInfo(

View File

@ -702,7 +702,7 @@ static DomainPeerServer getDomainPeerServer(Configuration conf,
} }
// calls specific to BP // calls specific to BP
protected void notifyNamenodeReceivedBlock( public void notifyNamenodeReceivedBlock(
ExtendedBlock block, String delHint, String storageUuid) { ExtendedBlock block, String delHint, String storageUuid) {
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId()); BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
if(bpos != null) { if(bpos != null) {

View File

@ -42,7 +42,6 @@
import javax.management.StandardMBean; import javax.management.StandardMBean;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.TreeMultimap;
import org.apache.commons.io.FileUtils; import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -276,9 +275,7 @@ public LengthInputStream getMetaDataInputStream(ExtendedBlock b)
cacheManager = new FsDatasetCache(this); cacheManager = new FsDatasetCache(this);
// Start the lazy writer once we have built the replica maps. // Start the lazy writer once we have built the replica maps.
lazyWriter = new Daemon(new LazyWriter( lazyWriter = new Daemon(new LazyWriter(conf));
conf.getInt(DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC)));
lazyWriter.start(); lazyWriter.start();
registerMBean(datanode.getDatanodeUuid()); registerMBean(datanode.getDatanodeUuid());
} }
@ -2186,16 +2183,23 @@ class LazyWriter implements Runnable {
private volatile boolean shouldRun = true; private volatile boolean shouldRun = true;
final int checkpointerInterval; final int checkpointerInterval;
final long estimateBlockSize; final long estimateBlockSize;
final int lowWatermarkFreeSpacePercentage;
public static final int LOW_WATERMARK_FREE_SPACE_PERCENT = 10; final int lowWatermarkFreeSpaceReplicas;
public static final int LOW_WATERMARK_FREE_SPACE_REPLICAS = 3;
public LazyWriter(final int checkpointerInterval) { public LazyWriter(Configuration conf) {
this.checkpointerInterval = checkpointerInterval; this.checkpointerInterval = conf.getInt(
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
DFSConfigKeys.DFS_DATANODE_LAZY_WRITER_INTERVAL_DEFAULT_SEC);
this.estimateBlockSize = conf.getLongBytes( this.estimateBlockSize = conf.getLongBytes(
DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT); DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
this.lowWatermarkFreeSpacePercentage = conf.getInt(
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT,
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_PERCENT_DEFAULT);
this.lowWatermarkFreeSpaceReplicas = conf.getInt(
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
DFSConfigKeys.DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS_DEFAULT);
} }
private void moveReplicaToNewVolume(String bpid, long blockId) private void moveReplicaToNewVolume(String bpid, long blockId)
@ -2282,32 +2286,37 @@ private boolean transientFreeSpaceBelowThreshold() throws IOException {
} }
int percentFree = (int) (free * 100 / capacity); int percentFree = (int) (free * 100 / capacity);
return percentFree < LOW_WATERMARK_FREE_SPACE_PERCENT || return percentFree < lowWatermarkFreeSpacePercentage ||
free < (estimateBlockSize * LOW_WATERMARK_FREE_SPACE_REPLICAS); free < (estimateBlockSize * lowWatermarkFreeSpaceReplicas);
} }
/** /**
* Attempt to evict one or more transient block replicas we have at least * Attempt to evict one or more transient block replicas we have at least
* spaceNeeded bytes free. * spaceNeeded bytes free.
*/ */
private synchronized void evictBlocks() throws IOException { private void evictBlocks() throws IOException {
int iterations = 0; int iterations = 0;
while (iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &&
transientFreeSpaceBelowThreshold()) {
LazyWriteReplicaTracker.ReplicaState replicaState = LazyWriteReplicaTracker.ReplicaState replicaState =
lazyWriteReplicaTracker.getNextCandidateForEviction(); lazyWriteReplicaTracker.getNextCandidateForEviction();
while (replicaState != null &&
iterations++ < MAX_BLOCK_EVICTIONS_PER_ITERATION &
transientFreeSpaceBelowThreshold()) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.info("Evicting block " + replicaState); LOG.debug("Evicting block " + replicaState);
} }
ReplicaInfo replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
ReplicaInfo replicaInfo, newReplicaInfo;
File blockFile, metaFile;
long blockFileUsed, metaFileUsed;
synchronized (FsDatasetImpl.this) {
replicaInfo = getReplicaInfo(replicaState.bpid, replicaState.blockId);
Preconditions.checkState(replicaInfo.getVolume().isTransientStorage()); Preconditions.checkState(replicaInfo.getVolume().isTransientStorage());
File blockFile = replicaInfo.getBlockFile(); blockFile = replicaInfo.getBlockFile();
File metaFile = replicaInfo.getMetaFile(); metaFile = replicaInfo.getMetaFile();
long blockFileUsed = blockFile.length(); blockFileUsed = blockFile.length();
long metaFileUsed = metaFile.length(); metaFileUsed = metaFile.length();
lazyWriteReplicaTracker.discardReplica(replicaState, false); lazyWriteReplicaTracker.discardReplica(replicaState, false);
// Move the replica from lazyPersist/ to finalized/ on target volume // Move the replica from lazyPersist/ to finalized/ on target volume
@ -2316,15 +2325,24 @@ private synchronized void evictBlocks() throws IOException {
File newBlockFile = bpSlice.activateSavedReplica( File newBlockFile = bpSlice.activateSavedReplica(
replicaInfo, replicaState.savedBlockFile); replicaInfo, replicaState.savedBlockFile);
ReplicaInfo newReplicaInfo = newReplicaInfo =
new FinalizedReplica(replicaInfo.getBlockId(), new FinalizedReplica(replicaInfo.getBlockId(),
replicaInfo.getBytesOnDisk(), replicaInfo.getBytesOnDisk(),
replicaInfo.getGenerationStamp(), replicaInfo.getGenerationStamp(),
replicaState.lazyPersistVolume, replicaState.lazyPersistVolume,
newBlockFile.getParentFile()); newBlockFile.getParentFile());
// Update the volumeMap entry. This removes the old entry. // Update the volumeMap entry.
volumeMap.add(replicaState.bpid, newReplicaInfo); volumeMap.add(replicaState.bpid, newReplicaInfo);
}
// Before deleting the files from transient storage we must notify the
// NN that the files are on the new storage. Else a blockReport from
// the transient storage might cause the NN to think the blocks are lost.
ExtendedBlock extendedBlock =
new ExtendedBlock(replicaState.bpid, newReplicaInfo);
datanode.notifyNamenodeReceivedBlock(
extendedBlock, null, newReplicaInfo.getStorageUuid());
// Remove the old replicas from transient storage. // Remove the old replicas from transient storage.
if (blockFile.delete() || !blockFile.exists()) { if (blockFile.delete() || !blockFile.exists()) {
@ -2336,7 +2354,6 @@ private synchronized void evictBlocks() throws IOException {
// If deletion failed then the directory scanner will cleanup the blocks // If deletion failed then the directory scanner will cleanup the blocks
// eventually. // eventually.
replicaState = lazyWriteReplicaTracker.getNextCandidateForEviction();
} }
} }

View File

@ -1340,6 +1340,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
} }
int curDatanodesNum = dataNodes.size(); int curDatanodesNum = dataNodes.size();
final int curDatanodesNumSaved = curDatanodesNum;
// for mincluster's the default initialDelay for BRs is 0 // for mincluster's the default initialDelay for BRs is 0
if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) { if (conf.get(DFS_BLOCKREPORT_INITIAL_DELAY_KEY) == null) {
conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0); conf.setLong(DFS_BLOCKREPORT_INITIAL_DELAY_KEY, 0);
@ -1479,7 +1480,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
waitActive(); waitActive();
if (storageCapacities != null) { if (storageCapacities != null) {
for (int i = curDatanodesNum; i < curDatanodesNum+numDataNodes; ++i) { for (int i = curDatanodesNumSaved; i < curDatanodesNumSaved+numDataNodes; ++i) {
List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes(); List<? extends FsVolumeSpi> volumes = dns[i].getFSDataset().getVolumes();
assert storageCapacities[i].length == storagesPerDatanode; assert storageCapacities[i].length == storagesPerDatanode;
assert volumes.size() == storagesPerDatanode; assert volumes.size() == storagesPerDatanode;

View File

@ -23,7 +23,6 @@
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
@ -71,13 +70,14 @@ public class TestLazyPersistFiles {
private static final int THREADPOOL_SIZE = 10; private static final int THREADPOOL_SIZE = 10;
private static short REPL_FACTOR = 1; private static final short REPL_FACTOR = 1;
private static final int BLOCK_SIZE = 10485760; // 10 MB private static final int BLOCK_SIZE = 10485760; // 10 MB
private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3; private static final int LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC = 3;
private static final long HEARTBEAT_INTERVAL_SEC = 1; private static final long HEARTBEAT_INTERVAL_SEC = 1;
private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500; private static final int HEARTBEAT_RECHECK_INTERVAL_MSEC = 500;
private static final int LAZY_WRITER_INTERVAL_SEC = 1; private static final int LAZY_WRITER_INTERVAL_SEC = 1;
private static final int BUFFER_LENGTH = 4096; private static final int BUFFER_LENGTH = 4096;
private static final int EVICTION_LOW_WATERMARK = 1;
private MiniDFSCluster cluster; private MiniDFSCluster cluster;
private DistributedFileSystem fs; private DistributedFileSystem fs;
@ -101,7 +101,7 @@ public void shutDownCluster() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testFlagNotSetByDefault() throws IOException { public void testFlagNotSetByDefault() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -113,7 +113,7 @@ public void testFlagNotSetByDefault() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testFlagPropagation() throws IOException { public void testFlagPropagation() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -125,7 +125,7 @@ public void testFlagPropagation() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testFlagPersistenceInEditLog() throws IOException { public void testFlagPersistenceInEditLog() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -139,10 +139,9 @@ public void testFlagPersistenceInEditLog() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testFlagPersistenceInFsImage() throws IOException { public void testFlagPersistenceInFsImage() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
FSDataOutputStream fos = null;
makeTestFile(path, 0, true); makeTestFile(path, 0, true);
// checkpoint // checkpoint
@ -158,7 +157,7 @@ public void testFlagPersistenceInFsImage() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testPlacementOnRamDisk() throws IOException { public void testPlacementOnRamDisk() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK}, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -168,8 +167,7 @@ public void testPlacementOnRamDisk() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testPlacementOnSizeLimitedRamDisk() throws IOException { public void testPlacementOnSizeLimitedRamDisk() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, startUpCluster(true, 3);
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@ -188,7 +186,7 @@ public void testPlacementOnSizeLimitedRamDisk() throws IOException {
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testFallbackToDisk() throws IOException { public void testFallbackToDisk() throws IOException {
startUpCluster(REPL_FACTOR, null, -1); startUpCluster(false, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -202,7 +200,7 @@ public void testFallbackToDisk() throws IOException {
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testFallbackToDiskFull() throws IOException { public void testFallbackToDiskFull() throws IOException {
startUpCluster(REPL_FACTOR, null, BLOCK_SIZE - 1); startUpCluster(false, 0);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -213,15 +211,13 @@ public void testFallbackToDiskFull() throws IOException {
/** /**
* File partially fit in RamDisk after eviction. * File partially fit in RamDisk after eviction.
* RamDisk can fit 2 blocks. Write a file with 5 blocks. * RamDisk can fit 2 blocks. Write a file with 5 blocks.
* Expect 2 blocks are on RamDisk whereas other 3 on disk. * Expect 2 or less blocks are on RamDisk and 3 or more on disk.
* @throws IOException * @throws IOException
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testFallbackToDiskPartial() public void testFallbackToDiskPartial()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(true, 2);
new StorageType[] { RAM_DISK, DEFAULT },
BLOCK_SIZE * 3 - 1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -241,12 +237,15 @@ public void testFallbackToDiskPartial()
for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) {
if (locatedBlock.getStorageTypes()[0] == RAM_DISK) { if (locatedBlock.getStorageTypes()[0] == RAM_DISK) {
numBlocksOnRamDisk++; numBlocksOnRamDisk++;
}else if (locatedBlock.getStorageTypes()[0] == DEFAULT) { } else if (locatedBlock.getStorageTypes()[0] == DEFAULT) {
numBlocksOnDisk++; numBlocksOnDisk++;
} }
} }
assertThat(numBlocksOnRamDisk, is(2));
assertThat(numBlocksOnDisk, is(3)); // Since eviction is asynchronous, depending on the timing of eviction
// wrt writes, we may get 2 or less blocks on RAM disk.
assert(numBlocksOnRamDisk <= 2);
assert(numBlocksOnDisk >= 3);
} }
/** /**
@ -257,7 +256,7 @@ public void testFallbackToDiskPartial()
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testRamDiskNotChosenByDefault() throws IOException { public void testRamDiskNotChosenByDefault() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, RAM_DISK}, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -275,7 +274,7 @@ public void testRamDiskNotChosenByDefault() throws IOException {
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testAppendIsDenied() throws IOException { public void testAppendIsDenied() throws IOException {
startUpCluster(REPL_FACTOR, new StorageType[] {RAM_DISK, DEFAULT }, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -297,17 +296,12 @@ public void testAppendIsDenied() throws IOException {
@Test (timeout=300000) @Test (timeout=300000)
public void testLazyPersistFilesAreDiscarded() public void testLazyPersistFilesAreDiscarded()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(true, 2);
new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
makeTestFile(path1, BLOCK_SIZE, true); makeTestFile(path1, BLOCK_SIZE, true);
makeTestFile(path2, BLOCK_SIZE, false);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
ensureFileReplicasOnStorageType(path2, DEFAULT);
// Stop the DataNode and sleep for the time it takes the NN to // Stop the DataNode and sleep for the time it takes the NN to
// detect the DN as being dead. // detect the DN as being dead.
@ -315,30 +309,28 @@ public void testLazyPersistFilesAreDiscarded()
Thread.sleep(30000L); Thread.sleep(30000L);
assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1)); assertThat(cluster.getNamesystem().getNumDeadDataNodes(), is(1));
// Next, wait for the replication monitor to mark the file as // Next, wait for the replication monitor to mark the file as corrupt
// corrupt, plus some delta.
Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000); Thread.sleep(2 * DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT * 1000);
// Wait for the LazyPersistFileScrubber to run, plus some delta. // Wait for the LazyPersistFileScrubber to run
Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000); Thread.sleep(2 * LAZY_WRITE_FILE_SCRUBBER_INTERVAL_SEC * 1000);
// Ensure that path1 does not exist anymore, whereas path2 does. // Ensure that path1 does not exist anymore, whereas path2 does.
assert(!fs.exists(path1)); assert(!fs.exists(path1));
assert(fs.exists(path2));
// We should have only one block that needs replication i.e. the one // We should have zero blocks that needs replication i.e. the one
// belonging to path2. // belonging to path2.
assertThat(cluster.getNameNode() assertThat(cluster.getNameNode()
.getNamesystem() .getNamesystem()
.getBlockManager() .getBlockManager()
.getUnderReplicatedBlocksCount(), .getUnderReplicatedBlocksCount(),
is(1L)); is(0L));
} }
@Test (timeout=300000) @Test (timeout=300000)
public void testLazyPersistBlocksAreSaved() public void testLazyPersistBlocksAreSaved()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -386,16 +378,12 @@ public void testLazyPersistBlocksAreSaved()
/** /**
* RamDisk eviction after lazy persist to disk. * RamDisk eviction after lazy persist to disk.
* Evicted blocks are still readable with on-disk replicas.
* @throws IOException * @throws IOException
* @throws InterruptedException * @throws InterruptedException
*/ */
@Test (timeout=300000) @Test (timeout=300000)
public void testRamDiskEviction() public void testRamDiskEviction() throws IOException, InterruptedException {
throws IOException, InterruptedException { startUpCluster(true, 1 + EVICTION_LOW_WATERMARK);
startUpCluster(REPL_FACTOR,
new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@ -405,16 +393,16 @@ public void testRamDiskEviction()
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Sleep for a short time to allow the lazy writer thread to do its job. // Sleep for a short time to allow the lazy writer thread to do its job.
// However the block replica should not be evicted from RAM_DISK yet.
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000); Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
ensureFileReplicasOnStorageType(path1, RAM_DISK); ensureFileReplicasOnStorageType(path1, RAM_DISK);
// Create another file with a replica on RAM_DISK. // Create another file with a replica on RAM_DISK.
makeTestFile(path2, BLOCK_SIZE, true); makeTestFile(path2, BLOCK_SIZE, true);
Thread.sleep(3 * LAZY_WRITER_INTERVAL_SEC * 1000);
triggerBlockReport(); triggerBlockReport();
// Make sure that the second file's block replica is on RAM_DISK, whereas // Ensure the first file was evicted to disk, the second is still on
// the original file's block replica is now on disk. // RAM_DISK.
ensureFileReplicasOnStorageType(path2, RAM_DISK); ensureFileReplicasOnStorageType(path2, RAM_DISK);
ensureFileReplicasOnStorageType(path1, DEFAULT); ensureFileReplicasOnStorageType(path1, DEFAULT);
} }
@ -428,9 +416,7 @@ public void testRamDiskEviction()
@Test (timeout=300000) @Test (timeout=300000)
public void testRamDiskEvictionBeforePersist() public void testRamDiskEvictionBeforePersist()
throws IOException, InterruptedException { throws IOException, InterruptedException {
// 1 replica + delta, lazy persist interval every 50 minutes startUpCluster(true, 1);
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1));
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
Path path2 = new Path("/" + METHOD_NAME + ".02.dat"); Path path2 = new Path("/" + METHOD_NAME + ".02.dat");
@ -463,8 +449,7 @@ public void testRamDiskEvictionBeforePersist()
@Test (timeout=300000) @Test (timeout=300000)
public void testRamDiskEvictionLRU() public void testRamDiskEvictionLRU()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, startUpCluster(true, 3);
(4 * BLOCK_SIZE -1)); // 3 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
final int NUM_PATHS = 6; final int NUM_PATHS = 6;
Path paths[] = new Path[NUM_PATHS]; Path paths[] = new Path[NUM_PATHS];
@ -501,8 +486,7 @@ public void testRamDiskEvictionLRU()
@Test (timeout=300000) @Test (timeout=300000)
public void testDeleteBeforePersist() public void testDeleteBeforePersist()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, startUpCluster(true, -1);
-1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
stopLazyWriter(cluster.getDataNodes().get(0)); stopLazyWriter(cluster.getDataNodes().get(0));
@ -527,7 +511,7 @@ public void testDeleteBeforePersist()
@Test (timeout=300000) @Test (timeout=300000)
public void testDeleteAfterPersist() public void testDeleteAfterPersist()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, -1); startUpCluster(true, -1);
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -554,8 +538,7 @@ public void testDeleteAfterPersist()
@Test (timeout=300000) @Test (timeout=300000)
public void testDfsUsageCreateDelete() public void testDfsUsageCreateDelete()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, startUpCluster(true, 4);
5 * BLOCK_SIZE - 1); // 4 replica + delta
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path = new Path("/" + METHOD_NAME + ".dat"); Path path = new Path("/" + METHOD_NAME + ".dat");
@ -586,8 +569,7 @@ public void testDfsUsageCreateDelete()
@Test (timeout=300000) @Test (timeout=300000)
public void testConcurrentRead() public void testConcurrentRead()
throws Exception { throws Exception {
startUpCluster(REPL_FACTOR, new StorageType[] { DEFAULT, RAM_DISK }, startUpCluster(true, 2);
3 * BLOCK_SIZE -1); // 2 replicas + delta
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
final Path path1 = new Path("/" + METHOD_NAME + ".dat"); final Path path1 = new Path("/" + METHOD_NAME + ".dat");
@ -638,8 +620,7 @@ public void run() {
@Test (timeout=300000) @Test (timeout=300000)
public void testConcurrentWrites() public void testConcurrentWrites()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, new StorageType[] { RAM_DISK, DEFAULT }, startUpCluster(true, 9);
(10 * BLOCK_SIZE -1)); // 9 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
final int SEED = 0xFADED; final int SEED = 0xFADED;
final int NUM_WRITERS = 4; final int NUM_WRITERS = 4;
@ -659,8 +640,7 @@ public void testConcurrentWrites()
ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE); ExecutorService executor = Executors.newFixedThreadPool(THREADPOOL_SIZE);
for (int i = 0; i < NUM_WRITERS; i++) { for (int i = 0; i < NUM_WRITERS; i++) {
Runnable writer = new WriterRunnable(cluster, i, paths[i], SEED, latch, Runnable writer = new WriterRunnable(i, paths[i], SEED, latch, testFailed);
testFailed);
executor.execute(writer); executor.execute(writer);
} }
@ -677,9 +657,7 @@ public void testConcurrentWrites()
public void testDnRestartWithSavedReplicas() public void testDnRestartWithSavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(true, -1);
new StorageType[] {RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
Path path1 = new Path("/" + METHOD_NAME + ".01.dat"); Path path1 = new Path("/" + METHOD_NAME + ".01.dat");
@ -703,9 +681,7 @@ public void testDnRestartWithSavedReplicas()
public void testDnRestartWithUnsavedReplicas() public void testDnRestartWithUnsavedReplicas()
throws IOException, InterruptedException { throws IOException, InterruptedException {
startUpCluster(REPL_FACTOR, startUpCluster(true, 1);
new StorageType[] {RAM_DISK, DEFAULT },
(2 * BLOCK_SIZE - 1)); // 1 replica + delta.
stopLazyWriter(cluster.getDataNodes().get(0)); stopLazyWriter(cluster.getDataNodes().get(0));
final String METHOD_NAME = GenericTestUtils.getMethodName(); final String METHOD_NAME = GenericTestUtils.getMethodName();
@ -727,9 +703,8 @@ public void testDnRestartWithUnsavedReplicas()
* If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially * If ramDiskStorageLimit is >=0, then RAM_DISK capacity is artificially
* capped. If ramDiskStorageLimit < 0 then it is ignored. * capped. If ramDiskStorageLimit < 0 then it is ignored.
*/ */
private void startUpCluster(final int numDataNodes, private void startUpCluster(boolean hasTransientStorage,
final StorageType[] storageTypes, final int ramDiskReplicaCapacity,
final long ramDiskStorageLimit,
final boolean useSCR) final boolean useSCR)
throws IOException { throws IOException {
@ -742,39 +717,33 @@ private void startUpCluster(final int numDataNodes,
HEARTBEAT_RECHECK_INTERVAL_MSEC); HEARTBEAT_RECHECK_INTERVAL_MSEC);
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC,
LAZY_WRITER_INTERVAL_SEC); LAZY_WRITER_INTERVAL_SEC);
conf.setInt(DFS_DATANODE_RAM_DISK_LOW_WATERMARK_REPLICAS,
EVICTION_LOW_WATERMARK);
conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY,useSCR); conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, useSCR);
REPL_FACTOR = 1; //Reset in case a test has modified the value long[] capacities = null;
if (hasTransientStorage && ramDiskReplicaCapacity >= 0) {
// Convert replica count to byte count, add some delta for .meta and VERSION files.
long ramDiskStorageLimit = ((long) ramDiskReplicaCapacity * BLOCK_SIZE) + (BLOCK_SIZE - 1);
capacities = new long[] { ramDiskStorageLimit, -1 };
}
cluster = new MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf) .Builder(conf)
.numDataNodes(numDataNodes) .numDataNodes(REPL_FACTOR)
.storageTypes(storageTypes != null ? storageTypes : new StorageType[] { DEFAULT, DEFAULT }) .storageCapacities(capacities)
.storageTypes(hasTransientStorage ? new StorageType[]{ RAM_DISK, DEFAULT } : null)
.build(); .build();
fs = cluster.getFileSystem(); fs = cluster.getFileSystem();
client = fs.getClient(); client = fs.getClient();
// Artificially cap the storage capacity of the RAM_DISK volume.
if (ramDiskStorageLimit >= 0) {
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
for (FsVolumeSpi volume : volumes) {
if (volume.getStorageType() == RAM_DISK) {
((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
}
}
}
LOG.info("Cluster startup complete"); LOG.info("Cluster startup complete");
} }
private void startUpCluster(final int numDataNodes, private void startUpCluster(boolean hasTransientStorage,
final StorageType[] storageTypes, final int ramDiskReplicaCapacity)
final long ramDiskStorageLimit)
throws IOException { throws IOException {
startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false); startUpCluster(hasTransientStorage, ramDiskReplicaCapacity, false);
} }
private void makeTestFile(Path path, long length, final boolean isLazyPersist) private void makeTestFile(Path path, long length, final boolean isLazyPersist)
@ -908,17 +877,15 @@ private void triggerBlockReport()
class WriterRunnable implements Runnable { class WriterRunnable implements Runnable {
private final int id; private final int id;
private final MiniDFSCluster cluster;
private final Path paths[]; private final Path paths[];
private final int seed; private final int seed;
private CountDownLatch latch; private CountDownLatch latch;
private AtomicBoolean bFail; private AtomicBoolean bFail;
public WriterRunnable(MiniDFSCluster cluster, int threadIndex, Path[] paths, public WriterRunnable(int threadIndex, Path[] paths,
int seed, CountDownLatch latch, int seed, CountDownLatch latch,
AtomicBoolean bFail) { AtomicBoolean bFail) {
id = threadIndex; id = threadIndex;
this.cluster = cluster;
this.paths = paths; this.paths = paths;
this.seed = seed; this.seed = seed;
this.latch = latch; this.latch = latch;