From 938565925adb9d866e8c6951361cd5582076e013 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Mon, 16 Dec 2013 00:58:40 +0000 Subject: [PATCH] HDFS-5406. Send incremental block reports for all storages in a single call. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1551093 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hdfs/server/datanode/BPServiceActor.java | 41 +++-- .../server/namenode/NameNodeRpcServer.java | 1 + .../namenode/metrics/NameNodeMetrics.java | 6 + .../apache/hadoop/hdfs/MiniDFSCluster.java | 9 +- .../hdfs/server/datanode/TestBlockReport.java | 171 ++++++++++++++---- 6 files changed, 170 insertions(+), 61 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 4f26a54987..4608f2d80c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -451,6 +451,9 @@ Trunk (Unreleased) HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe) + HDFS-5406. Send incremental block reports for all storages in a + single call. (Arpit Agarwal) + BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS HDFS-4985. Add storage type to the protocol and expose it in block report diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 9e33e09faa..c91fca381f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketTimeoutException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; @@ -273,7 +274,8 @@ void reportBadBlocks(ExtendedBlock block, private void reportReceivedDeletedBlocks() throws IOException { // Generate a list of the pending reports for each storage under the lock - Map blockArrays = Maps.newHashMap(); + ArrayList reports = + new ArrayList(pendingIncrementalBRperStorage.size()); synchronized (pendingIncrementalBRperStorage) { for (Map.Entry entry : pendingIncrementalBRperStorage.entrySet()) { @@ -286,33 +288,34 @@ private void reportReceivedDeletedBlocks() throws IOException { pendingReceivedRequests = (pendingReceivedRequests > rdbi.length ? (pendingReceivedRequests - rdbi.length) : 0); - blockArrays.put(storageUuid, rdbi); + reports.add(new StorageReceivedDeletedBlocks(storageUuid, rdbi)); } } } - // Send incremental block reports to the Namenode outside the lock - for (Map.Entry entry : - blockArrays.entrySet()) { - final String storageUuid = entry.getKey(); - final ReceivedDeletedBlockInfo[] rdbi = entry.getValue(); + if (reports.size() == 0) { + // Nothing new to report. + return; + } - StorageReceivedDeletedBlocks[] report = { new StorageReceivedDeletedBlocks( - storageUuid, rdbi) }; - boolean success = false; - try { - bpNamenode.blockReceivedAndDeleted(bpRegistration, - bpos.getBlockPoolId(), report); - success = true; - } finally { - if (!success) { - synchronized (pendingIncrementalBRperStorage) { + // Send incremental block reports to the Namenode outside the lock + boolean success = false; + try { + bpNamenode.blockReceivedAndDeleted(bpRegistration, + bpos.getBlockPoolId(), + reports.toArray(new StorageReceivedDeletedBlocks[reports.size()])); + success = true; + } finally { + if (!success) { + synchronized (pendingIncrementalBRperStorage) { + for (StorageReceivedDeletedBlocks report : reports) { // If we didn't succeed in sending the report, put all of the // blocks back onto our queue, but only in the case where we // didn't put something newer in the meantime. PerStoragePendingIncrementalBR perStorageMap = - pendingIncrementalBRperStorage.get(storageUuid); - pendingReceivedRequests += perStorageMap.putMissingBlockInfos(rdbi); + pendingIncrementalBRperStorage.get(report.getStorageID()); + pendingReceivedRequests += + perStorageMap.putMissingBlockInfos(report.getBlocks()); } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 457a89ca09..84360e5eb4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -1006,6 +1006,7 @@ public DatanodeCommand cacheReport(DatanodeRegistration nodeReg, public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId, StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException { verifyRequest(nodeReg); + metrics.incrBlockReceivedAndDeletedOps(); if(blockStateChangeLog.isDebugEnabled()) { blockStateChangeLog.debug("*BLOCK* NameNode.blockReceivedAndDeleted: " +"from "+nodeReg+" "+receivedAndDeletedBlocks.length diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index c0a204e952..2916da0799 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -71,6 +71,8 @@ public class NameNodeMetrics { MutableCounterLong listSnapshottableDirOps; @Metric("Number of snapshotDiffReport operations") MutableCounterLong snapshotDiffReportOps; + @Metric("Number of blockReceivedAndDeleted calls") + MutableCounterLong blockReceivedAndDeletedOps; @Metric("Journal transactions") MutableRate transactions; @Metric("Journal syncs") MutableRate syncs; @@ -209,6 +211,10 @@ public void incrSnapshotDiffReportOps() { snapshotDiffReportOps.incr(); } + public void incrBlockReceivedAndDeletedOps() { + blockReceivedAndDeletedOps.incr(); + } + public void addTransaction(long latency) { transactions.add(latency); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 8d2ad2208a..1221a7f2a1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2143,17 +2143,14 @@ public File getInstanceStorageDir(int dnIndex, int dirIndex) { } /** - * Get a storage directory for a datanode. There are two storage directories - * per datanode: + * Get a storage directory for a datanode. *
    *
  1. /data/data<2*dnIndex + 1>
  2. *
  3. /data/data<2*dnIndex + 2>
  4. *
* * @param dnIndex datanode index (starts from 0) - * @param dirIndex directory index (0 or 1). Index 0 provides access to the - * first storage directory. Index 1 provides access to the second - * storage directory. + * @param dirIndex directory index. * @return Storage directory */ public static File getStorageDir(int dnIndex, int dirIndex) { @@ -2164,7 +2161,7 @@ public static File getStorageDir(int dnIndex, int dirIndex) { * Calculate the DN instance-specific path for appending to the base dir * to determine the location of the storage of a DN instance in the mini cluster * @param dnIndex datanode index - * @param dirIndex directory index (0 or 1). + * @param dirIndex directory index. * @return */ private static String getStorageDirPath(int dnIndex, int dirIndex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index bd54edd0ce..d777697c21 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -71,7 +71,15 @@ /** * This test simulates a variety of situations when blocks are being * intentionally corrupted, unexpectedly modified, and so on before a block - * report is happening + * report is happening. + * + * For each test case it runs two variations: + * #1 - For a given DN, the first variation sends block reports for all + * storages in a single call to the NN. + * #2 - For a given DN, the second variation sends block reports for each + * storage in a separate call. + * + * The behavior should be the same in either variation. */ public class TestBlockReport { public static final Log LOG = LogFactory.getLog(TestBlockReport.class); @@ -157,6 +165,113 @@ private static StorageBlockReport[] getBlockReports( return reports; } + /** + * Utility routine to send block reports to the NN, either in a single call + * or reporting one storage per call. + * + * @param dnR + * @param poolId + * @param reports + * @param needtoSplit + * @throws IOException + */ + private void sendBlockReports(DatanodeRegistration dnR, String poolId, + StorageBlockReport[] reports, boolean needtoSplit) throws IOException { + if (!needtoSplit) { + LOG.info("Sending combined block reports for " + dnR); + cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + } else { + for (StorageBlockReport report : reports) { + LOG.info("Sending block report for storage " + report.getStorage().getStorageID()); + StorageBlockReport[] singletonReport = { report }; + cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport); + } + } + } + + /** + * Test variations blockReport_01 through blockReport_09 with combined + * and split block reports. + */ + @Test + public void blockReportCombined_01() throws IOException { + blockReport_01(false); + } + + @Test + public void blockReportSplit_01() throws IOException { + blockReport_01(true); + } + + @Test + public void blockReportCombined_02() throws IOException { + blockReport_02(false); + } + + @Test + public void blockReportSplit_02() throws IOException { + blockReport_02(true); + } + + @Test + public void blockReportCombined_03() throws IOException { + blockReport_03(false); + } + + @Test + public void blockReportSplit_03() throws IOException { + blockReport_03(true); + } + + @Test + public void blockReportCombined_04() throws IOException { + blockReport_04(false); + } + + @Test + public void blockReportSplit_04() throws IOException { + blockReport_04(true); + } + + @Test + public void blockReportCombined_06() throws Exception { + blockReport_06(false); + } + + @Test + public void blockReportSplit_06() throws Exception { + blockReport_06(true); + } + + @Test + public void blockReportCombined_07() throws Exception { + blockReport_07(false); + } + + @Test + public void blockReportSplit_07() throws Exception { + blockReport_07(true); + } + + @Test + public void blockReportCombined_08() throws Exception { + blockReport_08(false); + } + + @Test + public void blockReportSplit_08() throws Exception { + blockReport_08(true); + } + + @Test + public void blockReportCombined_09() throws Exception { + blockReport_09(false); + } + + @Test + public void blockReportSplit_09() throws Exception { + blockReport_09(true); + } /** * Test write a file, verifies and closes it. Then the length of the blocks * are messed up and BlockReport is forced. @@ -164,8 +279,7 @@ private static StorageBlockReport[] getBlockReports( * * @throws java.io.IOException on an error */ - @Test - public void blockReport_01() throws IOException { + private void blockReport_01(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); @@ -198,7 +312,7 @@ public void blockReport_01() throws IOException { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); List blocksAfterReport = DFSTestUtil.getAllBlocks(fs.open(filePath)); @@ -224,8 +338,7 @@ public void blockReport_01() throws IOException { * * @throws IOException in case of errors */ - @Test - public void blockReport_02() throws IOException { + private void blockReport_02(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); LOG.info("Running test " + METHOD_NAME); @@ -280,7 +393,7 @@ public void blockReport_02() throws IOException { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn0, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); BlockManagerTestUtil.getComputedDatanodeWork(cluster.getNamesystem() .getBlockManager()); @@ -301,8 +414,7 @@ public void blockReport_02() throws IOException { * * @throws IOException in case of an error */ - @Test - public void blockReport_03() throws IOException { + private void blockReport_03(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); ArrayList blocks = writeFile(METHOD_NAME, FILE_SIZE, filePath); @@ -312,11 +424,7 @@ public void blockReport_03() throws IOException { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -333,8 +441,7 @@ public void blockReport_03() throws IOException { * * @throws IOException in case of an error */ - @Test - public void blockReport_04() throws IOException { + private void blockReport_04(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); DFSTestUtil.createFile(fs, filePath, @@ -352,11 +459,7 @@ public void blockReport_04() throws IOException { DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - DatanodeCommand dnCmd = - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); - if(LOG.isDebugEnabled()) { - LOG.debug("Got the command: " + dnCmd); - } + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -373,8 +476,7 @@ public void blockReport_04() throws IOException { * * @throws IOException in case of an error */ - @Test - public void blockReport_06() throws Exception { + private void blockReport_06(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -387,7 +489,7 @@ public void blockReport_06() throws Exception { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication Blocks", 0, cluster.getNamesystem().getUnderReplicatedBlocks()); @@ -406,8 +508,7 @@ public void blockReport_06() throws Exception { * * @throws IOException in case of an error */ - @Test - public void blockReport_07() throws Exception { + private void blockReport_07(boolean splitBlockReports) throws Exception { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -421,7 +522,7 @@ public void blockReport_07() throws Exception { String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, true, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -432,7 +533,7 @@ public void blockReport_07() throws Exception { cluster.getNamesystem().getPendingReplicationBlocks(), is(0L)); reports = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, reports); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertThat("Wrong number of corrupt blocks", @@ -458,8 +559,7 @@ public void blockReport_07() throws Exception { * * @throws IOException in case of an error */ - @Test - public void blockReport_08() throws IOException { + private void blockReport_08(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -483,8 +583,8 @@ public void blockReport_08() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, false, false); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", blocks.size(), cluster.getNamesystem().getPendingReplicationBlocks()); @@ -500,8 +600,7 @@ public void blockReport_08() throws IOException { // Similar to BlockReport_08 but corrupts GS and len of the TEMPORARY's // replica block. Expect the same behaviour: NN should simply ignore this // block - @Test - public void blockReport_09() throws IOException { + private void blockReport_09(boolean splitBlockReports) throws IOException { final String METHOD_NAME = GenericTestUtils.getMethodName(); Path filePath = new Path("/" + METHOD_NAME + ".dat"); final int DN_N1 = DN_N0 + 1; @@ -526,8 +625,8 @@ public void blockReport_09() throws IOException { DataNode dn = cluster.getDataNodes().get(DN_N1); String poolId = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); - StorageBlockReport[] report = getBlockReports(dn, poolId, true, true); - cluster.getNameNodeRpc().blockReport(dnR, poolId, report); + StorageBlockReport[] reports = getBlockReports(dn, poolId, true, true); + sendBlockReports(dnR, poolId, reports, splitBlockReports); printStats(); assertEquals("Wrong number of PendingReplication blocks", 2, cluster.getNamesystem().getPendingReplicationBlocks());