diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 7262b80da4..d7114d0481 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -143,6 +143,9 @@ public class Client { private static final int DEFAULT_AM_VCORES = 1; private static final int DEFAULT_CONTAINER_MEMORY = 10; private static final int DEFAULT_CONTAINER_VCORES = 1; + + // check the application once per second. + private static final int APP_MONITOR_INTERVAL = 1000; // Configuration private Configuration conf; @@ -209,7 +212,7 @@ public class Client { private String rollingFilesPattern = ""; // Start time for client - private final long clientStartTime = System.currentTimeMillis(); + private long clientStartTime = System.currentTimeMillis(); // Timeout threshold for client. Kill app after time interval expires. private long clientTimeout = 600000; @@ -670,6 +673,8 @@ public boolean run() throws IOException, YarnException { LOG.info("Running Client"); yarnClient.start(); + // set the client start time. + clientStartTime = System.currentTimeMillis(); YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics(); LOG.info("Got Cluster metric info from ASM" @@ -983,7 +988,6 @@ public boolean run() throws IOException, YarnException { if (keepContainers) { vargs.add("--keep_containers_across_application_attempts"); } - for (Map.Entry entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); } @@ -1110,13 +1114,17 @@ void specifyLogAggregationContext(ApplicationSubmissionContext appContext) { private boolean monitorApplication(ApplicationId appId) throws YarnException, IOException { + boolean res = false; + boolean needForceKill = false; while (true) { - // Check app status every 1 second. try { - Thread.sleep(1000); + Thread.sleep(APP_MONITOR_INTERVAL); } catch (InterruptedException e) { - LOG.debug("Thread sleep in monitoring loop interrupted"); + LOG.warn("Thread sleep in monitoring loop interrupted"); + // if the application is to be killed when client times out; + // then set needForceKill to true + break; } // Get application report for the appId we are interested in @@ -1139,22 +1147,20 @@ private boolean monitorApplication(ApplicationId appId) FinalApplicationStatus dsStatus = report.getFinalApplicationStatus(); if (YarnApplicationState.FINISHED == state) { if (FinalApplicationStatus.SUCCEEDED == dsStatus) { - LOG.info("Application has completed successfully. Breaking monitoring loop"); - return true; + LOG.info("Application has completed successfully. " + + "Breaking monitoring loop"); + res = true; + } else { + LOG.info("Application did finished unsuccessfully. " + + "YarnState={}, DSFinalStatus={}. Breaking monitoring loop", + state, dsStatus); } - else { - LOG.info("Application did finished unsuccessfully." - + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() - + ". Breaking monitoring loop"); - return false; - } - } - else if (YarnApplicationState.KILLED == state + break; + } else if (YarnApplicationState.KILLED == state || YarnApplicationState.FAILED == state) { - LOG.info("Application did not finish." - + " YarnState=" + state.toString() + ", DSFinalStatus=" + dsStatus.toString() - + ". Breaking monitoring loop"); - return false; + LOG.info("Application did not finish. YarnState={}, DSFinalStatus={}. " + + "Breaking monitoring loop", state, dsStatus); + break; } // The value equal or less than 0 means no timeout @@ -1162,11 +1168,16 @@ else if (YarnApplicationState.KILLED == state && System.currentTimeMillis() > (clientStartTime + clientTimeout)) { LOG.info("Reached client specified timeout for application. " + "Killing application"); - forceKillApplication(appId); - return false; + needForceKill = true; + break; } } + if (needForceKill) { + forceKillApplication(appId); + } + + return res; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java index 8634f79827..f778785c19 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java @@ -107,6 +107,7 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; import org.junit.rules.Timeout; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,6 +140,13 @@ public class TestDistributedShell { @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + @Rule + public TestName name = new TestName(); + + private String generateAppName() { + return name.getMethodName().replaceFirst("test", ""); + } + @Before public void setup() throws Exception { setupInternal(NUM_NMS, timelineVersionWatcher.getTimelineVersion()); @@ -737,6 +745,8 @@ protected String getSleepCommand(int sec) { @Test public void testDSRestartWithPreviousRunningContainers() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -772,6 +782,8 @@ public void testDSRestartWithPreviousRunningContainers() throws Exception { @Test public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -810,6 +822,8 @@ public void testDSAttemptFailuresValidityIntervalSucess() throws Exception { @Test public void testDSAttemptFailuresValidityIntervalFailed() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -857,6 +871,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { fileWriter.write("log4j.rootLogger=debug,stdout"); fileWriter.close(); String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -906,6 +922,8 @@ public void testDSShellWithCustomLogPropertyFile() throws Exception { public void testSpecifyingLogAggregationContext() throws Exception { String regex = ".*(foo|bar)\\d"; String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--shell_command", @@ -928,6 +946,8 @@ public void testSpecifyingLogAggregationContext() throws Exception { public void testDSShellWithCommands() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -960,6 +980,8 @@ public void testDSShellWithCommands() throws Exception { @Test public void testDSShellWithMultipleArgs() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1010,6 +1032,8 @@ public void testDSShellWithShellScript() throws Exception { fileWriter.close(); System.out.println(customShellScript.getAbsolutePath()); String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1055,6 +1079,8 @@ public void testDSShellWithInvalidArgs() throws Exception { LOG.info("Initializing DS Client with no jar file"); try { String[] args = { + "--appname", + generateAppName(), "--num_containers", "2", "--shell_command", @@ -1263,6 +1289,8 @@ protected void waitForNMsToRegister() throws Exception { @Test public void testContainerLaunchFailureHandling() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1291,6 +1319,8 @@ public void testContainerLaunchFailureHandling() throws Exception { @Test public void testDebugFlag() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1388,14 +1418,18 @@ private int verifyContainerLog(int containerNum, @Test public void testDistributedShellResourceProfiles() throws Exception { + String appName = generateAppName(); String[][] args = { - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-0", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--container_resource_profile", "maximum" }, - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-1", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", "default" }, - {"--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", + {"--appname", appName + "-2", "--jar", APPMASTER_JAR, + "--num_containers", "1", "--shell_command", Shell.WINDOWS ? "dir" : "ls", "--master_resource_profile", "default", "--container_resource_profile", "maximum" } }; @@ -1419,6 +1453,8 @@ public void testDSShellWithOpportunisticContainers() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); try { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1449,6 +1485,8 @@ public void testDSShellWithEnforceExecutionType() throws Exception { Client client = new Client(new Configuration(yarnCluster.getConfig())); try { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1569,6 +1607,8 @@ public void doTestDistributedShellWithResources(boolean largeContainers) } String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1650,6 +1690,8 @@ public void run() { public void testDistributedShellAMResourcesWithIllegalArguments() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1667,6 +1709,8 @@ public void testDistributedShellAMResourcesWithIllegalArguments() public void testDistributedShellAMResourcesWithMissingArgumentValue() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1683,6 +1727,8 @@ public void testDistributedShellAMResourcesWithMissingArgumentValue() public void testDistributedShellAMResourcesWithUnknownResource() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1701,6 +1747,8 @@ public void testDistributedShellAMResourcesWithUnknownResource() public void testDistributedShellNonExistentQueue() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1719,6 +1767,8 @@ public void testDistributedShellNonExistentQueue() public void testDistributedShellWithSingleFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1740,6 +1790,8 @@ public void testDistributedShellWithSingleFileLocalization() public void testDistributedShellWithMultiFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1761,6 +1813,8 @@ public void testDistributedShellWithMultiFileLocalization() public void testDistributedShellWithNonExistentFileLocalization() throws Exception { String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", @@ -1784,14 +1838,14 @@ public void testDistributedShellCleanup() throws Exception { String appName = "DistributedShellCleanup"; String[] args = { + "--appname", + generateAppName(), "--jar", APPMASTER_JAR, "--num_containers", "1", "--shell_command", - Shell.WINDOWS ? "dir" : "ls", - "--appname", - appName + Shell.WINDOWS ? "dir" : "ls" }; Configuration config = new Configuration(yarnCluster.getConfig()); Client client = new Client(config);