diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 2734df5e1a..d108c8e210 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -558,6 +558,9 @@ Release 2.4.0 - UNRELEASED YARN-1776. Fixed DelegationToken renewal to survive RM failover. (Zhijie Shen via jianhe) + YARN-1577. Made UnmanagedAMLauncher do launchAM after the attempt reaches + the LAUNCHED state. (Jian He via zjshen) + Release 2.3.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java index f75358a85a..2414a6777f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/main/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/UnmanagedAMLauncher.java @@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -51,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.client.api.YarnClient; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -77,7 +79,7 @@ public class UnmanagedAMLauncher { private Configuration conf; // Handle to talk to the Resource Manager/Applications Manager - private YarnClient rmClient; + protected YarnClient rmClient; // Application master specific info to register a new Application with RM/ASM private String appName = ""; @@ -92,6 +94,7 @@ public class UnmanagedAMLauncher { private volatile boolean amCompleted = false; + private static final long AM_STATE_WAIT_TIMEOUT_MS = 10000; /** * @param args * Command line arguments @@ -173,12 +176,6 @@ public class UnmanagedAMLauncher { public void launchAM(ApplicationAttemptId attemptId) throws IOException, YarnException { - ApplicationReport report = - rmClient.getApplicationReport(attemptId.getApplicationId()); - if (report.getYarnApplicationState() != YarnApplicationState.ACCEPTED) { - throw new YarnException( - "Umanaged AM must be in ACCEPTED state before launching"); - } Credentials credentials = new Credentials(); Token token = rmClient.getAMRMToken(attemptId.getApplicationId()); @@ -338,20 +335,27 @@ public class UnmanagedAMLauncher { // Submit the application to the applications manager LOG.info("Submitting application to ASM"); rmClient.submitApplication(appContext); - - // Monitor the application to wait for launch state - ApplicationReport appReport = monitorApplication(appId, - EnumSet.of(YarnApplicationState.ACCEPTED)); - ApplicationAttemptId attemptId = appReport.getCurrentApplicationAttemptId(); - LOG.info("Launching application with id: " + attemptId); - - // launch AM - launchAM(attemptId); - - // Monitor the application for end state - appReport = monitorApplication(appId, EnumSet.of( - YarnApplicationState.KILLED, YarnApplicationState.FAILED, - YarnApplicationState.FINISHED)); + + ApplicationReport appReport = + monitorApplication(appId, EnumSet.of(YarnApplicationState.ACCEPTED, + YarnApplicationState.KILLED, YarnApplicationState.FAILED, + YarnApplicationState.FINISHED)); + + if (appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) { + // Monitor the application attempt to wait for launch state + ApplicationAttemptReport attemptReport = + monitorCurrentAppAttempt(appId, + YarnApplicationAttemptState.LAUNCHED); + ApplicationAttemptId attemptId = + attemptReport.getApplicationAttemptId(); + LOG.info("Launching AM with application attempt id " + attemptId); + // launch AM + launchAM(attemptId); + // Monitor the application for end state + appReport = + monitorApplication(appId, EnumSet.of(YarnApplicationState.KILLED, + YarnApplicationState.FAILED, YarnApplicationState.FINISHED)); + } YarnApplicationState appState = appReport.getYarnApplicationState(); FinalApplicationStatus appStatus = appReport.getFinalApplicationStatus(); @@ -376,6 +380,43 @@ public class UnmanagedAMLauncher { } } + private ApplicationAttemptReport monitorCurrentAppAttempt( + ApplicationId appId, YarnApplicationAttemptState attemptState) + throws YarnException, IOException { + long startTime = System.currentTimeMillis(); + ApplicationAttemptId attemptId = null; + while (true) { + if (attemptId == null) { + attemptId = + rmClient.getApplicationReport(appId) + .getCurrentApplicationAttemptId(); + } + ApplicationAttemptReport attemptReport = null; + if (attemptId != null) { + attemptReport = rmClient.getApplicationAttemptReport(attemptId); + if (attemptState.equals(attemptReport.getYarnApplicationAttemptState())) { + return attemptReport; + } + } + LOG.info("Current attempt state of " + appId + " is " + (attemptReport == null + ? " N/A " : attemptReport.getYarnApplicationAttemptState()) + + ", waiting for current attempt to reach " + attemptState); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for current attempt of " + appId + + " to reach " + attemptState); + } + if (System.currentTimeMillis() - startTime > AM_STATE_WAIT_TIMEOUT_MS) { + String errmsg = + "Timeout for waiting current attempt of " + appId + " to reach " + + attemptState; + LOG.error(errmsg); + throw new RuntimeException(errmsg); + } + } + } + /** * Monitor the submitted application for completion. Kill application if time * expires. @@ -391,7 +432,6 @@ public class UnmanagedAMLauncher { IOException { long foundAMCompletedTime = 0; - final int timeToWaitMS = 10000; StringBuilder expectedFinalState = new StringBuilder(); boolean first = true; for (YarnApplicationState state : finalState) { @@ -438,8 +478,8 @@ public class UnmanagedAMLauncher { if (foundAMCompletedTime == 0) { foundAMCompletedTime = System.currentTimeMillis(); } else if ((System.currentTimeMillis() - foundAMCompletedTime) - > timeToWaitMS) { - LOG.warn("Waited " + timeToWaitMS/1000 + > AM_STATE_WAIT_TIMEOUT_MS) { + LOG.warn("Waited " + AM_STATE_WAIT_TIMEOUT_MS/1000 + " seconds after process completed for AppReport" + " to reach desired final state. Not waiting anymore." + "CurrentState = " + state diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java index a4c7832f7d..08cacee2b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-unmanaged-am-launcher/src/test/java/org/apache/hadoop/yarn/applications/unmanagedamlauncher/TestUnmanagedAMLauncher.java @@ -28,8 +28,6 @@ import java.io.IOException; import java.io.OutputStream; import java.net.URL; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -38,11 +36,15 @@ import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MiniYARNCluster; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -122,7 +124,7 @@ public class TestUnmanagedAMLauncher { } @Test(timeout=30000) - public void testDSShell() throws Exception { + public void testUMALauncher() throws Exception { String classpath = getTestRuntimeClasspath(); String javaHome = System.getenv("JAVA_HOME"); if (javaHome == null) { @@ -141,8 +143,18 @@ public class TestUnmanagedAMLauncher { + " success" }; LOG.info("Initializing Launcher"); - UnmanagedAMLauncher launcher = new UnmanagedAMLauncher(new Configuration( - yarnCluster.getConfig())); + UnmanagedAMLauncher launcher = + new UnmanagedAMLauncher(new Configuration(yarnCluster.getConfig())) { + public void launchAM(ApplicationAttemptId attemptId) + throws IOException, YarnException { + YarnApplicationAttemptState attemptState = + rmClient.getApplicationAttemptReport(attemptId) + .getYarnApplicationAttemptState(); + Assert.assertTrue(attemptState + .equals(YarnApplicationAttemptState.LAUNCHED)); + super.launchAM(attemptId); + } + }; boolean initSuccess = launcher.init(args); Assert.assertTrue(initSuccess); LOG.info("Running Launcher"); @@ -154,7 +166,7 @@ public class TestUnmanagedAMLauncher { } @Test(timeout=30000) - public void testDSShellError() throws Exception { + public void testUMALauncherError() throws Exception { String classpath = getTestRuntimeClasspath(); String javaHome = System.getenv("JAVA_HOME"); if (javaHome == null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 3e90ec8ec1..efe07212e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -1650,11 +1650,14 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { this.readLock.lock(); ApplicationAttemptReport attemptReport = null; try { + // AM container maybe not yet allocated. and also unmangedAM doesn't have + // am container. + ContainerId amId = + masterContainer == null ? null : masterContainer.getId(); attemptReport = ApplicationAttemptReport.newInstance(this .getAppAttemptId(), this.getHost(), this.getRpcPort(), this .getTrackingUrl(), this.getDiagnostics(), YarnApplicationAttemptState - .valueOf(this.getState().toString()), this.getMasterContainer() - .getId()); + .valueOf(this.getState().toString()), amId); } finally { this.readLock.unlock(); }