YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is not available. Contributed by Chang Li

This commit is contained in:
Jason Lowe 2016-01-29 21:48:54 +00:00
parent f4a57d4a53
commit 772ea7b41b
5 changed files with 197 additions and 2 deletions

View File

@ -1441,6 +1441,9 @@ Release 2.7.3 - UNRELEASED
YARN-4598. Invalid event: RESOURCE_FAILED at
CONTAINER_CLEANEDUP_AFTER_KILL (tangshangwen via jlowe)
YARN-4428. Redirect RM page to AHS page when AHS turned on and RM page is
not available (Chang Li via jlowe)
Release 2.7.2 - 2016-01-25
INCOMPATIBLE CHANGES

View File

@ -58,6 +58,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -628,6 +629,21 @@ private void setTrackingUrlToRMAppPage(RMAppAttemptState stateToBeStored) {
}
}
private void setTrackingUrlToAHSPage(RMAppAttemptState stateToBeStored) {
originalTrackingUrl = pjoin(
WebAppUtils.getHttpSchemePrefix(conf) +
WebAppUtils.getAHSWebAppURLWithoutScheme(conf),
"applicationhistory", "app", getAppAttemptId().getApplicationId());
switch (stateToBeStored) {
case KILLED:
case FAILED:
proxiedTrackingUrl = originalTrackingUrl;
break;
default:
break;
}
}
private void invalidateAMHostAndPort() {
this.host = "N/A";
this.rpcPort = -1;
@ -1211,7 +1227,12 @@ private void rememberTargetTransitionsAndStoreState(RMAppAttemptEvent event,
String diags = null;
// don't leave the tracking URL pointing to a non-existent AM
setTrackingUrlToRMAppPage(stateToBeStored);
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
setTrackingUrlToAHSPage(stateToBeStored);
} else {
setTrackingUrlToRMAppPage(stateToBeStored);
}
String finalTrackingUrl = getOriginalTrackingUrl();
FinalApplicationStatus finalStatus = null;
int exitStatus = ContainerExitStatus.INVALID;

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.WebApp;
@ -122,4 +123,8 @@ public String getHAZookeeperConnectionState() {
return rm.getRMContext().getRMAdminService()
.getHAZookeeperConnectionState();
}
public RMContext getRMContext() {
return rm.getRMContext();
}
}

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
import static org.apache.hadoop.yarn.util.StringHelper.pjoin;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetSocketAddress;
@ -35,9 +37,19 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Sets;
import com.google.inject.Injector;
@ -45,6 +57,8 @@
@Singleton
public class RMWebAppFilter extends GuiceContainer {
private static final Logger LOG =
LoggerFactory.getLogger(RMWebAppFilter.class);
private Injector injector;
/**
@ -56,6 +70,8 @@ public class RMWebAppFilter extends GuiceContainer {
private static final Set<String> NON_REDIRECTED_URIS = Sets.newHashSet(
"/conf", "/stacks", "/logLevel", "/logs");
private String path;
private boolean ahsEnabled;
private String ahsPageURLPrefix;
private static final int BASIC_SLEEP_TIME = 5;
private static final int MAX_SLEEP_TIME = 5 * 60;
private static final Random randnum = new Random();
@ -76,6 +92,13 @@ public RMWebAppFilter(Injector injector, Configuration conf) {
path = YarnConfiguration.useHttps(conf)
? "https://" + path
: "http://" + path;
ahsEnabled = conf.getBoolean(
YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED);
ahsPageURLPrefix = pjoin(
WebAppUtils.getHttpSchemePrefix(conf) +
WebAppUtils.getAHSWebAppURLWithoutScheme(
conf), "applicationhistory");
}
@Override
@ -137,11 +160,78 @@ && shouldRedirect(rmWebApp, uri)) {
}
}
return;
} else if (ahsEnabled) {
String ahsRedirectUrl = ahsRedirectPath(uri, rmWebApp);
if(ahsRedirectUrl != null) {
response.setHeader("Location", ahsRedirectUrl);
response.setStatus(HttpServletResponse.SC_TEMPORARY_REDIRECT);
return;
}
}
super.doFilter(request, response, chain);
}
private String ahsRedirectPath(String uri, RMWebApp rmWebApp) {
// TODO: Commonize URL parsing code. Will be done in YARN-4642.
String redirectPath = null;
if(uri.contains("/cluster/")) {
String[] parts = uri.split("/");
if(parts.length > 3) {
RMContext context = rmWebApp.getRMContext();
String type = parts[2];
ApplicationId appId = null;
ApplicationAttemptId appAttemptId = null;
ContainerId containerId = null;
switch(type){
case "app":
try {
appId = Apps.toAppID(parts[3]);
} catch (YarnRuntimeException | NumberFormatException e) {
LOG.debug("Error parsing {} as an ApplicationId",
parts[3], e);
return redirectPath;
}
if(!context.getRMApps().containsKey(appId)) {
redirectPath = pjoin(ahsPageURLPrefix, "app", appId);
}
break;
case "appattempt":
try{
appAttemptId = ConverterUtils.toApplicationAttemptId(parts[3]);
} catch (IllegalArgumentException e) {
LOG.debug("Error parsing {} as an ApplicationAttemptId",
parts[3], e);
return redirectPath;
}
if(!context.getRMApps().containsKey(
appAttemptId.getApplicationId())) {
redirectPath = pjoin(ahsPageURLPrefix,
"appattempt", appAttemptId);
}
break;
case "container":
try {
containerId = ContainerId.fromString(parts[3]);
} catch (IllegalArgumentException e) {
LOG.debug("Error parsing {} as an ContainerId",
parts[3], e);
return redirectPath;
}
if(!context.getRMApps().containsKey(
containerId.getApplicationAttemptId().getApplicationId())) {
redirectPath = pjoin(ahsPageURLPrefix,
"container", containerId);
}
break;
default:
break;
}
}
}
return redirectPath;
}
private boolean shouldRedirect(RMWebApp rmWebApp, String uri) {
return !uri.equals("/" + rmWebApp.wsName() + "/v1/cluster/info")
&& !uri.equals("/" + rmWebApp.name() + "/cluster")

View File

@ -68,6 +68,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
@ -137,6 +138,9 @@ public class TestRMAppAttemptTransitions {
private static final String FAILED_DIAGNOSTICS = "Attempt failed by user.";
private static final String RM_WEBAPP_ADDR =
WebAppUtils.getResolvedRMWebAppURLWithScheme(new Configuration());
private static final String AHS_WEBAPP_ADDR =
WebAppUtils.getHttpSchemePrefix(new Configuration()) +
WebAppUtils.getAHSWebAppURLWithoutScheme(new Configuration());
private boolean isSecurityEnabled;
private RMContext rmContext;
@ -1090,6 +1094,78 @@ public void testLaunchedExpire() {
verifyApplicationAttemptFinished(RMAppAttemptState.FAILED);
}
@SuppressWarnings("unchecked")
@Test(timeout=10000)
public void testLaunchedFailWhileAHSEnabled() {
Configuration myConf = new Configuration(conf);
myConf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, true);
ApplicationId applicationId = MockApps.newAppID(appId);
ApplicationAttemptId applicationAttemptId =
ApplicationAttemptId.newInstance(applicationId, 2);
RMAppAttempt myApplicationAttempt =
new RMAppAttemptImpl(applicationAttempt.getAppAttemptId(),
spyRMContext, scheduler,masterService,
submissionContext, myConf, false,
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1));
//submit, schedule and allocate app attempt
myApplicationAttempt.handle(
new RMAppAttemptEvent(myApplicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.START));
myApplicationAttempt.handle(
new RMAppAttemptEvent(
myApplicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.ATTEMPT_ADDED));
Container amContainer = mock(Container.class);
Resource resource = BuilderUtils.newResource(2048, 1);
when(amContainer.getId()).thenReturn(
BuilderUtils.newContainerId(myApplicationAttempt.getAppAttemptId(), 1));
when(amContainer.getResource()).thenReturn(resource);
Allocation allocation = mock(Allocation.class);
when(allocation.getContainers()).
thenReturn(Collections.singletonList(amContainer));
when(scheduler.allocate(any(ApplicationAttemptId.class), any(List.class),
any(List.class), any(List.class), any(List.class), any(List.class),
any(List.class))).thenReturn(allocation);
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(amContainer.getId())).thenReturn(rmContainer);
myApplicationAttempt.handle(
new RMAppAttemptEvent(myApplicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.CONTAINER_ALLOCATED));
assertEquals(RMAppAttemptState.ALLOCATED_SAVING,
myApplicationAttempt.getAppAttemptState());
myApplicationAttempt.handle(
new RMAppAttemptEvent(myApplicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.ATTEMPT_NEW_SAVED));
// launch app attempt
myApplicationAttempt.handle(
new RMAppAttemptEvent(myApplicationAttempt.getAppAttemptId(),
RMAppAttemptEventType.LAUNCHED));
assertEquals(YarnApplicationAttemptState.LAUNCHED,
myApplicationAttempt.createApplicationAttemptState());
//fail container right after launched
NodeId anyNodeId = NodeId.newInstance("host", 1234);
myApplicationAttempt.handle(
new RMAppAttemptContainerFinishedEvent(
myApplicationAttempt.getAppAttemptId(),
BuilderUtils.newContainerStatus(amContainer.getId(),
ContainerState.COMPLETE, "", 0,
amContainer.getResource()), anyNodeId));
sendAttemptUpdateSavedEvent(myApplicationAttempt);
assertEquals(RMAppAttemptState.FAILED,
myApplicationAttempt.getAppAttemptState());
String rmAppPageUrl = pjoin(AHS_WEBAPP_ADDR, "applicationhistory", "app",
myApplicationAttempt.getAppAttemptId().getApplicationId());
assertEquals(rmAppPageUrl, myApplicationAttempt.getOriginalTrackingUrl());
assertEquals(rmAppPageUrl, myApplicationAttempt.getTrackingUrl());
}
@Test(timeout=20000)
public void testRunningExpire() {
Container amContainer = allocateApplicationAttempt();