YARN-37. Change TestRMAppTransitions to use the DrainDispatcher. (Contributed by Mayank Bansal)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1377803 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
10e704c50b
commit
0ec6cda4b7
@ -42,6 +42,9 @@ Release 2.1.0-alpha - Unreleased
|
|||||||
YARN-22. Fix ContainerLogs to work if the log-dir is specified as a URI.
|
YARN-22. Fix ContainerLogs to work if the log-dir is specified as a URI.
|
||||||
(Mayank Bansal via sseth)
|
(Mayank Bansal via sseth)
|
||||||
|
|
||||||
|
YARN-37. Change TestRMAppTransitions to use the DrainDispatcher.
|
||||||
|
(Mayank Bansal via sseth)
|
||||||
|
|
||||||
Release 0.23.3 - Unreleased
|
Release 0.23.3 - Unreleased
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -33,14 +33,15 @@
|
|||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
import org.apache.hadoop.yarn.event.DrainDispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
||||||
@ -48,6 +49,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
@ -60,7 +63,7 @@ public class TestRMAppTransitions {
|
|||||||
private RMContext rmContext;
|
private RMContext rmContext;
|
||||||
private static int maxRetries = 4;
|
private static int maxRetries = 4;
|
||||||
private static int appId = 1;
|
private static int appId = 1;
|
||||||
// private AsyncDispatcher rmDispatcher;
|
private DrainDispatcher rmDispatcher;
|
||||||
|
|
||||||
// ignore all the RM application attempt events
|
// ignore all the RM application attempt events
|
||||||
private static final class TestApplicationAttemptEventDispatcher implements
|
private static final class TestApplicationAttemptEventDispatcher implements
|
||||||
@ -110,12 +113,27 @@ public void handle(RMAppEvent event) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handle all the RM application manager events - same as in
|
||||||
|
// ResourceManager.java
|
||||||
|
private static final class TestApplicationManagerEventDispatcher implements
|
||||||
|
EventHandler<RMAppManagerEvent> {
|
||||||
|
@Override
|
||||||
|
public void handle(RMAppManagerEvent event) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle all the scheduler events - same as in ResourceManager.java
|
||||||
|
private static final class TestSchedulerEventDispatcher implements
|
||||||
|
EventHandler<SchedulerEvent> {
|
||||||
|
@Override
|
||||||
|
public void handle(SchedulerEvent event) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
AsyncDispatcher rmDispatcher = new AsyncDispatcher();
|
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
rmDispatcher = new InlineDispatcher();
|
rmDispatcher = new DrainDispatcher();
|
||||||
|
|
||||||
ContainerAllocationExpirer containerAllocationExpirer =
|
ContainerAllocationExpirer containerAllocationExpirer =
|
||||||
mock(ContainerAllocationExpirer.class);
|
mock(ContainerAllocationExpirer.class);
|
||||||
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
||||||
@ -131,6 +149,13 @@ null, new ApplicationTokenSecretManager(conf),
|
|||||||
|
|
||||||
rmDispatcher.register(RMAppEventType.class,
|
rmDispatcher.register(RMAppEventType.class,
|
||||||
new TestApplicationEventDispatcher(rmContext));
|
new TestApplicationEventDispatcher(rmContext));
|
||||||
|
|
||||||
|
rmDispatcher.register(RMAppManagerEventType.class,
|
||||||
|
new TestApplicationManagerEventDispatcher());
|
||||||
|
|
||||||
|
rmDispatcher.register(SchedulerEventType.class,
|
||||||
|
new TestSchedulerEventDispatcher());
|
||||||
|
|
||||||
rmDispatcher.init(conf);
|
rmDispatcher.init(conf);
|
||||||
rmDispatcher.start();
|
rmDispatcher.start();
|
||||||
}
|
}
|
||||||
@ -225,9 +250,8 @@ private static void assertKilled(RMApp application) {
|
|||||||
"Application killed by user.", diag.toString());
|
"Application killed by user.", diag.toString());
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void assertAppAndAttemptKilled(RMApp application) {
|
private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException {
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
/* also check if the attempt is killed */
|
|
||||||
Assert.assertEquals( RMAppAttemptState.KILLED,
|
Assert.assertEquals( RMAppAttemptState.KILLED,
|
||||||
application.getCurrentAppAttempt().getAppAttemptState()
|
application.getCurrentAppAttempt().getAppAttemptState()
|
||||||
);
|
);
|
||||||
@ -332,6 +356,7 @@ public void testUnmanagedApp() throws IOException {
|
|||||||
RMAppEvent event = new RMAppFailedAttemptEvent(
|
RMAppEvent event = new RMAppFailedAttemptEvent(
|
||||||
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
|
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
|
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
|
||||||
Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId());
|
Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId());
|
||||||
assertFailed(application,
|
assertFailed(application,
|
||||||
@ -353,6 +378,7 @@ public void testAppNewKill() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -366,6 +392,7 @@ public void testAppNewReject() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -379,18 +406,22 @@ public void testAppSubmittedRejected() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertFailed(application, rejectedText);
|
assertFailed(application, rejectedText);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppSubmittedKill() throws IOException {
|
public void testAppSubmittedKill() throws IOException, InterruptedException {
|
||||||
LOG.info("--- START: testAppSubmittedKill---");
|
LOG.info("--- START: testAppSubmittedKill---");
|
||||||
|
RMApp application = testCreateAppSubmitted(null);
|
||||||
RMApp application = testCreateAppAccepted(null);
|
// SUBMITTED => KILLED event RMAppEventType.KILL
|
||||||
// SUBMITTED => KILLED event RMAppEventType.KILL
|
RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
RMAppEventType.KILL);
|
||||||
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
|
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
||||||
|
application);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
|
assertKilled(application);
|
||||||
assertAppAndAttemptKilled(application);
|
assertAppAndAttemptKilled(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -410,6 +441,7 @@ public void testAppAcceptedFailed() throws IOException {
|
|||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.APP_ACCEPTED);
|
RMAppEventType.APP_ACCEPTED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertAppState(RMAppState.ACCEPTED, application);
|
assertAppState(RMAppState.ACCEPTED, application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -420,19 +452,23 @@ public void testAppAcceptedFailed() throws IOException {
|
|||||||
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FAILED, message);
|
RMAppEventType.ATTEMPT_FAILED, message);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAppAcceptedKill() throws IOException {
|
public void testAppAcceptedKill() throws IOException, InterruptedException {
|
||||||
LOG.info("--- START: testAppAcceptedKill ---");
|
LOG.info("--- START: testAppAcceptedKill ---");
|
||||||
|
|
||||||
RMApp application = testCreateAppAccepted(null);
|
RMApp application = testCreateAppAccepted(null);
|
||||||
// ACCEPTED => KILLED event RMAppEventType.KILL
|
// ACCEPTED => KILLED event RMAppEventType.KILL
|
||||||
RMAppEvent event =
|
RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
RMAppEventType.KILL);
|
||||||
|
this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
||||||
|
application);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
|
assertAppAndAttemptKilled(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -444,6 +480,7 @@ public void testAppRunningKill() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertKilled(application);
|
assertKilled(application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -462,6 +499,7 @@ public void testAppRunningFailed() throws IOException {
|
|||||||
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FAILED, "");
|
RMAppEventType.ATTEMPT_FAILED, "");
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertAppState(RMAppState.SUBMITTED, application);
|
assertAppState(RMAppState.SUBMITTED, application);
|
||||||
appAttempt = application.getCurrentAppAttempt();
|
appAttempt = application.getCurrentAppAttempt();
|
||||||
Assert.assertEquals(++expectedAttemptId,
|
Assert.assertEquals(++expectedAttemptId,
|
||||||
@ -470,11 +508,13 @@ public void testAppRunningFailed() throws IOException {
|
|||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.APP_ACCEPTED);
|
RMAppEventType.APP_ACCEPTED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertAppState(RMAppState.ACCEPTED, application);
|
assertAppState(RMAppState.ACCEPTED, application);
|
||||||
event =
|
event =
|
||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_REGISTERED);
|
RMAppEventType.ATTEMPT_REGISTERED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertAppState(RMAppState.RUNNING, application);
|
assertAppState(RMAppState.RUNNING, application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -484,11 +524,13 @@ public void testAppRunningFailed() throws IOException {
|
|||||||
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FAILED, "");
|
RMAppEventType.ATTEMPT_FAILED, "");
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
|
|
||||||
// FAILED => FAILED event RMAppEventType.KILL
|
// FAILED => FAILED event RMAppEventType.KILL
|
||||||
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertFailed(application, ".*Failing the application.*");
|
assertFailed(application, ".*Failing the application.*");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -501,6 +543,7 @@ public void testAppFinishingKill() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertAppState(RMAppState.FINISHED, application);
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -513,6 +556,7 @@ public void testAppFinishedFinished() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.FINISHED, application);
|
assertAppState(RMAppState.FINISHED, application);
|
||||||
StringBuilder diag = application.getDiagnostics();
|
StringBuilder diag = application.getDiagnostics();
|
||||||
@ -530,6 +574,7 @@ public void testAppKilledKilled() throws IOException {
|
|||||||
RMAppEvent event =
|
RMAppEvent event =
|
||||||
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
|
||||||
@ -538,6 +583,7 @@ public void testAppKilledKilled() throws IOException {
|
|||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FINISHED);
|
RMAppEventType.ATTEMPT_FINISHED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
|
||||||
@ -546,6 +592,7 @@ public void testAppKilledKilled() throws IOException {
|
|||||||
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_FAILED, "");
|
RMAppEventType.ATTEMPT_FAILED, "");
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
|
||||||
@ -554,12 +601,14 @@ public void testAppKilledKilled() throws IOException {
|
|||||||
new RMAppEvent(application.getApplicationId(),
|
new RMAppEvent(application.getApplicationId(),
|
||||||
RMAppEventType.ATTEMPT_KILLED);
|
RMAppEventType.ATTEMPT_KILLED);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
|
|
||||||
// KILLED => KILLED event RMAppEventType.KILL
|
// KILLED => KILLED event RMAppEventType.KILL
|
||||||
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
||||||
application.handle(event);
|
application.handle(event);
|
||||||
|
rmDispatcher.await();
|
||||||
assertTimesAtFinish(application);
|
assertTimesAtFinish(application);
|
||||||
assertAppState(RMAppState.KILLED, application);
|
assertAppState(RMAppState.KILLED, application);
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user