YARN-1144. Unmanaged AMs registering a tracking URI should not be proxy-fied. (tucu)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521039 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1cd7b067f7
commit
f2e0a125f4
@ -165,6 +165,9 @@ Release 2.1.1-beta - UNRELEASED
|
||||
YARN-1049. ContainerExistStatus should define a status for preempted
|
||||
containers. (tucu)
|
||||
|
||||
YARN-1144. Unmanaged AMs registering a tracking URI should not be
|
||||
proxy-fied. (tucu)
|
||||
|
||||
Release 2.1.0-beta - 2013-08-22
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -440,7 +440,8 @@ public int getRpcPort() {
|
||||
public String getTrackingUrl() {
|
||||
this.readLock.lock();
|
||||
try {
|
||||
return this.proxiedTrackingUrl;
|
||||
return (getSubmissionContext().getUnmanagedAM()) ?
|
||||
this.origTrackingUrl : this.proxiedTrackingUrl;
|
||||
} finally {
|
||||
this.readLock.unlock();
|
||||
}
|
||||
@ -961,7 +962,7 @@ public void transition(RMAppAttemptImpl appAttempt,
|
||||
}
|
||||
}
|
||||
|
||||
private static final class AMRegisteredTransition extends BaseTransition {
|
||||
static final class AMRegisteredTransition extends BaseTransition {
|
||||
@Override
|
||||
public void transition(RMAppAttemptImpl appAttempt,
|
||||
RMAppAttemptEvent event) {
|
||||
|
@ -0,0 +1,77 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
||||
|
||||
import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
public class TestRMAppAttemptImpl {
|
||||
|
||||
private void testTrackingUrl(String url, boolean unmanaged) {
|
||||
ApplicationAttemptId attemptId = ApplicationAttemptId.newInstance
|
||||
(ApplicationId.newInstance(1, 2), 1);
|
||||
EventHandler handler = Mockito.mock(EventHandler.class);
|
||||
Dispatcher dispatcher = Mockito.mock(Dispatcher.class);
|
||||
Mockito.when(dispatcher.getEventHandler()).thenReturn(handler);
|
||||
RMContext rmContext = Mockito.mock(RMContext.class);
|
||||
Mockito.when(rmContext.getDispatcher()).thenReturn(dispatcher);
|
||||
|
||||
ApplicationSubmissionContext appContext =
|
||||
Mockito.mock(ApplicationSubmissionContext.class);
|
||||
Mockito.when(appContext.getUnmanagedAM()).thenReturn(unmanaged);
|
||||
|
||||
RMAppAttemptImpl attempt = new RMAppAttemptImpl(attemptId, rmContext, null,
|
||||
null, appContext, new YarnConfiguration(), null);
|
||||
RMAppAttemptRegistrationEvent event =
|
||||
Mockito.mock(RMAppAttemptRegistrationEvent.class);
|
||||
Mockito.when(event.getHost()).thenReturn("h");
|
||||
Mockito.when(event.getRpcport()).thenReturn(0);
|
||||
Mockito.when(event.getTrackingurl()).thenReturn(url);
|
||||
new RMAppAttemptImpl.AMRegisteredTransition().transition(attempt, event);
|
||||
if (unmanaged) {
|
||||
Assert.assertEquals(url, attempt.getTrackingUrl());
|
||||
} else {
|
||||
Assert.assertNotSame(url, attempt.getTrackingUrl());
|
||||
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
||||
ProxyUriUtils.PROXY_SERVLET_NAME));
|
||||
Assert.assertTrue(attempt.getTrackingUrl().contains(
|
||||
attemptId.getApplicationId().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrackingUrlUnmanagedAM() {
|
||||
testTrackingUrl("http://foo:8000/x", true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTrackingUrlManagedAM() {
|
||||
testTrackingUrl("bar:8000/x", false);
|
||||
}
|
||||
}
|
@ -408,16 +408,19 @@ private void testAppAttemptLaunchedState(Container container) {
|
||||
* {@link RMAppAttemptState#RUNNING}
|
||||
*/
|
||||
private void testAppAttemptRunningState(Container container,
|
||||
String host, int rpcPort, String trackingUrl) {
|
||||
String host, int rpcPort, String trackingUrl, boolean unmanagedAM) {
|
||||
assertEquals(RMAppAttemptState.RUNNING,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||
assertEquals(host, applicationAttempt.getHost());
|
||||
assertEquals(rpcPort, applicationAttempt.getRpcPort());
|
||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
if (unmanagedAM) {
|
||||
assertEquals("oldtrackingurl", applicationAttempt.getTrackingUrl());
|
||||
} else {
|
||||
assertEquals(getProxyUrl(applicationAttempt),
|
||||
applicationAttempt.getTrackingUrl());
|
||||
|
||||
}
|
||||
// TODO - need to add more checks relevant to this state
|
||||
}
|
||||
|
||||
@ -446,13 +449,18 @@ private void testAppAttemptFinishedState(Container container,
|
||||
FinalApplicationStatus finalStatus,
|
||||
String trackingUrl,
|
||||
String diagnostics,
|
||||
int finishedContainerCount) {
|
||||
int finishedContainerCount, boolean unmanagedAM) {
|
||||
assertEquals(RMAppAttemptState.FINISHED,
|
||||
applicationAttempt.getAppAttemptState());
|
||||
assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
||||
assertEquals(trackingUrl, applicationAttempt.getOriginalTrackingUrl());
|
||||
if (unmanagedAM) {
|
||||
assertEquals("mytrackingurl", applicationAttempt.getTrackingUrl());
|
||||
|
||||
} else {
|
||||
assertEquals(getProxyUrl(applicationAttempt),
|
||||
applicationAttempt.getTrackingUrl());
|
||||
}
|
||||
assertEquals(finishedContainerCount, applicationAttempt
|
||||
.getJustFinishedContainers().size());
|
||||
assertEquals(container, applicationAttempt.getMasterContainer());
|
||||
@ -535,13 +543,14 @@ private void launchApplicationAttempt(Container container) {
|
||||
private void runApplicationAttempt(Container container,
|
||||
String host,
|
||||
int rpcPort,
|
||||
String trackingUrl) {
|
||||
String trackingUrl, boolean unmanagedAM) {
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptRegistrationEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
host, rpcPort, trackingUrl));
|
||||
|
||||
testAppAttemptRunningState(container, host, rpcPort, trackingUrl);
|
||||
testAppAttemptRunningState(container, host, rpcPort, trackingUrl,
|
||||
unmanagedAM);
|
||||
}
|
||||
|
||||
private void unregisterApplicationAttempt(Container container,
|
||||
@ -567,7 +576,7 @@ public void testUnmanagedAMSuccess() {
|
||||
applicationAttempt.getAppAttemptId());
|
||||
|
||||
// launch AM
|
||||
runApplicationAttempt(null, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(null, "host", 8042, "oldtrackingurl", true);
|
||||
|
||||
// complete a container
|
||||
applicationAttempt.handle(new RMAppAttemptContainerAcquiredEvent(
|
||||
@ -581,7 +590,8 @@ public void testUnmanagedAMSuccess() {
|
||||
applicationAttempt.handle(new RMAppAttemptUnregistrationEvent(
|
||||
applicationAttempt.getAppAttemptId(), trackingUrl, finalStatus,
|
||||
diagnostics));
|
||||
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1);
|
||||
testAppAttemptFinishedState(null, finalStatus, trackingUrl, diagnostics, 1,
|
||||
true);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -690,7 +700,7 @@ public void testAMCrashAtAllocated() {
|
||||
public void testRunningToFailed() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
String containerDiagMsg = "some error";
|
||||
int exitCode = 123;
|
||||
ContainerStatus cs = BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
@ -713,7 +723,7 @@ public void testRunningToFailed() {
|
||||
public void testRunningToKilled() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
applicationAttempt.handle(
|
||||
new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
@ -751,7 +761,7 @@ public void testLaunchedExpire() {
|
||||
public void testRunningExpire() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
applicationAttempt.handle(new RMAppAttemptEvent(
|
||||
applicationAttempt.getAppAttemptId(), RMAppAttemptEventType.EXPIRE));
|
||||
assertEquals(RMAppAttemptState.FAILED,
|
||||
@ -769,7 +779,7 @@ public void testRunningExpire() {
|
||||
public void testUnregisterToKilledFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
unregisterApplicationAttempt(amContainer,
|
||||
FinalApplicationStatus.KILLED, "newtrackingurl",
|
||||
"Killed by user");
|
||||
@ -780,14 +790,14 @@ public void testUnregisterToKilledFinishing() {
|
||||
public void testNoTrackingUrl() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "", false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnregisterToSuccessfulFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
unregisterApplicationAttempt(amContainer,
|
||||
FinalApplicationStatus.SUCCEEDED, "mytrackingurl", "Successful");
|
||||
}
|
||||
@ -796,7 +806,7 @@ public void testUnregisterToSuccessfulFinishing() {
|
||||
public void testFinishingKill() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
|
||||
String trackingUrl = "newtrackingurl";
|
||||
String diagnostics = "Job failed";
|
||||
@ -814,7 +824,7 @@ public void testFinishingKill() {
|
||||
public void testFinishingExpire() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
@ -825,14 +835,14 @@ public void testFinishingExpire() {
|
||||
applicationAttempt.getAppAttemptId(),
|
||||
RMAppAttemptEventType.EXPIRE));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics, 0);
|
||||
diagnostics, 0, false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFinishingToFinishing() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
@ -854,7 +864,7 @@ public void testFinishingToFinishing() {
|
||||
public void testSuccessfulFinishingToFinished() {
|
||||
Container amContainer = allocateApplicationAttempt();
|
||||
launchApplicationAttempt(amContainer);
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl");
|
||||
runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
|
||||
FinalApplicationStatus finalStatus = FinalApplicationStatus.SUCCEEDED;
|
||||
String trackingUrl = "mytrackingurl";
|
||||
String diagnostics = "Successful";
|
||||
@ -866,7 +876,7 @@ public void testSuccessfulFinishingToFinished() {
|
||||
BuilderUtils.newContainerStatus(amContainer.getId(),
|
||||
ContainerState.COMPLETE, "", 0)));
|
||||
testAppAttemptFinishedState(amContainer, finalStatus, trackingUrl,
|
||||
diagnostics, 0);
|
||||
diagnostics, 0, false);
|
||||
}
|
||||
|
||||
private void verifyTokenCount(ApplicationAttemptId appAttemptId, int count) {
|
||||
|
Loading…
Reference in New Issue
Block a user