HDFS-7475. Make TestLazyPersistFiles#testLazyPersistBlocksAreSaved deterministic. (Contributed by Xiaoyu Yao)

This commit is contained in:
arp 2014-12-10 18:24:22 -08:00
parent 92916ae487
commit 9a44db48b4
3 changed files with 48 additions and 31 deletions

View File

@ -567,6 +567,9 @@ Release 2.7.0 - UNRELEASED
HDFS-5578. [JDK8] Fix Javadoc errors caused by incorrect or illegal tags
in doc comments. (Andrew Purtell via wheat9)
HDFS-7475. Make TestLazyPersistFiles#testLazyPersistBlocksAreSaved
deterministic. (Xiaoyu Yao via Arpit Agarwal)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -50,6 +50,8 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.fs.CreateFlag.CREATE;
@ -131,6 +133,48 @@ protected final LocatedBlocks ensureFileReplicasOnStorageType(
return locatedBlocks;
}
/**
* Make sure at least one non-transient volume has a saved copy of the replica.
* An infinite loop is used to ensure the async lazy persist tasks are completely
* done before verification. Caller of ensureLazyPersistBlocksAreSaved expects
* either a successful pass or timeout failure.
*/
protected final void ensureLazyPersistBlocksAreSaved(
LocatedBlocks locatedBlocks) throws IOException, InterruptedException {
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
final Set<Long> persistedBlockIds = new HashSet<Long>();
while (persistedBlockIds.size() < locatedBlocks.getLocatedBlocks().size()) {
// Take 1 second sleep before each verification iteration
Thread.sleep(1000);
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) {
continue;
}
FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
long blockId = lb.getBlock().getBlockId();
File targetDir =
DatanodeUtil.idToBlockDir(lazyPersistDir, blockId);
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
// Found a persisted copy for this block and added to the Set
persistedBlockIds.add(blockId);
}
}
}
}
// We should have found a persisted copy for each located block.
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
}
protected final void makeRandomTestFile(Path path, long length,
boolean isLazyPersist, long seed) throws IOException {
DFSTestUtil.createFile(fs, path, isLazyPersist, BUFFER_LENGTH, length,

View File

@ -304,37 +304,7 @@ public void testLazyPersistBlocksAreSaved()
// Make sure that there is a saved copy of the replica on persistent
// storage.
final String bpid = cluster.getNamesystem().getBlockPoolId();
List<? extends FsVolumeSpi> volumes =
cluster.getDataNodes().get(0).getFSDataset().getVolumes();
final Set<Long> persistedBlockIds = new HashSet<Long>();
// Make sure at least one non-transient volume has a saved copy of
// the replica.
for (FsVolumeSpi v : volumes) {
if (v.isTransientStorage()) {
continue;
}
FsVolumeImpl volume = (FsVolumeImpl) v;
File lazyPersistDir = volume.getBlockPoolSlice(bpid).getLazypersistDir();
for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
File targetDir = DatanodeUtil.idToBlockDir(lazyPersistDir, lb.getBlock().getBlockId());
File blockFile = new File(targetDir, lb.getBlock().getBlockName());
if (blockFile.exists()) {
// Found a persisted copy for this block!
boolean added = persistedBlockIds.add(lb.getBlock().getBlockId());
assertThat(added, is(true));
} else {
LOG.error(blockFile + " not found");
}
}
}
// We should have found a persisted copy for each located block.
assertThat(persistedBlockIds.size(), is(locatedBlocks.getLocatedBlocks().size()));
ensureLazyPersistBlocksAreSaved(locatedBlocks);
}
/**