YARN-3967. Fetch the application report from the AHS if the RM does not

know about it. Contributed by Mit Desai
This commit is contained in:
Xuan 2015-07-24 10:15:54 -07:00
parent ee233ec95c
commit fbd6063269
3 changed files with 187 additions and 12 deletions

View File

@ -683,6 +683,9 @@ Release 2.7.2 - UNRELEASED
YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula
via ozawa) via ozawa)
YARN-3967. Fetch the application report from the AHS if the RM does not know about it.
(Mit Desai via xgong)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -24,11 +24,15 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.AHSProxy;
import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
@ -41,38 +45,73 @@ public class AppReportFetcher {
private static final Log LOG = LogFactory.getLog(AppReportFetcher.class); private static final Log LOG = LogFactory.getLog(AppReportFetcher.class);
private final Configuration conf; private final Configuration conf;
private final ApplicationClientProtocol applicationsManager; private final ApplicationClientProtocol applicationsManager;
private final ApplicationHistoryProtocol historyManager;
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean isAHSEnabled;
/** /**
* Create a new Connection to the RM to fetch Application reports. * Create a new Connection to the RM/Application History Server
* to fetch Application reports.
* @param conf the conf to use to know where the RM is. * @param conf the conf to use to know where the RM is.
*/ */
public AppReportFetcher(Configuration conf) { public AppReportFetcher(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
}
this.conf = conf; this.conf = conf;
try { try {
applicationsManager = ClientRMProxy.createRMProxy(conf, applicationsManager = ClientRMProxy.createRMProxy(conf,
ApplicationClientProtocol.class); ApplicationClientProtocol.class);
if (isAHSEnabled) {
historyManager = getAHSProxy(conf);
} else {
this.historyManager = null;
}
} catch (IOException e) { } catch (IOException e) {
throw new YarnRuntimeException(e); throw new YarnRuntimeException(e);
} }
} }
/** /**
* Just call directly into the applicationsManager given instead of creating * Create a direct connection to RM instead of a remote connection when
* a remote connection to it. This is mostly for when the Proxy is running * the proxy is running as part of the RM. Also create a remote connection to
* as part of the RM already. * Application History Server if it is enabled.
* @param conf the configuration to use * @param conf the configuration to use
* @param applicationsManager what to use to get the RM reports. * @param applicationsManager what to use to get the RM reports.
*/ */
public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) { public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) {
if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
isAHSEnabled = true;
}
this.conf = conf; this.conf = conf;
this.applicationsManager = applicationsManager; this.applicationsManager = applicationsManager;
if (isAHSEnabled) {
try {
historyManager = getAHSProxy(conf);
} catch (IOException e) {
throw new YarnRuntimeException(e);
}
} else {
this.historyManager = null;
}
}
protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration)
throws IOException {
return AHSProxy.createAHSProxy(configuration,
ApplicationHistoryProtocol.class,
configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT));
} }
/** /**
* Get a report for the specified app. * Get an application report for the specified application id from the RM and
* @param appId the id of the application to get. * fall back to the Application History Server if not found in RM.
* @return the ApplicationReport for that app. * @param appId id of the application to get.
* @return the ApplicationReport for the appId.
* @throws YarnException on any error. * @throws YarnException on any error.
* @throws IOException * @throws IOException
*/ */
@ -82,8 +121,21 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
.newRecordInstance(GetApplicationReportRequest.class); .newRecordInstance(GetApplicationReportRequest.class);
request.setApplicationId(appId); request.setApplicationId(appId);
GetApplicationReportResponse response = applicationsManager GetApplicationReportResponse response;
.getApplicationReport(request); try {
response = applicationsManager.getApplicationReport(request);
} catch (YarnException e) {
if (!isAHSEnabled) {
// Just throw it as usual if historyService is not enabled.
throw e;
}
// Even if history-service is enabled, treat all exceptions still the same
// except the following
if (!(e.getClass() == ApplicationNotFoundException.class)) {
throw e;
}
response = historyManager.getApplicationReport(request);
}
return response.getApplicationReport(); return response.getApplicationReport();
} }
@ -91,5 +143,8 @@ public void stop() {
if (this.applicationsManager != null) { if (this.applicationsManager != null) {
RPC.stopProxy(this.applicationsManager); RPC.stopProxy(this.applicationsManager);
} }
if (this.historyManager != null) {
RPC.stopProxy(this.historyManager);
}
} }
} }

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webproxy;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
public class TestAppReportFetcher {
static ApplicationHistoryProtocol historyManager;
static Configuration conf = new Configuration();
private static ApplicationClientProtocol appManager;
private static AppReportFetcher fetcher;
private final String appNotFoundExceptionMsg = "APP NOT FOUND";
@After
public void cleanUp() {
historyManager = null;
appManager = null;
fetcher = null;
}
public void testHelper(boolean isAHSEnabled)
throws YarnException, IOException {
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
isAHSEnabled);
appManager = Mockito.mock(ApplicationClientProtocol.class);
Mockito.when(appManager
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class)))
.thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg));
fetcher = new AppReportFetcherForTest(conf, appManager);
ApplicationId appId = ApplicationId.newInstance(0,0);
fetcher.getApplicationReport(appId);
}
@Test
public void testFetchReportAHSEnabled() throws YarnException, IOException {
testHelper(true);
Mockito.verify(historyManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
Mockito.verify(appManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
}
@Test
public void testFetchReportAHSDisabled() throws YarnException, IOException {
try {
testHelper(false);
} catch (ApplicationNotFoundException e) {
Assert.assertTrue(e.getMessage() == appNotFoundExceptionMsg);
/* RM will not know of the app and Application History Service is disabled
* So we will not try to get the report from AHS and RM will throw
* ApplicationNotFoundException
*/
}
Mockito.verify(appManager, Mockito.times(1))
.getApplicationReport(Mockito.any(GetApplicationReportRequest.class));
if (historyManager != null) {
Assert.fail("HistoryManager should be null as AHS is disabled");
}
}
static class AppReportFetcherForTest extends AppReportFetcher {
public AppReportFetcherForTest(Configuration conf,
ApplicationClientProtocol acp) {
super(conf, acp);
}
@Override
protected ApplicationHistoryProtocol getAHSProxy(Configuration conf)
throws IOException
{
GetApplicationReportResponse resp = Mockito.
mock(GetApplicationReportResponse.class);
historyManager = Mockito.mock(ApplicationHistoryProtocol.class);
try {
Mockito.when(historyManager.getApplicationReport(Mockito
.any(GetApplicationReportRequest.class))).thenReturn(resp);
} catch (YarnException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return historyManager;
}
}
}