From 8f0bf54d3442e6beedfaeaf3b53c5769019ca9d1 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Sat, 28 Dec 2013 01:09:07 +0000 Subject: [PATCH] YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port information once an AM crashes. Contributed by Jian He. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1553772 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../rmapp/attempt/RMAppAttemptImpl.java | 10 +- .../yarn/server/resourcemanager/TestRM.java | 116 +++++++++++++++++- .../attempt/TestRMAppAttemptTransitions.java | 8 ++ 4 files changed, 133 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index bb5b895428..f593ac8a89 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -191,6 +191,9 @@ Release 2.4.0 - UNRELEASED YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha) + YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port + information once an AM crashes. 
(Jian He via vinodkv) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java index 8823952dfa..647bc59c9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java @@ -139,7 +139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable { private float progress = 0; private String host = "N/A"; - private int rpcPort; + private int rpcPort = -1; private String originalTrackingUrl = "N/A"; private String proxiedTrackingUrl = "N/A"; private long startTime = 0; @@ -526,6 +526,11 @@ private void setTrackingUrlToRMAppPage() { proxiedTrackingUrl = originalTrackingUrl; } + private void invalidateAMHostAndPort() { + this.host = "N/A"; + this.rpcPort = -1; + } + // This is only used for RMStateStore. Normal operation must invoke the secret // manager to get the key and not use the local key directly. 
@Override @@ -1033,6 +1038,7 @@ public void transition(RMAppAttemptImpl appAttempt, { // don't leave the tracking URL pointing to a non-existent AM appAttempt.setTrackingUrlToRMAppPage(); + appAttempt.invalidateAMHostAndPort(); appEvent = new RMAppFailedAttemptEvent(applicationId, RMAppEventType.ATTEMPT_KILLED, @@ -1043,6 +1049,7 @@ public void transition(RMAppAttemptImpl appAttempt, { // don't leave the tracking URL pointing to a non-existent AM appAttempt.setTrackingUrlToRMAppPage(); + appAttempt.invalidateAMHostAndPort(); appEvent = new RMAppFailedAttemptEvent(applicationId, RMAppEventType.ATTEMPT_FAILED, @@ -1059,7 +1066,6 @@ public void transition(RMAppAttemptImpl appAttempt, appAttempt.eventHandler.handle(appEvent); appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent( appAttemptId, finalAttemptState)); - appAttempt.removeCredentials(appAttempt); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index 205846a11d..a2bf4ae97b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -19,26 +19,33 @@ package org.apache.hadoop.yarn.server.resourcemanager; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; -import javax.security.auth.login.Configuration; - import junit.framework.Assert; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import 
org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NMToken; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -368,6 +375,111 @@ public void testActivatingApplicationAfterAddingNM() throws Exception { rm1.stop(); } + // This is to test AM Host and rpc port are invalidated after the am attempt + // is killed or failed, so that client doesn't get the wrong information. 
+ @Test (timeout = 80000) + public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1); + MockRM rm1 = new MockRM(conf); + rm1.start(); + + // a succeeded app + RMApp app1 = rm1.submitApp(200); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + MockAM am1 = launchAM(app1, rm1, nm1); + finishApplicationMaster(app1, rm1, nm1, am1); + + // a failed app + RMApp app2 = rm1.submitApp(200); + MockAM am2 = launchAM(app2, rm1, nm1); + nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED); + + // a killed app + RMApp app3 = rm1.submitApp(200); + MockAM am3 = launchAM(app3, rm1, nm1); + rm1.killApp(app3.getApplicationId()); + rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED); + rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED); + + GetApplicationsRequest request1 = + GetApplicationsRequest.newInstance(EnumSet.of( + YarnApplicationState.FINISHED, YarnApplicationState.KILLED, + YarnApplicationState.FAILED)); + GetApplicationsResponse response1 = + rm1.getClientRMService().getApplications(request1); + List<ApplicationReport> appList1 = response1.getApplicationList(); + + Assert.assertEquals(3, appList1.size()); + for (ApplicationReport report : appList1) { + // killed/failed apps host and rpc port are invalidated. 
+ if (report.getApplicationId().equals(app2.getApplicationId()) + || report.getApplicationId().equals(app3.getApplicationId())) { + Assert.assertEquals("N/A", report.getHost()); + Assert.assertEquals(-1, report.getRpcPort()); + } + // succeeded app's host and rpc port is not invalidated + if (report.getApplicationId().equals(app1.getApplicationId())) { + Assert.assertFalse(report.getHost().equals("N/A")); + Assert.assertTrue(report.getRpcPort() != -1); + } + } + } + + @Test (timeout = 60000) + public void testInvalidatedAMHostPortOnAMRestart() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + MockRM rm1 = new MockRM(conf); + rm1.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); + nm1.registerNode(); + + // a failed app + RMApp app2 = rm1.submitApp(200); + MockAM am2 = launchAM(app2, rm1, nm1); + nm1 + .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am2.waitForState(RMAppAttemptState.FAILED); + rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED); + + // before new attempt is launched, the app report returns the invalid AM + // host and port. 
+ GetApplicationReportRequest request1 = + GetApplicationReportRequest.newInstance(app2.getApplicationId()); + ApplicationReport report1 = + rm1.getClientRMService().getApplicationReport(request1) + .getApplicationReport(); + Assert.assertEquals("N/A", report1.getHost()); + Assert.assertEquals(-1, report1.getRpcPort()); + } + + private MockAM launchAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + RMAppAttempt attempt = app.getCurrentAppAttempt(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + return am; + } + + private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm, + MockAM am) throws Exception { + FinishApplicationMasterRequest req = + FinishApplicationMasterRequest.newInstance( + FinalApplicationStatus.SUCCEEDED, "", ""); + am.unregisterAppAttempt(req); + am.waitForState(RMAppAttemptState.FINISHING); + nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE); + am.waitForState(RMAppAttemptState.FINISHED); + rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED); + } + public static void main(String[] args) throws Exception { TestRM t = new TestRM(); t.testGetNewAppId(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java index 6c72d97080..0ad2f2a037 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java @@ -806,6 +806,7 @@ public void testRunningToFailed() { applicationAttempt.getAppAttemptId().getApplicationId()); assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); + verifyAMHostAndPortInvalidated(); } @Test @@ -841,6 +842,7 @@ public void testRunningToKilled() { assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyAMHostAndPortInvalidated(); } @Test(timeout=10000) @@ -878,6 +880,7 @@ public void testRunningExpire() { assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl()); assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl()); verifyTokenCount(applicationAttempt.getAppAttemptId(), 1); + verifyAMHostAndPortInvalidated(); } @Test @@ -1125,4 +1128,9 @@ private void verifyAttemptFinalStateSaved() { verify(store, times(1)).updateApplicationAttemptState( any(ApplicationAttemptState.class)); } + + private void verifyAMHostAndPortInvalidated() { + assertEquals("N/A", applicationAttempt.getHost()); + assertEquals(-1, applicationAttempt.getRpcPort()); + } }