From 7a88f45366722932211514a9ce0c13492a0bd576 Mon Sep 17 00:00:00 2001 From: Ahmed Hussein <50450311+amahussein@users.noreply.github.com> Date: Thu, 17 Dec 2020 18:13:28 -0500 Subject: [PATCH] YARN-10536. Client in distributedShell swallows interrupt exceptions (#2554) --- .../applications/distributedshell/Client.java | 53 +++++++++------ .../TestDistributedShell.java | 66 +++++++++++++++++-- 2 files changed, 92 insertions(+), 27 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 7262b80da4..d7114d0481 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -143,6 +143,9 @@ public class Client { private static final int DEFAULT_AM_VCORES = 1; private static final int DEFAULT_CONTAINER_MEMORY = 10; private static final int DEFAULT_CONTAINER_VCORES = 1; + + // check the application once per second. + private static final int APP_MONITOR_INTERVAL = 1000; // Configuration private Configuration conf; @@ -209,7 +212,7 @@ public class Client { private String rollingFilesPattern = ""; // Start time for client - private final long clientStartTime = System.currentTimeMillis(); + private long clientStartTime = System.currentTimeMillis(); // Timeout threshold for client. Kill app after time interval expires. private long clientTimeout = 600000; @@ -670,6 +673,8 @@ public boolean run() throws IOException, YarnException { LOG.info("Running Client"); yarnClient.start(); + // set the client start time. + clientStartTime = System.currentTimeMillis(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); LOG.info("Got Cluster metric info from ASM" @@ -983,7 +988,6 @@ public boolean run() throws IOException, YarnException { if (keepContainers) { vargs.add("--keep_containers_across_application_attempts"); } - for (Map.Entry entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); } @@ -1110,13 +1114,17 @@ void specifyLogAggregationContext(ApplicationSubmissionContext appContext) { private boolean monitorApplication(ApplicationId appId) throws YarnException, IOException { + boolean res = false; + boolean needForceKill = false; while (true) { - // Check app status every 1 second. try { - Thread.sleep(1000); + Thread.sleep(APP_MONITOR_INTERVAL); } catch (InterruptedException e) { - LOG.debug("Thread sleep in monitoring loop interrupted"); + LOG.warn("Thread sleep in monitoring loop interrupted"); + // if the application is to be killed when client times out; + // then set needForceKill to true + break; } // Get application report for the appId we are interested in @@ -1139,22 +1147,20 @@ private boolean monitorApplication(ApplicationId appId) FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); if (YarnApplicationState.FINISHED == state) { if (FinalApplicationStatus.SUCCEEDED == dsStatus) { - LOG.info("Application has completed successfully. Breaking monitoring loop"); - return true; + LOG.info("Application has completed successfully. " + + "Breaking monitoring loop"); + res = true; + } else { + LOG.info("Application did finished unsuccessfully. " + + "YarnState={}, DSFinalStatus={}. Breaking monitoring loop", + state, dsStatus); } - else { - LOG.info("Application did finished unsuccessfully." - + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() - + ". Breaking monitoring loop"); - return false; - } - } - else if (YarnApplicationState.KILLED == state + break; + } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { - LOG.info("Application did not finish." - + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() - + ". Breaking monitoring loop"); - return false; + LOG.info("Application did not finish. YarnState={}, DSFinalStatus={}. " + + "Breaking monitoring loop", state, dsStatus); + break; } // The value equal or less than 0 means no timeout @@ -1162,11 +1168,16 @@ else if (YarnApplicationState.KILLED == state && System.currentTimeMillis() > (clientStartTime + clientTimeout)) { LOG.info("Reached client specified timeout for application. " + "Killing application"); - forceKillApplication(appId); - return false; + needForceKill = true; + break; } } + if (needForceKill) { + forceKillApplication(appId); + } + + return res; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 41ba8dfa36..438b12bca4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -107,6 +107,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +140,13 @@ public class TestDistributedShell { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule + public TestName name = new TestName(); + + private String generateAppName() { + return name.getMethodName().replaceFirst("test", ""); + } + @Before public void setup() throws Exception { setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion(), @@ -738,6 +746,8 @@ protected String getSleepCommand(int sec) { @Test public void testDSRestartWithPreviousRunningContainers() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -773,6 +783,8 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { @Test public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -811,6 +823,8 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { @Test public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -858,6 +872,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { fileWriter.write("log4j.rootLogger=debug,stdout"); fileWriter.close(); String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -907,6 +923,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { public void testSpecifyingLogAggregationContext() throws Exception { String regex = ".*(foo|bar)\\d"; String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--shell_command", @@ -929,6 +947,8 @@ public void testSpecifyingLogAggregationContext() throws Exception { public void testDSShellWithCommands() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -961,6 +981,8 @@ public void testDSShellWithCommands() throws Exception { @Test public void testDSShellWithMultipleArgs() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1011,6 +1033,8 @@ public void testDSShellWithShellScript() throws Exception { fileWriter.close(); System.out.println(customShellScript.getAbsolutePath()); String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1056,6 +1080,8 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with no jar file"); try { String[] args = { + "--appname", + generateAppName(), "--num_containers", "2", "--shell_command", @@ -1264,6 +1290,8 @@ protected void waitForNMsToRegister() throws Exception { @Test public void testContainerLaunchFailureHandling() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1292,6 +1320,8 @@ public void testContainerLaunchFailureHandling() throws Exception { @Test public void testDebugFlag() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1389,14 +1419,18 @@ private int verifyContainerLog(int containerNum, @Test public void testDistributedShellResourceProfiles() throws Exception { + String appName = generateAppName(); String[][] args = { - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-0", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", "maximum" }, - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-1", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", "default" }, - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-2", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", "default", "--container_resource_profile", "maximum" } }; @@ -1420,6 +1454,8 @@ public void testDSShellWithOpportunisticContainers() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); try { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1450,6 +1486,8 @@ public void testDSShellWithEnforceExecutionType() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); try { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1570,6 +1608,8 @@ public void doTestDistributedShellWithResources(boolean largeContainers) } String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1651,6 +1691,8 @@ public void run() { public void testDistributedShellAMResourcesWithIllegalArguments() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1668,6 +1710,8 @@ public void testDistributedShellAMResourcesWithIllegalArguments() public void testDistributedShellAMResourcesWithMissingArgumentValue() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1684,6 +1728,8 @@ public void testDistributedShellAMResourcesWithMissingArgumentValue() public void testDistributedShellAMResourcesWithUnknownResource() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1702,6 +1748,8 @@ public void testDistributedShellAMResourcesWithUnknownResource() public void testDistributedShellNonExistentQueue() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1720,6 +1768,8 @@ public void testDistributedShellNonExistentQueue() public void testDistributedShellWithSingleFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1741,6 +1791,8 @@ public void testDistributedShellWithSingleFileLocalization() public void testDistributedShellWithMultiFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1762,6 +1814,8 @@ public void testDistributedShellWithMultiFileLocalization() public void testDistributedShellWithNonExistentFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1785,14 +1839,14 @@ public void testDistributedShellCleanup() throws Exception { String appName = "DistributedShellCleanup"; String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--appname", - appName + Shell.WINDOWS ? "dir" : "ls" }; Configuration config = new Configuration(yarnCluster.getConfig()); Client client = new Client(config);