YARN-1410. Added tests to validate that clients can fail-over to a new RM
after getting an application-ID but before submission and can still submit
to the newly active RM with no issues. Contributed by Xuan Gong.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1575478 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 2014-03-08 04:43:29 +00:00
parent b7428fe63d
commit 8497b870af
10 changed files with 398 additions and 165 deletions
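
At a high level, the scenario these tests exercise looks like the following client-side sketch. This is not part of the patch: it assumes an HA-enabled YarnConfiguration is on the classpath, and the application name and AM resource values are placeholders.

```java
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class FailoverSubmitSketch {
  public static void main(String[] args) throws Exception {
    // Assumes yarn-site.xml enables RM HA (rm1, rm2) with a retry/failover proxy.
    YarnClient client = YarnClient.createYarnClient();
    client.init(new YarnConfiguration());
    client.start();

    // Step 1: obtain an ApplicationId from the currently active RM.
    YarnClientApplication app = client.createApplication();
    ApplicationId appId = app.getNewApplicationResponse().getApplicationId();

    // Step 2: the active RM may fail over at this point; the client proxy
    // reconnects to the newly active RM on the next call.

    // Step 3: submit with the previously issued ApplicationId.
    ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
    ctx.setApplicationId(appId); // required, otherwise ApplicationIdNotProvidedException
    ctx.setApplicationName("failover-demo");                       // placeholder name
    ctx.setAMContainerSpec(Records.newRecord(ContainerLaunchContext.class));
    ctx.setResource(Resource.newInstance(128, 1));                 // placeholder AM resource
    client.submitApplication(ctx);

    client.stop();
  }
}
```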

View File

@@ -275,6 +275,10 @@ Release 2.4.0 - UNRELEASED
utilization for local disks so as to be able to offline full disks. (Varun
Vasudev via vinodkv)

YARN-1410. Added tests to validate that clients can fail-over to a new RM
after getting an application-ID but before submission and can still submit to
the newly active RM with no issues. (Xuan Gong via vinodkv)

OPTIMIZATIONS

BUG FIXES

View File

@@ -31,6 +31,9 @@
* <p>The response sent by the <code>ResourceManager</code> to the client for
* a request to get a new {@link ApplicationId} for submitting applications.</p>
*
* <p>Clients can submit an application with the returned
* {@link ApplicationId}.</p>
*
* @see ApplicationClientProtocol#getNewApplication(GetNewApplicationRequest)
*/
@Public
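
The added javadoc describes the user-facing half of this change. A hedged sketch of that flow at the protocol level follows; the Records-based request construction mirrors the MockRM code later in this commit, while the method wrapper and variable names are illustrative only.

```java
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.util.Records;

public final class GetNewApplicationSketch {
  // "client" is assumed to be an already-connected ApplicationClientProtocol proxy.
  static void submitWithReturnedId(ApplicationClientProtocol client) throws Exception {
    // Ask the RM for a fresh ApplicationId.
    GetNewApplicationResponse resp =
        client.getNewApplication(Records.newRecord(GetNewApplicationRequest.class));
    ApplicationId appId = resp.getApplicationId();

    // The returned id is what the submission must carry.
    ApplicationSubmissionContext ctx =
        Records.newRecord(ApplicationSubmissionContext.class);
    ctx.setApplicationId(appId);

    SubmitApplicationRequest req =
        Records.newRecord(SubmitApplicationRequest.class);
    req.setApplicationSubmissionContext(ctx);
    client.submitApplication(req);
  }
}
```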

View File

@@ -0,0 +1,47 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
/**
* Exception to be thrown when a client submits an application without
* providing an {@link ApplicationId} in the {@link ApplicationSubmissionContext}.
*/
@Public
@Unstable
public class ApplicationIdNotProvidedException extends YarnException{
private static final long serialVersionUID = 911754350L;
public ApplicationIdNotProvidedException(Throwable cause) {
super(cause);
}
public ApplicationIdNotProvidedException(String message) {
super(message);
}
public ApplicationIdNotProvidedException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ContainerNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
@@ -88,6 +89,11 @@ public abstract YarnClientApplication createApplication()
* application has been submitted and accepted by the ResourceManager.
* </p>
*
* <p>
* An {@link ApplicationId} must be provided when submitting a new application;
* otherwise an {@link ApplicationIdNotProvidedException} is thrown.
* </p>
*
* @param appContext
* {@link ApplicationSubmissionContext} containing all the details
* needed to submit a new application
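
The new paragraph documents a hard requirement on callers. Below is a small hedged sketch of handling it, patterned on the try/catch added to TestYarnClient further down in this commit; the client and ctx parameters are assumed to already exist and be initialized.

```java
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.YarnException;

public final class SubmitContractSketch {
  static void submit(YarnClient client, ApplicationSubmissionContext ctx)
      throws IOException, YarnException {
    try {
      // Throws ApplicationIdNotProvidedException if ctx.getApplicationId() is null.
      client.submitApplication(ctx);
    } catch (ApplicationIdNotProvidedException e) {
      // Recover by asking the RM for a fresh id, then resubmit.
      ctx.setApplicationId(
          client.createApplication().getNewApplicationResponse().getApplicationId());
      client.submitApplication(ctx);
    }
  }
}
```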

View File

@@ -69,6 +69,7 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -172,15 +173,21 @@ public YarnClientApplication createApplication()
submitApplication(ApplicationSubmissionContext appContext)
throws YarnException, IOException {
ApplicationId applicationId = appContext.getApplicationId();
appContext.setApplicationId(applicationId);
if (applicationId == null) {
throw new ApplicationIdNotProvidedException(
"ApplicationId is not provided in ApplicationSubmissionContext");
}
SubmitApplicationRequest request =
Records.newRecord(SubmitApplicationRequest.class);
request.setApplicationSubmissionContext(appContext);
//TODO: YARN-1763:Handle RM failovers during the submitApplication call.
rmClient.submitApplication(request);
int pollCount = 0;
long startTime = System.currentTimeMillis();
//TODO: YARN-1764:Handle RM fail overs after the submitApplication call.
while (true) {
YarnApplicationState state =
getApplicationReport(applicationId).getYarnApplicationState();

View File

@@ -59,6 +59,7 @@
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationIdNotProvidedException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -110,6 +111,24 @@ public void testSubmitApplication() {
YarnApplicationState.FAILED,
YarnApplicationState.KILLED
};
// Submit an application without ApplicationId provided
// Should get ApplicationIdNotProvidedException
ApplicationSubmissionContext contextWithoutApplicationId =
mock(ApplicationSubmissionContext.class);
try {
client.submitApplication(contextWithoutApplicationId);
Assert.fail("Should throw the ApplicationIdNotProvidedException");
} catch (YarnException e) {
Assert.assertTrue(e instanceof ApplicationIdNotProvidedException);
Assert.assertTrue(e.getMessage().contains(
"ApplicationId is not provided in ApplicationSubmissionContext"));
} catch (IOException e) {
Assert.fail("IOException is not expected.");
}
// Submit the application with applicationId provided
// Should be successful
for (int i = 0; i < exitStates.length; ++i) {
ApplicationSubmissionContext context =
mock(ApplicationSubmissionContext.class);

View File

@@ -31,6 +31,8 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
@@ -40,6 +42,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
@@ -221,13 +224,24 @@ public RMApp submitApp(int masterMemory, String name, String user,
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers)
throws Exception {
ApplicationClientProtocol client = getClientRMService();
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
ApplicationId appId = resp.getApplicationId();
boolean waitForAccepted, boolean keepContainers) throws Exception {
return submitApp(masterMemory, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
false, null);
}
public RMApp submitApp(int masterMemory, String name, String user,
Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
GetNewApplicationResponse resp = client.getNewApplication(Records
.newRecord(GetNewApplicationRequest.class));
appId = resp.getApplicationId();
}
SubmitApplicationRequest req = Records
.newRecord(SubmitApplicationRequest.class);
ApplicationSubmissionContext sub = Records
@@ -502,4 +516,12 @@ public static MockAM launchAndRegisterAM(RMApp app, MockRM rm, MockNM nm)
return am;
}
public ApplicationReport getApplicationReport(ApplicationId appId)
throws YarnException, IOException {
ApplicationClientProtocol client = getClientRMService();
GetApplicationReportResponse response =
client.getApplicationReport(GetApplicationReportRequest
.newInstance(appId));
return response.getApplicationReport();
}
}

View File

@@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
public class RMHATestBase extends ClientBaseWithFixes{
private static final int ZK_TIMEOUT_MS = 5000;
private static StateChangeRequestInfo requestInfo =
new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
protected Configuration configuration = new YarnConfiguration();
static MockRM rm1 = null;
static MockRM rm2 = null;
Configuration confForRM1;
Configuration confForRM2;
@Before
public void setup() throws Exception {
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
configuration.set(YarnConfiguration.RM_STORE,
ZKRMStateStore.class.getName());
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
int base = 100;
for (String confKey : YarnConfiguration
.getServiceAddressConfKeys(configuration)) {
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+ (base + 20));
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+ (base + 40));
base = base * 2;
}
confForRM1 = new Configuration(configuration);
confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
confForRM2 = new Configuration(configuration);
confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
@After
public void teardown() {
if (rm1 != null) {
rm1.stop();
}
if (rm2 != null) {
rm2.stop();
}
}
protected MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.RUNNING);
return am;
}
protected void startRMs() throws IOException {
rm1 = new MockRM(confForRM1);
rm2 = new MockRM(confForRM2);
startRMs(rm1, confForRM1, rm2, confForRM2);
}
protected void startRMsWithCustomizedRMAppManager() throws IOException {
final Configuration conf1 = new Configuration(confForRM1);
rm1 = new MockRM(conf1) {
@Override
protected RMAppManager createRMAppManager() {
return new MyRMAppManager(this.rmContext, this.scheduler,
this.masterService, this.applicationACLsManager, conf1);
}
};
rm2 = new MockRM(confForRM2);
startRMs(rm1, conf1, rm2, confForRM2);
}
private static class MyRMAppManager extends RMAppManager {
private Configuration conf;
private RMContext rmContext;
public MyRMAppManager(RMContext context, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
this.conf = conf;
this.rmContext = context;
}
@Override
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovered, RMState state) throws YarnException {
//Do nothing, just add the application to RMContext
RMAppImpl application =
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext,
this.rmContext.getScheduler(),
this.rmContext.getApplicationMasterService(),
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags());
this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
application);
//Do not send the RMAppEventType.START event,
//so the application state will not reach NEW_SAVING.
}
}
protected boolean isFinalState(RMAppState state) {
return state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|| state.equals(RMAppState.KILLED);
}
protected void explicitFailover() throws IOException {
rm1.adminService.transitionToStandby(requestInfo);
rm2.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
}
protected void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
Configuration confForRM2) throws IOException {
rm1.init(confForRM1);
rm1.start();
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm2.init(confForRM2);
rm2.start();
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm1.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
}
}

View File

@@ -24,86 +24,29 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestKillApplicationWithRMHA extends ClientBaseWithFixes{
public class TestKillApplicationWithRMHA extends RMHATestBase{
public static final Log LOG = LogFactory
.getLog(TestKillApplicationWithRMHA.class);
private static final int ZK_TIMEOUT_MS = 5000;
private static StateChangeRequestInfo requestInfo =
new StateChangeRequestInfo(
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
private Configuration configuration = new YarnConfiguration();
static MockRM rm1 = null;
static MockRM rm2 = null;
Configuration confForRM1;
Configuration confForRM2;
@Before
public void setup() throws Exception {
configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
configuration.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
configuration.set(YarnConfiguration.RM_STORE,
ZKRMStateStore.class.getName());
configuration.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
configuration.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
configuration.set(YarnConfiguration.RM_CLUSTER_ID, "test-yarn-cluster");
int base = 100;
for (String confKey : YarnConfiguration
.getServiceAddressConfKeys(configuration)) {
configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+ (base + 20));
configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+ (base + 40));
base = base * 2;
}
confForRM1 = new Configuration(configuration);
confForRM1.set(YarnConfiguration.RM_HA_ID, "rm1");
confForRM2 = new Configuration(configuration);
confForRM2.set(YarnConfiguration.RM_HA_ID, "rm2");
}
@After
public void teardown() {
if (rm1 != null) {
rm1.stop();
}
if (rm2 != null) {
rm2.stop();
}
}
@Test (timeout = 20000)
public void testKillAppWhenFailoverHappensAtNewState()
@@ -221,18 +164,6 @@ public void testKillAppWhenFailOverHappensDuringApplicationKill()
}
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
RMAppAttempt attempt = app.getCurrentAppAttempt();
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(),
RMAppAttemptState.RUNNING);
return am;
}
private void failOverAndKillApp(ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState initialRMAppState,
RMAppAttemptState initialRMAppAttemptState,
@@ -256,29 +187,6 @@ private void failOverAndKillApp(ApplicationId appId,
killApplication(rm2, appId, null, initialRMAppState);
}
private void startRMs() throws IOException {
rm1 = new MockRM(confForRM1);
rm2 = new MockRM(confForRM2);
startRMs(rm1, confForRM1, rm2, confForRM2);
}
private void startRMsWithCustomizedRMAppManager() throws IOException {
final Configuration conf1 = new Configuration(confForRM1);
rm1 = new MockRM(conf1) {
@Override
protected RMAppManager createRMAppManager() {
return new MyRMAppManager(this.rmContext, this.scheduler,
this.masterService, this.applicationACLsManager, conf1);
}
};
rm2 = new MockRM(confForRM2);
startRMs(rm1, conf1, rm2, confForRM2);
}
private void startRMsWithCustomizedClientRMService() throws IOException {
final Configuration conf1 = new Configuration(confForRM1);
@@ -296,39 +204,6 @@ protected ClientRMService createClientRMService() {
startRMs(rm1, conf1, rm2, confForRM2);
}
private static class MyRMAppManager extends RMAppManager {
private Configuration conf;
private RMContext rmContext;
public MyRMAppManager(RMContext context, YarnScheduler scheduler,
ApplicationMasterService masterService,
ApplicationACLsManager applicationACLsManager, Configuration conf) {
super(context, scheduler, masterService, applicationACLsManager, conf);
this.conf = conf;
this.rmContext = context;
}
@Override
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovered, RMState state) throws YarnException {
//Do nothing, just add the application to RMContext
RMAppImpl application =
new RMAppImpl(submissionContext.getApplicationId(), this.rmContext,
this.conf, submissionContext.getApplicationName(), user,
submissionContext.getQueue(), submissionContext,
this.rmContext.getScheduler(),
this.rmContext.getApplicationMasterService(),
submitTime, submissionContext.getApplicationType(),
submissionContext.getApplicationTags());
this.rmContext.getRMApps().put(submissionContext.getApplicationId(),
application);
//Do not send RMAppEventType.START event
//so the state of Application will not reach to NEW_SAVING state.
}
}
private static class MyClientRMService extends ClientRMService {
private RMContext rmContext;
@@ -366,21 +241,6 @@ public KillApplicationResponse forceKillApplication(
}
}
private boolean isFinalState(RMAppState state) {
return state.equals(RMAppState.FINISHING)
|| state.equals(RMAppState.FINISHED) || state.equals(RMAppState.FAILED)
|| state.equals(RMAppState.KILLED);
}
private void explicitFailover() throws IOException {
rm1.adminService.transitionToStandby(requestInfo);
rm2.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
}
private void killApplication(MockRM rm, ApplicationId appId,
ApplicationAttemptId appAttemptId, RMAppState rmAppState)
throws Exception {
@@ -396,21 +256,4 @@ private void killApplication(MockRM rm, ApplicationId appId,
// no new attempt is created.
Assert.assertEquals(1, loadedApp0.getAppAttempts().size());
}
private void startRMs(MockRM rm1, Configuration confForRM1, MockRM rm2,
Configuration confForRM2) throws IOException {
rm1.init(confForRM1);
rm1.start();
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm2.init(confForRM2);
rm2.start();
Assert.assertTrue(rm2.getRMContext().getHAServiceState()
== HAServiceState.STANDBY);
rm1.adminService.transitionToActive(requestInfo);
Assert.assertTrue(rm1.getRMContext().getHAServiceState()
== HAServiceState.ACTIVE);
}
}

View File

@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.junit.Test;
public class TestSubmitApplicationWithRMHA extends RMHATestBase{
public static final Log LOG = LogFactory
.getLog(TestSubmitApplicationWithRMHA.class);
@Test
public void
testHandleRMHABeforeSubmitApplicationCallWithSavedApplicationState()
throws Exception {
// start two RMs, transition rm1 to active and rm2 to standby
startRMs();
// get a new applicationId from rm1
ApplicationId appId = rm1.getNewAppId().getApplicationId();
// Do the failover
explicitFailover();
// submit the application with the previously assigned applicationId
// to the current active RM: rm2
RMApp app1 =
rm2.submitApp(200, "", UserGroupInformation
.getCurrentUser().getShortUserName(), null, false, null,
configuration.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
false, false, true, appId);
// verify application submission
verifySubmitApp(rm2, app1, appId);
}
private void verifySubmitApp(MockRM rm, RMApp app,
ApplicationId expectedAppId) throws Exception {
int maxWaitingTimes = 20;
int count = 0;
while (true) {
YarnApplicationState state =
rm.getApplicationReport(app.getApplicationId())
.getYarnApplicationState();
if (!state.equals(YarnApplicationState.NEW) &&
!state.equals(YarnApplicationState.NEW_SAVING)) {
break;
}
if (count > maxWaitingTimes) {
break;
}
Thread.sleep(200);
count++;
}
// Verify the submission was successful
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
.getYarnApplicationState() == YarnApplicationState.NEW);
Assert.assertFalse(rm.getApplicationReport(app.getApplicationId())
.getYarnApplicationState() == YarnApplicationState.NEW_SAVING);
Assert.assertEquals(expectedAppId, app.getApplicationId());
}
}