YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port information once an AM crashes. Contributed by Jian He.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1553772 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
defeef6fe4
commit
8f0bf54d34
@ -191,6 +191,9 @@ Release 2.4.0 - UNRELEASED
|
||||
|
||||
YARN-1523. Use StandbyException instead of RMNotYetReadyException (kasha)
|
||||
|
||||
YARN-1541. Changed ResourceManager to invalidate ApplicationMaster host/port
|
||||
information once an AM crashes. (Jian He via vinodkv)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -139,7 +139,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
|
||||
|
||||
private float progress = 0;
|
||||
private String host = "N/A";
|
||||
private int rpcPort;
|
||||
private int rpcPort = -1;
|
||||
private String originalTrackingUrl = "N/A";
|
||||
private String proxiedTrackingUrl = "N/A";
|
||||
private long startTime = 0;
|
||||
@ -526,6 +526,11 @@ private void setTrackingUrlToRMAppPage() {
|
||||
proxiedTrackingUrl = originalTrackingUrl;
|
||||
}
|
||||
|
||||
private void invalidateAMHostAndPort() {
|
||||
this.host = "N/A";
|
||||
this.rpcPort = -1;
|
||||
}
|
||||
|
||||
// This is only used for RMStateStore. Normal operation must invoke the secret
|
||||
// manager to get the key and not use the local key directly.
|
||||
@Override
|
||||
@ -1033,6 +1038,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
{
|
||||
// don't leave the tracking URL pointing to a non-existent AM
|
||||
appAttempt.setTrackingUrlToRMAppPage();
|
||||
appAttempt.invalidateAMHostAndPort();
|
||||
appEvent =
|
||||
new RMAppFailedAttemptEvent(applicationId,
|
||||
RMAppEventType.ATTEMPT_KILLED,
|
||||
@ -1043,6 +1049,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
{
|
||||
// don't leave the tracking URL pointing to a non-existent AM
|
||||
appAttempt.setTrackingUrlToRMAppPage();
|
||||
appAttempt.invalidateAMHostAndPort();
|
||||
appEvent =
|
||||
new RMAppFailedAttemptEvent(applicationId,
|
||||
RMAppEventType.ATTEMPT_FAILED,
|
||||
@ -1059,7 +1066,6 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
appAttempt.eventHandler.handle(appEvent);
|
||||
appAttempt.eventHandler.handle(new AppAttemptRemovedSchedulerEvent(
|
||||
appAttemptId, finalAttemptState));
|
||||
|
||||
appAttempt.removeCredentials(appAttempt);
|
||||
}
|
||||
}
|
||||
|
@ -19,26 +19,33 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import javax.security.auth.login.Configuration;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerState;
|
||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||
import org.apache.hadoop.yarn.api.records.NMToken;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.api.records.Token;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
||||
@ -368,6 +375,111 @@ public void testActivatingApplicationAfterAddingNM() throws Exception {
|
||||
rm1.stop();
|
||||
}
|
||||
|
||||
// This is to test AM Host and rpc port are invalidated after the am attempt
|
||||
// is killed or failed, so that client doesn't get the wrong information.
|
||||
@Test (timeout = 80000)
|
||||
public void testInvalidateAMHostPortWhenAMFailedOrKilled() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
|
||||
// a succeeded app
|
||||
RMApp app1 = rm1.submitApp(200);
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
MockAM am1 = launchAM(app1, rm1, nm1);
|
||||
finishApplicationMaster(app1, rm1, nm1, am1);
|
||||
|
||||
// a failed app
|
||||
RMApp app2 = rm1.submitApp(200);
|
||||
MockAM am2 = launchAM(app2, rm1, nm1);
|
||||
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
|
||||
|
||||
// a killed app
|
||||
RMApp app3 = rm1.submitApp(200);
|
||||
MockAM am3 = launchAM(app3, rm1, nm1);
|
||||
rm1.killApp(app3.getApplicationId());
|
||||
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
|
||||
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
||||
|
||||
GetApplicationsRequest request1 =
|
||||
GetApplicationsRequest.newInstance(EnumSet.of(
|
||||
YarnApplicationState.FINISHED, YarnApplicationState.KILLED,
|
||||
YarnApplicationState.FAILED));
|
||||
GetApplicationsResponse response1 =
|
||||
rm1.getClientRMService().getApplications(request1);
|
||||
List<ApplicationReport> appList1 = response1.getApplicationList();
|
||||
|
||||
Assert.assertEquals(3, appList1.size());
|
||||
for (ApplicationReport report : appList1) {
|
||||
// killed/failed apps host and rpc port are invalidated.
|
||||
if (report.getApplicationId().equals(app2.getApplicationId())
|
||||
|| report.getApplicationId().equals(app3.getApplicationId())) {
|
||||
Assert.assertEquals("N/A", report.getHost());
|
||||
Assert.assertEquals(-1, report.getRpcPort());
|
||||
}
|
||||
// succeeded app's host and rpc port is not invalidated
|
||||
if (report.getApplicationId().equals(app1.getApplicationId())) {
|
||||
Assert.assertFalse(report.getHost().equals("N/A"));
|
||||
Assert.assertTrue(report.getRpcPort() != -1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testInvalidatedAMHostPortOnAMRestart() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
MockRM rm1 = new MockRM(conf);
|
||||
rm1.start();
|
||||
MockNM nm1 =
|
||||
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
||||
nm1.registerNode();
|
||||
|
||||
// a failed app
|
||||
RMApp app2 = rm1.submitApp(200);
|
||||
MockAM am2 = launchAM(app2, rm1, nm1);
|
||||
nm1
|
||||
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am2.waitForState(RMAppAttemptState.FAILED);
|
||||
rm1.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
|
||||
|
||||
// before new attempt is launched, the app report returns the invalid AM
|
||||
// host and port.
|
||||
GetApplicationReportRequest request1 =
|
||||
GetApplicationReportRequest.newInstance(app2.getApplicationId());
|
||||
ApplicationReport report1 =
|
||||
rm1.getClientRMService().getApplicationReport(request1)
|
||||
.getApplicationReport();
|
||||
Assert.assertEquals("N/A", report1.getHost());
|
||||
Assert.assertEquals(-1, report1.getRpcPort());
|
||||
}
|
||||
|
||||
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
||||
throws Exception {
|
||||
RMAppAttempt attempt = app.getCurrentAppAttempt();
|
||||
nm.nodeHeartbeat(true);
|
||||
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
|
||||
am.registerAppAttempt();
|
||||
rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
||||
return am;
|
||||
}
|
||||
|
||||
private void finishApplicationMaster(RMApp rmApp, MockRM rm, MockNM nm,
|
||||
MockAM am) throws Exception {
|
||||
FinishApplicationMasterRequest req =
|
||||
FinishApplicationMasterRequest.newInstance(
|
||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||
am.unregisterAppAttempt(req);
|
||||
am.waitForState(RMAppAttemptState.FINISHING);
|
||||
nm.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
||||
am.waitForState(RMAppAttemptState.FINISHED);
|
||||
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
TestRM t = new TestRM();
|
||||
t.testGetNewAppId();
|
||||
|
@ -806,6 +806,7 @@ public void testRunningToFailed() {
|
||||
applicationAttempt.getAppAttemptId().getApplicationId());
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||
verifyAMHostAndPortInvalidated();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -841,6 +842,7 @@ public void testRunningToKilled() {
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||
verifyAMHostAndPortInvalidated();
|
||||
}
|
||||
|
||||
@Test(timeout=10000)
|
||||
@ -878,6 +880,7 @@ public void testRunningExpire() {
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
assertEquals(rmAppPageUrl, applicationAttempt.getTrackingUrl());
|
||||
verifyTokenCount(applicationAttempt.getAppAttemptId(), 1);
|
||||
verifyAMHostAndPortInvalidated();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1125,4 +1128,9 @@ private void verifyAttemptFinalStateSaved() {
|
||||
verify(store, times(1)).updateApplicationAttemptState(
|
||||
any(ApplicationAttemptState.class));
|
||||
}
|
||||
|
||||
private void verifyAMHostAndPortInvalidated() {
|
||||
assertEquals("N/A", applicationAttempt.getHost());
|
||||
assertEquals(-1, applicationAttempt.getRpcPort());
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user