From 9a1d8cfaf50ec29ffb2d8522ba2f4bc6605d8b8b Mon Sep 17 00:00:00 2001
From: LeonGao91
Date: Fri, 16 Aug 2019 08:00:51 -0700
Subject: [PATCH] HDFS-14678. Allow triggerBlockReport to a specific namenode.
 (#1252). Contributed by Leon Gao.

---
 .../hdfs/client/BlockReportOptions.java       | 20 ++++-
 .../ClientDatanodeProtocolTranslatorPB.java   | 10 ++-
 .../main/proto/ClientDatanodeProtocol.proto   |  1 +
 ...atanodeProtocolServerSideTranslatorPB.java |  9 ++-
 .../hadoop/hdfs/server/datanode/DataNode.java |  6 +-
 .../apache/hadoop/hdfs/tools/DFSAdmin.java    | 20 +++--
 .../src/site/markdown/HDFSCommands.md         |  4 +-
 .../datanode/TestTriggerBlockReport.java      | 79 +++++++++++++------
 .../hadoop/hdfs/tools/TestDFSAdmin.java       | 25 ++++++
 9 files changed, 135 insertions(+), 39 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
index 07f483692c..0d08e52eb4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/BlockReportOptions.java
@@ -20,6 +20,8 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
+import java.net.InetSocketAddress;
+
 /**
  * Options that can be specified when manually triggering a block report.
  */
@@ -27,17 +29,24 @@
 @InterfaceStability.Evolving
 public final class BlockReportOptions {
   private final boolean incremental;
+  private final InetSocketAddress namenodeAddr;
 
-  private BlockReportOptions(boolean incremental) {
+  private BlockReportOptions(boolean incremental, InetSocketAddress namenodeAddr) {
     this.incremental = incremental;
+    this.namenodeAddr = namenodeAddr;
   }
 
   public boolean isIncremental() {
     return incremental;
   }
 
+  public InetSocketAddress getNamenodeAddr() {
+    return namenodeAddr;
+  }
+
   public static class Factory {
     private boolean incremental = false;
+    private InetSocketAddress namenodeAddr;
 
     public Factory() {
     }
@@ -47,13 +56,18 @@ public Factory setIncremental(boolean incremental) {
       return this;
     }
 
+    public Factory setNamenodeAddr(InetSocketAddress namenodeAddr) {
+      this.namenodeAddr = namenodeAddr;
+      return this;
+    }
+
     public BlockReportOptions build() {
-      return new BlockReportOptions(incremental);
+      return new BlockReportOptions(incremental, namenodeAddr);
     }
   }
 
   @Override
   public String toString() {
-    return "BlockReportOptions{incremental=" + incremental + "}";
+    return "BlockReportOptions{incremental=" + incremental + ", namenodeAddr=" + namenodeAddr + "}";
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
index a9622bae7c..543a213873 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolTranslatorPB.java
@@ -324,10 +324,12 @@ public List<String> listReconfigurableProperties() throws IOException {
   public void triggerBlockReport(BlockReportOptions options)
       throws IOException {
     try {
-      rpcProxy.triggerBlockReport(NULL_CONTROLLER,
-          TriggerBlockReportRequestProto.newBuilder().
-              setIncremental(options.isIncremental()).
-              build());
+      TriggerBlockReportRequestProto.Builder builder = TriggerBlockReportRequestProto.newBuilder().
+          setIncremental(options.isIncremental());
+      if (options.getNamenodeAddr() != null) {
+        builder.setNnAddress(NetUtils.getHostPortString(options.getNamenodeAddr()));
+      }
+      rpcProxy.triggerBlockReport(NULL_CONTROLLER, builder.build());
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
index 8b26a560d7..52f6330e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ClientDatanodeProtocol.proto
@@ -140,6 +140,7 @@ message GetVolumeReportResponseProto {
 
 message TriggerBlockReportRequestProto {
   required bool incremental = 1;
+  optional string nnAddress = 2;
 }
 
 message TriggerBlockReportResponseProto {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
index 09ca2747b7..4fa4c1a543 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientDatanodeProtocolServerSideTranslatorPB.java
@@ -64,6 +64,7 @@
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
+import org.apache.hadoop.net.NetUtils;
 
 /**
  * Implementation for protobuf service that forwards requests
@@ -225,8 +226,12 @@ public TriggerBlockReportResponseProto triggerBlockReport(
       RpcController unused, TriggerBlockReportRequestProto request)
           throws ServiceException {
     try {
-      impl.triggerBlockReport(new BlockReportOptions.Factory().
-          setIncremental(request.getIncremental()).build());
+      BlockReportOptions.Factory factory = new BlockReportOptions.Factory().
+          setIncremental(request.getIncremental());
+      if (request.hasNnAddress()) {
+        factory.setNamenodeAddr(NetUtils.createSocketAddr(request.getNnAddress()));
+      }
+      impl.triggerBlockReport(factory.build());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 163d73c02e..c8f6896820 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -3316,10 +3316,14 @@ public List<String> listReconfigurableProperties()
   public void triggerBlockReport(BlockReportOptions options)
       throws IOException {
     checkSuperuserPrivilege();
+    InetSocketAddress namenodeAddr = options.getNamenodeAddr();
+    boolean shouldTriggerToAllNn = (namenodeAddr == null);
     for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
       if (bpos != null) {
         for (BPServiceActor actor : bpos.getBPServiceActors()) {
-          actor.triggerBlockReport(options);
+          if (shouldTriggerToAllNn || namenodeAddr.equals(actor.nnAddr)) {
+            actor.triggerBlockReport(options);
+          }
         }
       }
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
index 57776b6a8f..b3ab32a4dd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
@@ -466,7 +466,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep
     "\t[-evictWriters <datanode_host:ipc_port>]\n" +
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
-    "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port>]\n" +
+    "\t[-triggerBlockReport [-incremental] <datanode_host:ipc_port> [-namenode <namenode_host:ipc_port>]]\n" +
     "\t[-listOpenFiles [-blockingDecommission] [-path <path>]]\n" +
     "\t[-help [cmd]]\n";
 
@@ -727,6 +727,13 @@ public int triggerBlockReport(String[] argv) throws IOException {
     for (int j = 1; j < argv.length; j++) {
       args.add(argv[j]);
     }
+    // Block report to a specific namenode
+    InetSocketAddress namenodeAddr = null;
+    String nnHostPort = StringUtils.popOptionWithArgument("-namenode", args);
+    if (nnHostPort != null) {
+      namenodeAddr = NetUtils.createSocketAddr(nnHostPort);
+    }
+
     boolean incremental = StringUtils.popOption("-incremental", args);
     String hostPort = StringUtils.popFirstNonOption(args);
     if (hostPort == null) {
@@ -742,6 +749,7 @@ public int triggerBlockReport(String[] argv) throws IOException {
     try {
       dnProxy.triggerBlockReport(
           new BlockReportOptions.Factory().
+              setNamenodeAddr(namenodeAddr).
               setIncremental(incremental).
               build());
     } catch (IOException e) {
@@ -750,7 +758,9 @@ public int triggerBlockReport(String[] argv) throws IOException {
     }
     System.out.println("Triggering " +
         (incremental ? "an incremental " : "a full ") +
-        "block report on " + hostPort + ".");
+        "block report on " + hostPort +
+        (namenodeAddr == null ?
"" : " to namenode " + nnHostPort) + + "."); return 0; } @@ -1266,7 +1276,7 @@ private void printHelp(String cmd) { + "\tbe used for checking if a datanode is alive.\n"; String triggerBlockReport = - "-triggerBlockReport [-incremental] \n" + "-triggerBlockReport [-incremental] [-namenode ]\n" + "\tTrigger a block report for the datanode.\n" + "\tIf 'incremental' is specified, it will be an incremental\n" + "\tblock report; otherwise, it will be a full block report.\n"; @@ -2176,7 +2186,7 @@ private static void printUsage(String cmd) { + " [-getDatanodeInfo ]"); } else if ("-triggerBlockReport".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" - + " [-triggerBlockReport [-incremental] ]"); + + " [-triggerBlockReport [-incremental] [-namenode ]]"); } else if ("-listOpenFiles".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" + " [-listOpenFiles [-blockingDecommission] [-path ]]"); @@ -2334,7 +2344,7 @@ public int run(String[] argv) { return exitCode; } } else if ("-triggerBlockReport".equals(cmd)) { - if ((argv.length != 2) && (argv.length != 3)) { + if ((argv.length < 2) || (argv.length > 5)) { printUsage(cmd); return exitCode; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 92acd3d323..fd4b2061fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -377,7 +377,7 @@ Usage: hdfs dfsadmin [-evictWriters ] hdfs dfsadmin [-getDatanodeInfo ] hdfs dfsadmin [-metasave filename] - hdfs dfsadmin [-triggerBlockReport [-incremental] ] + hdfs dfsadmin [-triggerBlockReport [-incremental] [-namenode] ] hdfs dfsadmin [-listOpenFiles [-blockingDecommission] [-path ]] hdfs dfsadmin [-help [cmd]] @@ -415,7 +415,7 @@ Usage: | `-evictWriters` \ | Make the datanode evict all clients that are writing a block. This is useful if decommissioning is hung due to slow writers. | | `-getDatanodeInfo` \ | Get the information about the given datanode. See [Rolling Upgrade document](./HdfsRollingUpgrade.html#dfsadmin_-getDatanodeInfo) for the detail. | | `-metasave` filename | Save Namenode's primary data structures to *filename* in the directory specified by hadoop.log.dir property. *filename* is overwritten if it exists. *filename* will contain one line for each of the following
1. Datanodes heart beating with Namenode
2. Blocks waiting to be replicated
3. Blocks currently being replicated
4. Blocks waiting to be deleted | -| `-triggerBlockReport` `[-incremental]` \ | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. | +| `-triggerBlockReport` `[-incremental]` \ `[-namenode]` \ | Trigger a block report for the given datanode. If 'incremental' is specified, it will be otherwise, it will be a full block report. If '-namenode \:\' is given, it only sends block report to a specified namenode. | | `-listOpenFiles` `[-blockingDecommission]` `[-path ]` | List all open files currently managed by the NameNode along with client name and client machine accessing them. Open files list will be filtered by given type and path. Add -blockingDecommission option if you only want to list open files that are blocking the DataNode decommissioning. | | `-help` [cmd] | Displays help for the given command or all commands if none is specified. | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java index db08bab658..14af74df1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestTriggerBlockReport.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; @@ -42,11 +43,13 @@ import org.junit.Test; import org.mockito.Mockito; +import java.net.InetSocketAddress; + /** * Test manually requesting that the DataNode send a block report. */ public final class TestTriggerBlockReport { - private void testTriggerBlockReport(boolean incremental) throws Exception { + private void testTriggerBlockReport(boolean incremental, boolean withSpecificNN) throws Exception { Configuration conf = new HdfsConfiguration(); // Set a really long value for dfs.blockreport.intervalMsec and @@ -57,16 +60,24 @@ private void testTriggerBlockReport(boolean incremental) throws Exception { conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1080L); final MiniDFSCluster cluster = - new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + new MiniDFSCluster.Builder(conf).nnTopology(MiniDFSNNTopology.simpleHATopology()).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); - DatanodeProtocolClientSideTranslatorPB spy = + cluster.transitionToActive(0); + FileSystem fs = cluster.getFileSystem(0); + DatanodeProtocolClientSideTranslatorPB spyOnNn0 = InternalDataNodeTestUtils.spyOnBposToNN( - cluster.getDataNodes().get(0), cluster.getNameNode()); + cluster.getDataNodes().get(0), cluster.getNameNode(0)); + DatanodeProtocolClientSideTranslatorPB spyOnNn1 = + InternalDataNodeTestUtils.spyOnBposToNN( + cluster.getDataNodes().get(0), cluster.getNameNode(1)); DFSTestUtil.createFile(fs, new Path("/abc"), 16, (short) 1, 1L); - // We should get 1 incremental block report. - Mockito.verify(spy, timeout(60000).times(1)).blockReceivedAndDeleted( + // We should get 1 incremental block report on both NNs. 
+    Mockito.verify(spyOnNn0, timeout(60000).times(1)).blockReceivedAndDeleted(
+        any(DatanodeRegistration.class),
+        anyString(),
+        any(StorageReceivedDeletedBlocks[].class));
+    Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReceivedAndDeleted(
         any(DatanodeRegistration.class),
         anyString(),
         any(StorageReceivedDeletedBlocks[].class));
@@ -75,12 +86,21 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
     // since the interval we configured is so long.
     for (int i = 0; i < 3; i++) {
       Thread.sleep(10);
-      Mockito.verify(spy, times(0)).blockReport(
+      Mockito.verify(spyOnNn0, times(0)).blockReport(
           any(DatanodeRegistration.class),
           anyString(),
           any(StorageBlockReport[].class),
           any());
-      Mockito.verify(spy, times(1)).blockReceivedAndDeleted(
+      Mockito.verify(spyOnNn0, times(1)).blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+      Mockito.verify(spyOnNn1, times(0)).blockReport(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageBlockReport[].class),
+          any());
+      Mockito.verify(spyOnNn1, times(1)).blockReceivedAndDeleted(
           any(DatanodeRegistration.class),
           anyString(),
           any(StorageReceivedDeletedBlocks[].class));
@@ -91,20 +111,21 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
     ReceivedDeletedBlockInfo rdbi = new ReceivedDeletedBlockInfo(
         new Block(5678, 512, 1000), BlockStatus.DELETED_BLOCK, null);
     DataNode datanode = cluster.getDataNodes().get(0);
-    BPServiceActor actor =
-        datanode.getAllBpOs().get(0).getBPServiceActors().get(0);
-    final FsDatasetSpi<?> dataset = datanode.getFSDataset();
-    final DatanodeStorage storage;
-    try (FsDatasetSpi.FsVolumeReferences volumes =
-        dataset.getFsVolumeReferences()) {
-      storage = dataset.getStorage(volumes.get(0).getStorageID());
+    for (BPServiceActor actor : datanode.getAllBpOs().get(0).getBPServiceActors()) {
+      final FsDatasetSpi<?> dataset = datanode.getFSDataset();
+      final DatanodeStorage storage;
+      try (FsDatasetSpi.FsVolumeReferences volumes = dataset.getFsVolumeReferences()) {
+        storage = dataset.getStorage(volumes.get(0).getStorageID());
+      }
+      actor.getIbrManager().addRDBI(rdbi, storage);
     }
-    actor.getIbrManager().addRDBI(rdbi, storage);
 
-    // Manually trigger a block report.
+    // Only trigger block report to NN1 when testing triggering block report on specific namenode.
+    InetSocketAddress nnAddr = withSpecificNN ? cluster.getNameNode(1).getServiceRpcAddress() : null;
     datanode.triggerBlockReport(
         new BlockReportOptions.Factory().
+            setNamenodeAddr(nnAddr).
             setIncremental(incremental).
             build()
     );
@@ -112,13 +133,25 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
     // triggerBlockReport returns before the block report is
     // actually sent. Wait for it to be sent here.
     if (incremental) {
-      Mockito.verify(spy, timeout(60000).times(2)).
+      Mockito.verify(spyOnNn1, timeout(60000).times(2)).
+          blockReceivedAndDeleted(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageReceivedDeletedBlocks[].class));
+      int nn0IncrBlockReport = withSpecificNN ? 1 : 2;
+      Mockito.verify(spyOnNn0, timeout(60000).times(nn0IncrBlockReport)).
           blockReceivedAndDeleted(
           any(DatanodeRegistration.class),
           anyString(),
           any(StorageReceivedDeletedBlocks[].class));
     } else {
-      Mockito.verify(spy, timeout(60000)).blockReport(
+      Mockito.verify(spyOnNn1, timeout(60000).times(1)).blockReport(
+          any(DatanodeRegistration.class),
+          anyString(),
+          any(StorageBlockReport[].class),
+          any());
+      int nn0BlockReport = withSpecificNN ?
+          0 : 1;
+      Mockito.verify(spyOnNn0, timeout(60000).times(nn0BlockReport)).blockReport(
           any(DatanodeRegistration.class),
           anyString(),
           any(StorageBlockReport[].class),
           any());
@@ -130,11 +163,13 @@ private void testTriggerBlockReport(boolean incremental) throws Exception {
 
   @Test
   public void testTriggerFullBlockReport() throws Exception {
-    testTriggerBlockReport(false);
+    testTriggerBlockReport(false, false);
+    testTriggerBlockReport(false, true);
   }
 
   @Test
   public void testTriggerIncrementalBlockReport() throws Exception {
-    testTriggerBlockReport(true);
+    testTriggerBlockReport(true, false);
+    testTriggerBlockReport(true, true);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 90d0761a8e..fb3b58413c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -245,6 +245,31 @@ public void testGetDatanodeInfo() throws Exception {
     }
   }
 
+  @Test(timeout = 30000)
+  public void testTriggerBlockReport() throws Exception {
+    redirectStream();
+    final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+    final DataNode dn = cluster.getDataNodes().get(0);
+    final NameNode nn = cluster.getNameNode();
+
+    final String dnAddr = String.format(
+        "%s:%d",
+        dn.getXferAddress().getHostString(),
+        dn.getIpcPort());
+    final String nnAddr = nn.getHostAndPort();
+    resetStream();
+    final List<String> outs = Lists.newArrayList();
+    final int ret = ToolRunner.run(dfsAdmin,
+        new String[]{"-triggerBlockReport", dnAddr, "-incremental", "-namenode", nnAddr});
+    assertEquals(0, ret);
+
+    scanIntoList(out, outs);
+    assertEquals(1, outs.size());
+    assertThat(outs.get(0),
+        is(allOf(containsString("Triggering an incremental block report on "),
+            containsString(" to namenode "))));
+  }
+
   @Test(timeout = 30000)
   public void testGetVolumeReport() throws Exception {
     redirectStream();
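
Usage note (not part of the patch): with this change applied, a block report can be aimed at one namenode from the shell, e.g. `hdfs dfsadmin -triggerBlockReport dn1.example.com:9867 -namenode nn1.example.com:8022` (add `-incremental` for an incremental report), or programmatically through ClientDatanodeProtocol. The sketch below mirrors the code path DFSAdmin#triggerBlockReport takes; the host:port values are placeholder assumptions, not addresses from the patch, and DFSUtilClient.createClientDatanodeProtocolProxy is the same helper DFSAdmin uses to reach the datanode.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.client.BlockReportOptions;
    import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
    import org.apache.hadoop.net.NetUtils;
    import org.apache.hadoop.security.UserGroupInformation;

    public class TriggerBlockReportToNamenode {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Reach the datanode the same way DFSAdmin#getDataNodeProxy does.
        // dn1.example.com:9867 is a placeholder for the datanode IPC address.
        ClientDatanodeProtocol dnProxy = DFSUtilClient.createClientDatanodeProtocolProxy(
            NetUtils.createSocketAddr("dn1.example.com:9867"),
            UserGroupInformation.getCurrentUser(), conf,
            NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));

        // Ask for a full block report delivered only to the namenode below
        // (placeholder service RPC address). Leaving setNamenodeAddr unset
        // (null) keeps the old behavior of reporting to every namenode the
        // datanode serves.
        dnProxy.triggerBlockReport(
            new BlockReportOptions.Factory()
                .setIncremental(false)
                .setNamenodeAddr(NetUtils.createSocketAddr("nn1.example.com:8022"))
                .build());
      }
    }

In an HA pair this is mainly useful after a standby namenode has been restarted or has fallen behind: the datanode can rebuild that one namenode's block map without also flooding the active namenode with a redundant full report.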