HDFS-7632. MiniDFSCluster configures DataNode data directories incorrectly if using more than 1 DataNode and more than 2 storage locations per DataNode. Contributed by Chris Nauroth.

This commit is contained in:
cnauroth 2015-01-16 10:52:01 -08:00
parent 9c94015e36
commit ec4389cf72
14 changed files with 58 additions and 57 deletions

View File

@ -716,6 +716,10 @@ Release 2.7.0 - UNRELEASED
HDFS-7635. Remove TestCorruptFilesJsp from branch-2. (cnauroth)
HDFS-7632. MiniDFSCluster configures DataNode data directories incorrectly if
using more than 1 DataNode and more than 2 storage locations per DataNode.
(cnauroth)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -1876,7 +1876,7 @@ public String readBlockOnDataNode(int i, ExtendedBlock block)
* @return true if a replica was corrupted, false otherwise
* Types: delete, write bad data, truncate
*/
public static boolean corruptReplica(int i, ExtendedBlock blk)
public boolean corruptReplica(int i, ExtendedBlock blk)
throws IOException {
File blockFile = getBlockFile(i, blk);
return corruptBlock(blockFile);
@ -1913,7 +1913,7 @@ public static boolean corruptBlockByDeletingBlockFile(File blockFile)
return blockFile.delete();
}
public static boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
public boolean changeGenStampOfBlock(int dnIndex, ExtendedBlock blk,
long newGenStamp) throws IOException {
File blockFile = getBlockFile(dnIndex, blk);
File metaFile = FsDatasetUtil.findMetaFile(blockFile);
@ -2429,7 +2429,7 @@ public File getInstanceStorageDir(int dnIndex, int dirIndex) {
* @param dirIndex directory index.
* @return Storage directory
*/
public static File getStorageDir(int dnIndex, int dirIndex) {
public File getStorageDir(int dnIndex, int dirIndex) {
return new File(getBaseDirectory(), getStorageDirPath(dnIndex, dirIndex));
}
@ -2440,8 +2440,8 @@ public static File getStorageDir(int dnIndex, int dirIndex) {
* @param dirIndex directory index.
* @return storage directory path
*/
private static String getStorageDirPath(int dnIndex, int dirIndex) {
return "data/data" + (2 * dnIndex + 1 + dirIndex);
private String getStorageDirPath(int dnIndex, int dirIndex) {
return "data/data" + (storagesPerDatanode * dnIndex + 1 + dirIndex);
}
/**
@ -2570,10 +2570,10 @@ public File[] getAllBlockFiles(ExtendedBlock block) {
* @param dnIndex Index of the datanode to get block files for
* @param block block for which corresponding files are needed
*/
public static File getBlockFile(int dnIndex, ExtendedBlock block) {
public File getBlockFile(int dnIndex, ExtendedBlock block) {
// Check for block file in the two storage directories of the datanode
for (int i = 0; i <=1 ; i++) {
File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
File storageDir = getStorageDir(dnIndex, i);
File blockFile = getBlockFile(storageDir, block);
if (blockFile.exists()) {
return blockFile;
@ -2588,10 +2588,10 @@ public static File getBlockFile(int dnIndex, ExtendedBlock block) {
* @param dnIndex Index of the datanode to get block files for
* @param block block for which corresponding files are needed
*/
public static File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
public File getBlockMetadataFile(int dnIndex, ExtendedBlock block) {
// Check for block file in the two storage directories of the datanode
for (int i = 0; i <=1 ; i++) {
File storageDir = MiniDFSCluster.getStorageDir(dnIndex, i);
File storageDir = getStorageDir(dnIndex, i);
File blockMetaFile = getBlockMetadataFile(storageDir, block);
if (blockMetaFile.exists()) {
return blockMetaFile;

View File

@ -160,8 +160,8 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
fsIn.close();
fsIn = null;
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
File dataFile = MiniDFSCluster.getBlockFile(0, block);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0, block);
File dataFile = cluster.getBlockFile(0, block);
File metaFile = cluster.getBlockMetadataFile(0, block);
ShortCircuitCache shortCircuitCache =
ClientContext.getFromConf(conf).getShortCircuitCache();

View File

@ -179,10 +179,6 @@ public void testDatanodeBlockScanner() throws IOException, TimeoutException {
cluster.shutdown();
}
public static boolean corruptReplica(ExtendedBlock blk, int replica) throws IOException {
return MiniDFSCluster.corruptReplica(replica, blk);
}
@Test
public void testBlockCorruptionPolicy() throws Exception {
Configuration conf = new HdfsConfiguration();
@ -202,7 +198,7 @@ public void testBlockCorruptionPolicy() throws Exception {
assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
// Corrupt random replica of block
assertTrue(MiniDFSCluster.corruptReplica(rand, block));
assertTrue(cluster.corruptReplica(rand, block));
// Restart the datanode hoping the corrupt block to be reported
cluster.restartDataNode(rand);
@ -213,9 +209,9 @@ public void testBlockCorruptionPolicy() throws Exception {
// Corrupt all replicas. Now, block should be marked as corrupt
// and we should get all the replicas
assertTrue(MiniDFSCluster.corruptReplica(0, block));
assertTrue(MiniDFSCluster.corruptReplica(1, block));
assertTrue(MiniDFSCluster.corruptReplica(2, block));
assertTrue(cluster.corruptReplica(0, block));
assertTrue(cluster.corruptReplica(1, block));
assertTrue(cluster.corruptReplica(2, block));
// Trigger each of the DNs to scan this block immediately.
// The block pool scanner doesn't run frequently enough on its own
@ -288,7 +284,7 @@ private void blockCorruptionRecoveryPolicy(int numDataNodes,
// Corrupt numCorruptReplicas replicas of block
int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
if (corruptReplica(block, i)) {
if (cluster.corruptReplica(i, block)) {
corruptReplicasDNIDs[j++] = i;
LOG.info("successfully corrupted block " + block + " on node "
+ i + " " + cluster.getDataNodes().get(i).getDisplayName());
@ -373,7 +369,7 @@ public void testTruncatedBlockReport() throws Exception {
assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
// Truncate replica of block
if (!changeReplicaLength(block, 0, -1)) {
if (!changeReplicaLength(cluster, block, 0, -1)) {
throw new IOException(
"failed to find or change length of replica on node 0 "
+ cluster.getDataNodes().get(0).getDisplayName());
@ -403,7 +399,7 @@ public void testTruncatedBlockReport() throws Exception {
cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
// Make sure that truncated block will be deleted
waitForBlockDeleted(block, 0, TIMEOUT);
waitForBlockDeleted(cluster, block, 0, TIMEOUT);
} finally {
cluster.shutdown();
}
@ -412,9 +408,9 @@ public void testTruncatedBlockReport() throws Exception {
/**
* Change the length of a block at datanode dnIndex
*/
static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex,
int lenDelta) throws IOException {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk,
int dnIndex, int lenDelta) throws IOException {
File blockFile = cluster.getBlockFile(dnIndex, blk);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
raFile.setLength(raFile.length()+lenDelta);
@ -425,9 +421,10 @@ static boolean changeReplicaLength(ExtendedBlock blk, int dnIndex,
return false;
}
private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
long timeout) throws TimeoutException, InterruptedException {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
private static void waitForBlockDeleted(MiniDFSCluster cluster,
ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException,
InterruptedException {
File blockFile = cluster.getBlockFile(dnIndex, blk);
long failtime = Time.monotonicNow()
+ ((timeout > 0) ? timeout : Long.MAX_VALUE);
while (blockFile != null && blockFile.exists()) {
@ -436,7 +433,7 @@ private static void waitForBlockDeleted(ExtendedBlock blk, int dnIndex,
+ blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
}
Thread.sleep(100);
blockFile = MiniDFSCluster.getBlockFile(dnIndex, blk);
blockFile = cluster.getBlockFile(dnIndex, blk);
}
}

View File

@ -79,7 +79,7 @@ public void testMissingBlocksAlert()
// Corrupt the block
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, corruptFile);
assertTrue(TestDatanodeBlockScanner.corruptReplica(block, 0));
assertTrue(cluster.corruptReplica(0, block));
// read the file so that the corrupt block is reported to NN
FSDataInputStream in = dfs.open(corruptFile);
@ -124,8 +124,7 @@ public void testMissingBlocksAlert()
DFSTestUtil.createFile(dfs, replOneFile, fileLen, (short)1, 0);
ExtendedBlock replOneBlock = DFSTestUtil.getFirstBlock(
dfs, replOneFile);
assertTrue(TestDatanodeBlockScanner.corruptReplica(
replOneBlock, 0));
assertTrue(cluster.corruptReplica(0, replOneBlock));
// read the file so that the corrupt block is reported to NN
in = dfs.open(replOneFile);

View File

@ -349,7 +349,6 @@ public void testPendingReplicationRetry() throws IOException {
0, Long.MAX_VALUE).get(0).getBlock();
cluster.shutdown();
cluster = null;
for (int i=0; i<25; i++) {
buffer[i] = '0';
@ -358,7 +357,7 @@ public void testPendingReplicationRetry() throws IOException {
int fileCount = 0;
// Choose 3 copies of block file - delete 1 and corrupt the remaining 2
for (int dnIndex=0; dnIndex<3; dnIndex++) {
File blockFile = MiniDFSCluster.getBlockFile(dnIndex, block);
File blockFile = cluster.getBlockFile(dnIndex, block);
LOG.info("Checking for file " + blockFile);
if (blockFile != null && blockFile.exists()) {
@ -445,7 +444,8 @@ private void changeBlockLen(MiniDFSCluster cluster, int lenDelta)
// Change the length of a replica
for (int i=0; i<cluster.getDataNodes().size(); i++) {
if (TestDatanodeBlockScanner.changeReplicaLength(block, i, lenDelta)) {
if (TestDatanodeBlockScanner.changeReplicaLength(cluster, block, i,
lenDelta)) {
break;
}
}

View File

@ -209,7 +209,7 @@ public void testCorruptBlockRereplicatedAcrossRacks() throws Exception {
// Corrupt a replica of the block
int dnToCorrupt = DFSTestUtil.firstDnWithBlock(cluster, b);
assertTrue(MiniDFSCluster.corruptReplica(dnToCorrupt, b));
assertTrue(cluster.corruptReplica(dnToCorrupt, b));
// Restart the datanode so blocks are re-scanned, and the corrupt
// block is detected.

View File

@ -68,7 +68,7 @@ public void testProcesOverReplicateBlock() throws Exception {
// corrupt the block on datanode 0
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, fileName);
assertTrue(TestDatanodeBlockScanner.corruptReplica(block, 0));
assertTrue(cluster.corruptReplica(0, block));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// remove block scanner log to trigger block scanning
File scanLog = new File(MiniDFSCluster.getFinalizedDir(

View File

@ -227,7 +227,7 @@ public void testFadviseAfterWriteThenRead() throws Exception {
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
String fadvisedFileName = cluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
@ -272,7 +272,7 @@ public void testClientDefaults() throws Exception {
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
String fadvisedFileName = cluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
@ -313,7 +313,7 @@ public void testFadviseSkippedForSmallReads() throws Exception {
// specify any policy, we should have done drop-behind.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
String fadvisedFileName = cluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
@ -355,7 +355,7 @@ public void testNoFadviseAfterWriteThenRead() throws Exception {
// verify that we did not drop everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
String fadvisedFileName = cluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
Assert.assertNull(stats);

View File

@ -178,7 +178,7 @@ private void doShortCircuitReadAfterEvictionTest() throws IOException,
// Verify short-circuit read from RAM_DISK.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
File metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
@ -188,7 +188,7 @@ private void doShortCircuitReadAfterEvictionTest() throws IOException,
// Verify short-circuit read from RAM_DISK once again.
ensureFileReplicasOnStorageType(path1, RAM_DISK);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() <= BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
@ -201,7 +201,7 @@ private void doShortCircuitReadAfterEvictionTest() throws IOException,
// Verify short-circuit read still works from DEFAULT storage. This time,
// we'll have a checksum written during lazy persistence.
ensureFileReplicasOnStorageType(path1, DEFAULT);
metaFile = MiniDFSCluster.getBlockMetadataFile(0,
metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
assertTrue(metaFile.length() > BlockMetadataHeader.getHeaderSize());
assertTrue(verifyReadRandomFile(path1, BLOCK_SIZE, SEED));
@ -251,7 +251,7 @@ public void doShortCircuitReadBlockFileCorruptionTest() throws IOException,
// Corrupt the lazy-persisted block file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
MiniDFSCluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
cluster.corruptReplica(0, DFSTestUtil.getFirstBlock(fs, path1));
exception.expect(ChecksumException.class);
DFSTestUtil.readFileBuffer(fs, path1);
}
@ -291,7 +291,7 @@ public void doShortCircuitReadMetaFileCorruptionTest() throws IOException,
// Corrupt the lazy-persisted checksum file, and verify that checksum
// verification catches it.
ensureFileReplicasOnStorageType(path1, DEFAULT);
File metaFile = MiniDFSCluster.getBlockMetadataFile(0,
File metaFile = cluster.getBlockMetadataFile(0,
DFSTestUtil.getFirstBlock(fs, path1));
MiniDFSCluster.corruptBlock(metaFile);
exception.expect(ChecksumException.class);

View File

@ -345,7 +345,7 @@ public void testFsckMove() throws Exception {
totalMissingBlocks += ctFile.getTotalMissingBlocks();
}
for (CorruptedTestFile ctFile : ctFiles) {
ctFile.removeBlocks();
ctFile.removeBlocks(cluster);
}
// Wait for fsck to discover all the missing blocks
while (true) {
@ -432,14 +432,15 @@ private byte[] cacheInitialContents() throws IOException {
return content;
}
public void removeBlocks() throws AccessControlException,
FileNotFoundException, UnresolvedLinkException, IOException {
public void removeBlocks(MiniDFSCluster cluster)
throws AccessControlException, FileNotFoundException,
UnresolvedLinkException, IOException {
for (int corruptIdx : blocksToCorrupt) {
// Corrupt a block by deleting it
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
name, blockSize * corruptIdx, Long.MAX_VALUE).get(0).getBlock();
for (int i = 0; i < numDataNodes; i++) {
File blockFile = MiniDFSCluster.getBlockFile(i, block);
File blockFile = cluster.getBlockFile(i, block);
if(blockFile != null && blockFile.exists()) {
assertTrue(blockFile.delete());
}
@ -517,7 +518,7 @@ public void testFsckMoveAndDelete() throws Exception {
ExtendedBlock block = dfsClient.getNamenode().getBlockLocations(
corruptFileName, 0, Long.MAX_VALUE).get(0).getBlock();
for (int i=0; i<4; i++) {
File blockFile = MiniDFSCluster.getBlockFile(i, block);
File blockFile = cluster.getBlockFile(i, block);
if(blockFile != null && blockFile.exists()) {
assertTrue(blockFile.delete());
}
@ -647,7 +648,7 @@ public void testCorruptBlock() throws Exception {
assertTrue(outStr.contains(NamenodeFsck.HEALTHY_STATUS));
// corrupt replicas
File blockFile = MiniDFSCluster.getBlockFile(0, block);
File blockFile = cluster.getBlockFile(0, block);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();
@ -1303,7 +1304,7 @@ public void testBlockIdCKCorruption() throws Exception {
// corrupt replicas
block = DFSTestUtil.getFirstBlock(dfs, path);
File blockFile = MiniDFSCluster.getBlockFile(0, block);
File blockFile = cluster.getBlockFile(0, block);
if (blockFile != null && blockFile.exists()) {
RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
FileChannel channel = raFile.getChannel();

View File

@ -267,14 +267,14 @@ private void corruptBlock(MiniDFSCluster cluster, FileSystem fs, final Path file
// corrupt the block on datanode dnIndex
// the indexes change once the nodes are restarted.
// But the datadirectory will not change
assertTrue(MiniDFSCluster.corruptReplica(dnIndex, block));
assertTrue(cluster.corruptReplica(dnIndex, block));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// Each datanode has multiple data dirs, check each
for (int dirIndex = 0; dirIndex < 2; dirIndex++) {
final String bpid = cluster.getNamesystem().getBlockPoolId();
File storageDir = MiniDFSCluster.getStorageDir(dnIndex, dirIndex);
File storageDir = cluster.getStorageDir(dnIndex, dirIndex);
File dataDir = MiniDFSCluster.getFinalizedDir(storageDir, bpid);
File scanLogFile = new File(dataDir, "dncp_block_verification.log.curr");
if (scanLogFile.exists()) {

View File

@ -67,7 +67,7 @@ public void testChangedStorageId() throws IOException, URISyntaxException,
// Change the gen stamp of the block on datanode to go back in time (gen
// stamps start at 1000)
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
assertTrue(MiniDFSCluster.changeGenStampOfBlock(0, block, 900));
assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
// Stop the DN so the replica with the changed gen stamp will be reported
// when this DN starts up.

View File

@ -466,7 +466,7 @@ public void testHandleTruncatedBlockFile() throws IOException {
"waitReplication: " + e);
}
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
File dataFile = MiniDFSCluster.getBlockFile(0, block);
File dataFile = cluster.getBlockFile(0, block);
cluster.shutdown();
cluster = null;
RandomAccessFile raf = null;