From e16ae55833e657087903281ea05636e1ffa2ec3e Mon Sep 17 00:00:00 2001 From: Viraj Jasani Date: Tue, 10 May 2022 17:10:03 -0700 Subject: [PATCH] HDFS-16568. dfsadmin -reconfig option to start/query reconfig on all live datanodes (#4264) Signed-off-by: Tao Li --- .../apache/hadoop/hdfs/tools/DFSAdmin.java | 122 ++++++++++++++++-- .../src/site/markdown/HDFSCommands.md | 4 +- .../src/site/markdown/HdfsUserGuide.md | 8 +- .../hadoop/hdfs/tools/TestDFSAdmin.java | 62 +++++++-- 4 files changed, 171 insertions(+), 25 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 38d6c1c371..982bca19c5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -36,7 +36,10 @@ import java.util.Map; import java.util.Optional; import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; @@ -447,8 +450,7 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep "\t[-refreshSuperUserGroupsConfiguration]\n" + "\t[-refreshCallQueue]\n" + "\t[-refresh [arg1..argn]\n" + - "\t[-reconfig " + - "]\n" + + "\t[-reconfig ]\n" + "\t[-printTopology]\n" + "\t[-refreshNamenodes datanode_host:ipc_port]\n" + "\t[-getVolumeReport datanode_host:ipc_port]\n" + @@ -1199,12 +1201,14 @@ private void printHelp(String cmd) { String refreshCallQueue = "-refreshCallQueue: Reload the call queue from config\n"; - String reconfig = "-reconfig " + + String reconfig = "-reconfig " + ":\n" + "\tStarts or gets the status of a reconfiguration operation, \n" + "\tor gets a list of reconfigurable properties.\n" + - - "\tThe second parameter specifies the node type\n"; + "\tThe second parameter specifies the node type\n" + + "\tThe third parameter specifies host address. For start or status, \n" + + "\tdatanode supports livenodes as third parameter, which will start \n" + + "\tor retrieve reconfiguration on all live datanodes."; String genericRefresh = "-refresh: Arguments are " + " [arg1..argn]\n" + "\tTriggers a runtime-refresh of the resource specified by " + @@ -1844,15 +1848,15 @@ public int refreshCallQueue() throws IOException { return 0; } - public int reconfig(String[] argv, int i) throws IOException { + public int reconfig(String[] argv, int i) throws IOException, InterruptedException { String nodeType = argv[i]; String address = argv[i + 1]; String op = argv[i + 2]; if ("start".equals(op)) { - return startReconfiguration(nodeType, address, System.out, System.err); + return startReconfigurationUtil(nodeType, address, System.out, System.err); } else if ("status".equals(op)) { - return getReconfigurationStatus(nodeType, address, System.out, System.err); + return getReconfigurationStatusUtil(nodeType, address, System.out, System.err); } else if ("properties".equals(op)) { return getReconfigurableProperties(nodeType, address, System.out, System.err); @@ -1862,12 +1866,57 @@ public int reconfig(String[] argv, int i) throws IOException { } int startReconfiguration(final String nodeThpe, final String address) - throws IOException { - return startReconfiguration(nodeThpe, address, System.out, System.err); + throws IOException, InterruptedException { + return startReconfigurationUtil(nodeThpe, address, System.out, System.err); + } + + int startReconfigurationUtil(final String nodeType, final String address, final PrintStream out, + final PrintStream err) throws IOException, InterruptedException { + if (!"livenodes".equals(address)) { + return startReconfiguration(nodeType, address, out, err); + } + if (!"datanode".equals(nodeType)) { + err.println("Only datanode type supports reconfiguration in bulk."); + return 1; + } + ExecutorService executorService = Executors.newFixedThreadPool(5); + DistributedFileSystem dfs = getDFS(); + DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failCount = new AtomicInteger(); + if (nodes != null) { + for (DatanodeInfo node : nodes) { + executorService.submit(() -> { + int status = startReconfiguration(nodeType, node.getIpcAddr(false), out, err); + if (status == 0) { + successCount.incrementAndGet(); + } else { + failCount.incrementAndGet(); + } + }); + } + while ((successCount.get() + failCount.get()) < nodes.length) { + Thread.sleep(1000); + } + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + err.println("Executor service could not be terminated in 60s. Please wait for" + + " sometime before the system cools down."); + } + out.println("Starting of reconfiguration task successful on " + successCount.get() + + " nodes, failed on " + failCount.get() + " nodes."); + if (failCount.get() == 0) { + return 0; + } else { + return 1; + } + } + err.println("DFS datanode stats could not be retrieved."); + return 1; } int startReconfiguration(final String nodeType, final String address, - final PrintStream out, final PrintStream err) throws IOException { + final PrintStream out, final PrintStream err) { String outMsg = null; String errMsg = null; int ret = 0; @@ -1908,8 +1957,53 @@ int startReconfigurationDispatch(final String nodeType, } } - int getReconfigurationStatus(final String nodeType, final String address, - final PrintStream out, final PrintStream err) throws IOException { + int getReconfigurationStatusUtil(final String nodeType, final String address, + final PrintStream out, final PrintStream err) throws IOException, InterruptedException { + if (!"livenodes".equals(address)) { + return getReconfigurationStatus(nodeType, address, out, err); + } + if (!"datanode".equals(nodeType)) { + err.println("Only datanode type supports reconfiguration in bulk."); + return 1; + } + ExecutorService executorService = Executors.newFixedThreadPool(5); + DistributedFileSystem dfs = getDFS(); + DatanodeInfo[] nodes = dfs.getDataNodeStats(DatanodeReportType.LIVE); + AtomicInteger successCount = new AtomicInteger(); + AtomicInteger failCount = new AtomicInteger(); + if (nodes != null) { + for (DatanodeInfo node : nodes) { + executorService.submit(() -> { + int status = getReconfigurationStatus(nodeType, node.getIpcAddr(false), out, err); + if (status == 0) { + successCount.incrementAndGet(); + } else { + failCount.incrementAndGet(); + } + }); + } + while ((successCount.get() + failCount.get()) < nodes.length) { + Thread.sleep(1000); + } + executorService.shutdown(); + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + err.println("Executor service could not be terminated in 60s. Please wait for" + + " sometime before the system cools down."); + } + out.println("Retrieval of reconfiguration status successful on " + successCount.get() + + " nodes, failed on " + failCount.get() + " nodes."); + if (failCount.get() == 0) { + return 0; + } else { + return 1; + } + } + err.println("DFS datanode stats could not be retrieved."); + return 1; + } + + int getReconfigurationStatus(final String nodeType, final String address, final PrintStream out, + final PrintStream err) { String outMsg = null; String errMsg = null; ReconfigurationTaskStatus status = null; @@ -2152,7 +2246,7 @@ private static void printUsage(String cmd) { + " [-refreshCallQueue]"); } else if ("-reconfig".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" - + " [-reconfig " + + " [-reconfig " + "]"); } else if ("-refresh".equals(cmd)) { System.err.println("Usage: hdfs dfsadmin" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index b21a1f14e2..090d03a6d8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -363,7 +363,7 @@ Usage: hdfs dfsadmin [-refreshSuperUserGroupsConfiguration] hdfs dfsadmin [-refreshCallQueue] hdfs dfsadmin [-refresh [arg1..argn]] - hdfs dfsadmin [-reconfig ] + hdfs dfsadmin [-reconfig ] hdfs dfsadmin [-printTopology] hdfs dfsadmin [-refreshNamenodes datanodehost:port] hdfs dfsadmin [-getVolumeReport datanodehost:port] @@ -401,7 +401,7 @@ Usage: | `-refreshSuperUserGroupsConfiguration` | Refresh superuser proxy groups mappings | | `-refreshCallQueue` | Reload the call queue from config. | | `-refresh` \ \ [arg1..argn] | Triggers a runtime-refresh of the resource specified by \ on \. All other args after are sent to the host. | -| `-reconfig` \ \ \ | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. | +| `-reconfig` \ \ \ | Starts reconfiguration or gets the status of an ongoing reconfiguration, or gets a list of reconfigurable properties. The second parameter specifies the node type. The third parameter specifies host address. For start or status, datanode supports livenodes as third parameter, which will start or retrieve reconfiguration on all live datanodes. | | `-printTopology` | Print a tree of the racks and their nodes as reported by the Namenode | | `-refreshNamenodes` datanodehost:port | For the given datanode, reloads the configuration files, stops serving the removed block-pools and starts serving new block-pools. | | `-getVolumeReport` datanodehost:port | For the given datanode, get the volume report. | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md index 54a8056068..16970730c9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsUserGuide.md @@ -351,7 +351,13 @@ Datanode supports hot swappable drives. The user can add or replace HDFS data vo * The user runs `dfsadmin -reconfig datanode HOST:PORT start` to start the reconfiguration process. The user can use `dfsadmin -reconfig datanode HOST:PORT status` - to query the running status of the reconfiguration task. + to query the running status of the reconfiguration task. In place of + HOST:PORT, we can also specify livenodes for datanode. It would allow + start or query reconfiguration on all live datanodes, whereas specifying + HOST:PORT would only allow start or query of reconfiguration on the + particular datanode represented by HOST:PORT. The examples for livenodes + queries are `dfsadmin -reconfig datanode livenodes start` and + `dfsadmin -reconfig datanode livenodes status`. * Once the reconfiguration task has completed, the user can safely `umount` the removed data volume directories and physically remove the disks. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java index f8d72ca1fd..525503b621 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java @@ -177,20 +177,20 @@ private void restartCluster() throws IOException { } private void getReconfigurableProperties(String nodeType, String address, - final List outs, final List errs) throws IOException { + final List outs, final List errs) throws IOException, InterruptedException { reconfigurationOutErrFormatter("getReconfigurableProperties", nodeType, address, outs, errs); } private void getReconfigurationStatus(String nodeType, String address, - final List outs, final List errs) throws IOException { + final List outs, final List errs) throws IOException, InterruptedException { reconfigurationOutErrFormatter("getReconfigurationStatus", nodeType, address, outs, errs); } private void reconfigurationOutErrFormatter(String methodName, String nodeType, String address, final List outs, - final List errs) throws IOException { + final List errs) throws IOException, InterruptedException { ByteArrayOutputStream bufOut = new ByteArrayOutputStream(); PrintStream outStream = new PrintStream(bufOut); ByteArrayOutputStream bufErr = new ByteArrayOutputStream(); @@ -203,9 +203,9 @@ private void reconfigurationOutErrFormatter(String methodName, outStream, errStream); } else if (methodName.equals("getReconfigurationStatus")) { - admin.getReconfigurationStatus(nodeType, address, outStream, errStream); + admin.getReconfigurationStatusUtil(nodeType, address, outStream, errStream); } else if (methodName.equals("startReconfiguration")) { - admin.startReconfiguration(nodeType, address, outStream, errStream); + admin.startReconfigurationUtil(nodeType, address, outStream, errStream); } scanIntoList(bufOut, outs); @@ -326,7 +326,7 @@ public void testDFSAdminUnreachableDatanode() throws Exception { } @Test(timeout = 30000) - public void testDataNodeGetReconfigurableProperties() throws IOException { + public void testDataNodeGetReconfigurableProperties() throws IOException, InterruptedException { final int port = datanode.getIpcPort(); final String address = "localhost:" + port; final List outs = Lists.newArrayList(); @@ -422,7 +422,7 @@ public void testDataNodeGetReconfigurationStatus() throws IOException, } @Test(timeout = 30000) - public void testNameNodeGetReconfigurableProperties() throws IOException { + public void testNameNodeGetReconfigurableProperties() throws IOException, InterruptedException { final String address = namenode.getHostAndPort(); final List outs = Lists.newArrayList(); final List errs = Lists.newArrayList(); @@ -452,7 +452,7 @@ public Boolean get() { errs.clear(); try { getReconfigurationStatus(nodeType, address, outs, errs); - } catch (IOException e) { + } catch (IOException | InterruptedException e) { LOG.error(String.format( "call getReconfigurationStatus on %s[%s] failed.", nodeType, address), e); @@ -1062,4 +1062,50 @@ public Integer run() throws Exception { } }); } + + @Test + public void testAllDatanodesReconfig() + throws IOException, InterruptedException, TimeoutException { + ReconfigurationUtil reconfigurationUtil = mock(ReconfigurationUtil.class); + cluster.getDataNodes().get(0).setReconfigurationUtil(reconfigurationUtil); + cluster.getDataNodes().get(1).setReconfigurationUtil(reconfigurationUtil); + + List changes = new ArrayList<>(); + changes.add(new ReconfigurationUtil.PropertyChange( + DFS_DATANODE_PEER_STATS_ENABLED_KEY, "true", + datanode.getConf().get(DFS_DATANODE_PEER_STATS_ENABLED_KEY))); + when(reconfigurationUtil.parseChangedProperties(any(Configuration.class), + any(Configuration.class))).thenReturn(changes); + + assertEquals(0, admin.startReconfiguration("datanode", "livenodes")); + final List outsForStartReconf = new ArrayList<>(); + final List errsForStartReconf = new ArrayList<>(); + reconfigurationOutErrFormatter("startReconfiguration", "datanode", + "livenodes", outsForStartReconf, errsForStartReconf); + assertEquals(3, outsForStartReconf.size()); + assertEquals(0, errsForStartReconf.size()); + assertTrue(outsForStartReconf.get(0).startsWith("Started reconfiguration task on node")); + assertTrue(outsForStartReconf.get(1).startsWith("Started reconfiguration task on node")); + assertEquals("Starting of reconfiguration task successful on 2 nodes, failed on 0 nodes.", + outsForStartReconf.get(2)); + + Thread.sleep(1000); + final List outs = new ArrayList<>(); + final List errs = new ArrayList<>(); + awaitReconfigurationFinished("datanode", "livenodes", outs, errs); + assertEquals(9, outs.size()); + assertEquals(0, errs.size()); + LOG.info("dfsadmin -status -livenodes output:"); + outs.forEach(s -> LOG.info("{}", s)); + assertTrue(outs.get(0).startsWith("Reconfiguring status for node")); + assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(2)); + assertEquals("\tFrom: \"false\"", outs.get(3)); + assertEquals("\tTo: \"true\"", outs.get(4)); + assertEquals("SUCCESS: Changed property dfs.datanode.peer.stats.enabled", outs.get(5)); + assertEquals("\tFrom: \"false\"", outs.get(6)); + assertEquals("\tTo: \"true\"", outs.get(7)); + assertEquals("Retrieval of reconfiguration status successful on 2 nodes, failed on 0 nodes.", + outs.get(8)); + } + } \ No newline at end of file