MAPREDUCE-3348. Fixed a bug in MR client to redirect to JobHistoryServer correctly when RM forgets the app. Contributed by Devaraj K.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1298978 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c373405e0
commit
ad3d3f54d5
@ -137,6 +137,9 @@ Release 0.23.3 - UNRELEASED
|
|||||||
MAPREDUCE-3578. Starting nodemanager as root gives "Unknown -jvm option"
|
MAPREDUCE-3578. Starting nodemanager as root gives "Unknown -jvm option"
|
||||||
(tomwhite)
|
(tomwhite)
|
||||||
|
|
||||||
|
MAPREDUCE-3348. Fixed a bug in MR client to redirect to JobHistoryServer
|
||||||
|
correctly when RM forgets the app. (Devaraj K via vinodkv)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -149,7 +149,7 @@ private MRClientProtocol getProxy() throws YarnRemoteException {
|
|||||||
|| YarnApplicationState.RUNNING == application
|
|| YarnApplicationState.RUNNING == application
|
||||||
.getYarnApplicationState()) {
|
.getYarnApplicationState()) {
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
LOG.debug("Could not get Job info from RM for job " + jobId
|
LOG.info("Could not get Job info from RM for job " + jobId
|
||||||
+ ". Redirecting to job history server.");
|
+ ". Redirecting to job history server.");
|
||||||
return checkAndGetHSProxy(null, JobState.NEW);
|
return checkAndGetHSProxy(null, JobState.NEW);
|
||||||
}
|
}
|
||||||
@ -216,7 +216,7 @@ public MRClientProtocol run() throws IOException {
|
|||||||
}
|
}
|
||||||
application = rm.getApplicationReport(appId);
|
application = rm.getApplicationReport(appId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
LOG.debug("Could not get Job info from RM for job " + jobId
|
LOG.info("Could not get Job info from RM for job " + jobId
|
||||||
+ ". Redirecting to job history server.");
|
+ ". Redirecting to job history server.");
|
||||||
return checkAndGetHSProxy(null, JobState.RUNNING);
|
return checkAndGetHSProxy(null, JobState.RUNNING);
|
||||||
}
|
}
|
||||||
|
@ -199,6 +199,10 @@ public GetNewApplicationResponse getNewApplication(
|
|||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* It gives response which includes application report if the application
|
||||||
|
* present otherwise gives response with application report as null.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public GetApplicationReportResponse getApplicationReport(
|
public GetApplicationReportResponse getApplicationReport(
|
||||||
GetApplicationReportRequest request) throws YarnRemoteException {
|
GetApplicationReportRequest request) throws YarnRemoteException {
|
||||||
@ -214,8 +218,10 @@ public GetApplicationReportResponse getApplicationReport(
|
|||||||
|
|
||||||
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
RMApp application = this.rmContext.getRMApps().get(applicationId);
|
||||||
if (application == null) {
|
if (application == null) {
|
||||||
throw RPCUtil.getRemoteException("Trying to get information for an "
|
// If the RM doesn't have the application, provide the response with
|
||||||
+ "absent application " + applicationId);
|
// application report as null and let the clients to handle.
|
||||||
|
return recordFactory
|
||||||
|
.newRecordInstance(GetApplicationReportResponse.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
boolean allowAccess = checkAccess(callerUGI, application.getUser(),
|
||||||
|
@ -18,8 +18,12 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager;
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
import junit.framework.Assert;
|
import junit.framework.Assert;
|
||||||
|
|
||||||
@ -27,12 +31,20 @@
|
|||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
import org.apache.hadoop.yarn.api.ClientRMProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeReport;
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
|
||||||
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
public class TestClientRMService {
|
public class TestClientRMService {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
|
private static final Log LOG = LogFactory.getLog(TestClientRMService.class);
|
||||||
@ -79,4 +91,22 @@ protected ClientRMService createClientRMService() {
|
|||||||
Assert.assertFalse("Node is expected to be unhealthy!", nodeReports.get(0)
|
Assert.assertFalse("Node is expected to be unhealthy!", nodeReports.get(0)
|
||||||
.getNodeHealthStatus().getIsNodeHealthy());
|
.getNodeHealthStatus().getIsNodeHealthy());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetApplicationReport() throws YarnRemoteException {
|
||||||
|
RMContext rmContext = mock(RMContext.class);
|
||||||
|
when(rmContext.getRMApps()).thenReturn(
|
||||||
|
new ConcurrentHashMap<ApplicationId, RMApp>());
|
||||||
|
ClientRMService rmService = new ClientRMService(rmContext, null, null,
|
||||||
|
null, null);
|
||||||
|
RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
||||||
|
GetApplicationReportRequest request = recordFactory
|
||||||
|
.newRecordInstance(GetApplicationReportRequest.class);
|
||||||
|
request.setApplicationId(recordFactory
|
||||||
|
.newRecordInstance(ApplicationId.class));
|
||||||
|
GetApplicationReportResponse applicationReport = rmService
|
||||||
|
.getApplicationReport(request);
|
||||||
|
Assert.assertNull("It should return null as application report for absent application.",
|
||||||
|
applicationReport.getApplicationReport());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user