diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index c7ba9d2e76..9486bd1974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -26,13 +26,22 @@
 import com.google.common.base.Supplier;
 import com.google.common.collect.Lists;
+import org.apache.commons.lang.text.StrBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.ReconfigurationUtil;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -452,4 +461,135 @@ public void testNameNodeGetReconfigurationStatus() throws IOException,
     assertThat(outs.get(offset + 2), is(allOf(containsString("To:"),
         containsString("6"))));
   }
+
+  private static String scanIntoString(final ByteArrayOutputStream baos) {
+    final StrBuilder sb = new StrBuilder();
+    final Scanner scanner = new Scanner(baos.toString());
+    while (scanner.hasNextLine()) {
+      sb.appendln(scanner.nextLine());
+    }
+    scanner.close();
+    return sb.toString();
+  }
+
+  @Test(timeout = 30000)
+  public void testReportCommand() throws Exception {
+    redirectStream();
+
+    /* init conf */
+    final Configuration dfsConf = new HdfsConfiguration();
+    dfsConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
+        500); // 0.5s
+    dfsConf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    final Path baseDir = new Path(
+        PathUtils.getTestDir(getClass()).getAbsolutePath(),
+        GenericTestUtils.getMethodName());
+    dfsConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
+
+    final int numDn = 3;
+
+    /* init cluster */
+    try (MiniDFSCluster miniCluster = new MiniDFSCluster
+        .Builder(dfsConf)
+        .numDataNodes(numDn).build()) {
+
+      miniCluster.waitActive();
+      assertEquals(numDn, miniCluster.getDataNodes().size());
+
+      /* local vars */
+      final DFSAdmin dfsAdmin = new DFSAdmin(dfsConf);
+      final DFSClient client = miniCluster.getFileSystem().getClient();
+
+      /* run and verify report command */
+      resetStream();
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn, 0, client);
+
+      /* shut down one DN */
+      final List<DataNode> datanodes = miniCluster.getDataNodes();
+      final DataNode last = datanodes.get(datanodes.size() - 1);
+      last.shutdown();
+      miniCluster.setDataNodeDead(last.getDatanodeId());
+
+      /* run and verify report command */
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 0, client);
+
+      /* corrupt one block */
+      final short replFactor = 1;
+      final long fileLength = 512L;
+      final FileSystem fs = miniCluster.getFileSystem();
+      final Path file = new Path(baseDir, "/corrupted");
+      DFSTestUtil.createFile(fs, file, fileLength, replFactor, 12345L);
+      DFSTestUtil.waitReplication(fs, file, replFactor);
+
+      final ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file);
+      final int blockFilesCorrupted = miniCluster
+          .corruptBlockOnDataNodes(block);
+      assertEquals("Fail to corrupt all replicas for block " + block,
+          replFactor, blockFilesCorrupted);
+
+      /*
+       * Increase replication factor, this should invoke transfer request.
+       * Receiving datanode fails on checksum and reports it to namenode.
+       */
+      fs.setReplication(file, (short) (replFactor + 1));
+
+      /* get block details and check if the block is corrupt */
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          LocatedBlocks blocks = null;
+          try {
+            blocks = client.getNamenode().getBlockLocations(
+                file.toString(), 0, Long.MAX_VALUE);
+          } catch (IOException e) {
+            return false;
+          }
+          return blocks != null && blocks.get(0).isCorrupt();
+        }
+      }, 100, 60000);
+
+      BlockManagerTestUtil.updateState(
+          miniCluster.getNameNode().getNamesystem().getBlockManager());
+
+      /* run and verify report command */
+      resetStream();
+      assertEquals(0, ToolRunner.run(dfsAdmin, new String[] {"-report"}));
+      verifyNodesAndCorruptBlocks(numDn, numDn - 1, 1, client);
+    }
+  }
+
+  private void verifyNodesAndCorruptBlocks(
+      final int numDn,
+      final int numLiveDn,
+      final int numCorruptBlocks,
+      final DFSClient client) throws IOException {
+
+    /* init vars */
+    final String outStr = scanIntoString(out);
+    final String expectedLiveNodesStr = String.format(
+        "Live datanodes (%d)",
+        numLiveDn);
+    final String expectedCorruptedBlocksStr = String.format(
+        "Blocks with corrupt replicas: %d",
+        numCorruptBlocks);
+
+    /* verify nodes and corrupt blocks */
+    assertThat(outStr, is(allOf(
+        containsString(expectedLiveNodesStr),
+        containsString(expectedCorruptedBlocksStr))));
+
+    assertEquals(
+        numDn,
+        client.getDatanodeStorageReport(DatanodeReportType.ALL).length);
+    assertEquals(
+        numLiveDn,
+        client.getDatanodeStorageReport(DatanodeReportType.LIVE).length);
+    assertEquals(
+        numDn - numLiveDn,
+        client.getDatanodeStorageReport(DatanodeReportType.DEAD).length);
+    assertEquals(numCorruptBlocks, client.getCorruptBlocksCount());
+  }
 }
\ No newline at end of file
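
Note on the stream helpers: redirectStream(), resetStream(), and the `out` field read by scanIntoString(out) are defined elsewhere in TestDFSAdmin and are not part of this hunk. A minimal sketch of how such helpers are typically written (an assumption about the surrounding test class, not this patch's actual implementation) — capture System.out into a ByteArrayOutputStream so the `-report` output can be matched with assertThat:

  import java.io.ByteArrayOutputStream;
  import java.io.PrintStream;

  // Hypothetical stand-ins for the helpers referenced by testReportCommand().
  private static final ByteArrayOutputStream out = new ByteArrayOutputStream();
  private static final PrintStream oldOut = System.out;

  private static void redirectStream() {
    // Route everything DFSAdmin prints to System.out into `out`.
    System.setOut(new PrintStream(out));
  }

  private static void resetStream() {
    // Discard output captured by earlier commands so each
    // verifyNodesAndCorruptBlocks() call sees only the latest report.
    out.reset();
  }

  private static void restoreStream() {
    // Reattach the real stdout when the test class finishes.
    System.setOut(oldOut);
  }

With helpers of this shape in place, scanIntoString(out) yields the full text of the most recent `hdfs dfsadmin -report` run, which verifyNodesAndCorruptBlocks() then checks for the "Live datanodes (N)" and "Blocks with corrupt replicas: N" lines asserted above.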