From 943b2190d72f930a76f15558fca0dbb128b2d592 Mon Sep 17 00:00:00 2001 From: Zhijie Shen Date: Fri, 7 Feb 2014 00:18:46 +0000 Subject: [PATCH] YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. Contributed by Vinod Kumar Vavilapalli. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1565497 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../resourcemanager/rmapp/RMAppImpl.java | 6 +- .../yarn/server/resourcemanager/MockRM.java | 7 ++ .../yarn/server/resourcemanager/TestRM.java | 101 +++++++++++++++++- .../applicationsmanager/TestAMRestart.java | 2 +- .../rmapp/TestRMAppTransitions.java | 7 ++ .../webapp/TestRMWebServicesApps.java | 4 +- 7 files changed, 119 insertions(+), 11 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 3110341307..6afd4bdbfe 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -633,6 +633,9 @@ Release 2.3.0 - UNRELEASED YARN-1661. Fixed DS ApplicationMaster to write the correct exit log. (Vinod Kumar Vavilapalli via zjshen) + YARN-1689. Made RMAppAttempt get killed when RMApp is at ACCEPTED. 
(Vinod + Kumar Vavilapalli via zjshen) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index edbe676bad..196e89d32c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -196,10 +196,8 @@ public class RMAppImpl implements RMApp, Recoverable { // waiting for the previous AM to exit. RMAppEventType.ATTEMPT_FAILED, new AttemptFailedTransition(RMAppState.ACCEPTED)) - .addTransition(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING, - RMAppEventType.KILL, - new FinalSavingTransition( - new AppKilledTransition(), RMAppState.KILLED)) + .addTransition(RMAppState.ACCEPTED, RMAppState.KILLING, + RMAppEventType.KILL, new KillAttemptTransition()) // ACCECPTED state can once again receive APP_ACCEPTED event, because on // recovery the app returns ACCEPTED state and the app once again go // through the scheduler and triggers one more APP_ACCEPTED event at diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index 935820e66b..31035b420a 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -482,6 +482,13 @@ public class MockRM extends ResourceManager { RMAppAttempt attempt = app.getCurrentAppAttempt(); nm.nodeHeartbeat(true); MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED); + return am; + } + + public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm) + throws Exception { + MockAM am = launchAM(app, rm, nm); am.registerAppAttempt(); rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); return am; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java index c7e6d7fb97..b899ea708b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager; +import static org.mockito.Matchers.argThat; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.spy; + import java.util.ArrayList; import java.util.EnumSet; import java.util.HashMap; @@ -33,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import 
org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.Container; @@ -44,9 +49,17 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; +import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; @@ -54,7 +67,9 @@ import org.apache.log4j.Level; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import org.junit.Test; +import org.mockito.ArgumentMatcher; +@SuppressWarnings({"unchecked", "rawtypes"}) public class TestRM { private static final Log LOG = 
LogFactory.getLog(TestRM.class); @@ -397,19 +412,19 @@ public class TestRM { MockNM nm1 = new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService()); nm1.registerNode(); - MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); MockRM.finishApplicationMaster(app1, rm1, nm1, am1); // a failed app RMApp app2 = rm1.submitApp(200); - MockAM am2 = MockRM.launchAM(app2, rm1, nm1); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am2.waitForState(RMAppAttemptState.FAILED); rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED); // a killed app RMApp app3 = rm1.submitApp(200); - MockAM am3 = MockRM.launchAM(app3, rm1, nm1); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1); rm1.killApp(app3.getApplicationId()); rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED); rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED); @@ -449,7 +464,7 @@ public class TestRM { // a failed app RMApp app2 = rm1.submitApp(200); - MockAM am2 = MockRM.launchAM(app2, rm1, nm1); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); nm1 .nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE); am2.waitForState(RMAppAttemptState.FAILED); @@ -466,10 +481,88 @@ public class TestRM { Assert.assertEquals(-1, report1.getRpcPort()); } + /** + * Validate killing an application when it is at accepted state. 
+ * @throws Exception exception + */ + @Test (timeout = 60000) + public void testApplicationKillAtAcceptedState() throws Exception { + + YarnConfiguration conf = new YarnConfiguration(); + final Dispatcher dispatcher = new AsyncDispatcher() { + @Override + public EventHandler getEventHandler() { + + class EventArgMatcher extends ArgumentMatcher { + @Override + public boolean matches(Object argument) { + if (argument instanceof RMAppAttemptEvent) { + if (((RMAppAttemptEvent) argument).getType().equals( + RMAppAttemptEventType.KILL)) { + return true; + } + } + return false; + } + } + + EventHandler handler = spy(super.getEventHandler()); + doNothing().when(handler).handle(argThat(new EventArgMatcher())); + return handler; + } + }; + + MockRM rm = new MockRM(conf) { + @Override + protected Dispatcher createDispatcher() { + return dispatcher; + } + }; + + rm.start(); + MockNM nm1 = + new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService()); + nm1.registerNode(); + + // an app whose AM is launched but has not registered yet + RMApp application = rm.submitApp(200); + MockAM am = MockRM.launchAM(application, rm, nm1); + am.waitForState(RMAppAttemptState.LAUNCHED); + nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING); + rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED); + + // Now kill the application while it is still at ACCEPTED, i.e. before the + // AM has registered with the RM. + KillApplicationRequest request = + KillApplicationRequest.newInstance(application.getApplicationId()); + rm.getClientRMService().forceKillApplication(request); + + // Specific test for YARN-1689 follows + // Now let's say a race causes AM to register now. This should not crash RM. + am.registerAppAttempt(false); + + // We explicitly intercepted the kill-event to RMAppAttempt, so app should + // still be in KILLING state.
+ rm.waitForState(application.getApplicationId(), RMAppState.KILLING); + // AM should now be in running + rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING); + + // Simulate that appAttempt is killed. + rm.getRMContext().getDispatcher().getEventHandler().handle( + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED)); + rm.waitForState(application.getApplicationId(), RMAppState.KILLED); + } + public static void main(String[] args) throws Exception { TestRM t = new TestRM(); t.testGetNewAppId(); t.testAppWithNoContainers(); t.testAppOnMultiNode(); + t.testNMToken(); + t.testActivatingApplicationAfterAddingNM(); + t.testInvalidateAMHostPortWhenAMFailedOrKilled(); + t.testInvalidatedAMHostPortOnAMRestart(); + t.testApplicationKillAtAcceptedState(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java index f8329d68a7..ca9befd599 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRestart.java @@ -69,7 +69,7 @@ public class TestAMRestart { new MockNM("127.0.0.1:2351", 4089, rm1.getResourceTrackerService()); nm2.registerNode(); - MockAM am1 = MockRM.launchAM(app1, rm1, nm1); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); int NUM_CONTAINERS = 3; // allocate NUM_CONTAINERS containers am1.allocate("127.0.0.1", 1024, NUM_CONTAINERS, diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java index 5ac9353928..58482ee38b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/TestRMAppTransitions.java @@ -639,6 +639,13 @@ public class TestRMAppTransitions { RMAppEventType.KILL); application.handle(event); rmDispatcher.await(); + + assertAppState(RMAppState.KILLING, application); + RMAppEvent appAttemptKilled = + new RMAppEvent(application.getApplicationId(), + RMAppEventType.ATTEMPT_KILLED); + application.handle(appAttemptKilled); + assertAppState(RMAppState.FINAL_SAVING, application); sendAppUpdateSavedEvent(application); assertKilled(application); assertAppFinalStateSaved(application); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java index 18350fb30d..cfdf9283ac 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesApps.java @@ -1389,7 +1389,7 @@ public class TestRMWebServicesApps extends JerseyTest { rm.start(); MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 8192); RMApp app1 = rm.submitApp(CONTAINER_MB, "testwordcount", "user1"); - MockAM am = MockRM.launchAM(app1, rm, amNodeManager); + MockAM am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager); int maxAppAttempts = rm.getConfig().getInt( YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS); @@ -1405,7 +1405,7 @@ public class TestRMWebServicesApps extends JerseyTest { } // wait for app to start a new attempt. rm.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED); - am = MockRM.launchAM(app1, rm, amNodeManager); + am = MockRM.launchAndRegisterAM(app1, rm, amNodeManager); numAttempt++; } assertEquals("incorrect number of attempts", maxAppAttempts,