From fbd6063269221ec25834684477f434e19f0b66af Mon Sep 17 00:00:00 2001 From: Xuan Date: Fri, 24 Jul 2015 10:15:54 -0700 Subject: [PATCH] YARN-3967. Fetch the application report from the AHS if the RM does not know about it. Contributed by Mit Desai --- hadoop-yarn-project/CHANGES.txt | 3 + .../server/webproxy/AppReportFetcher.java | 79 ++++++++++-- .../server/webproxy/TestAppReportFetcher.java | 117 ++++++++++++++++++ 3 files changed, 187 insertions(+), 12 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8bc9e4cbdc..a25387d6a5 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -683,6 +683,9 @@ Release 2.7.2 - UNRELEASED YARN-3170. YARN architecture document needs updating. (Brahma Reddy Battula via ozawa) + YARN-3967. Fetch the application report from the AHS if the RM does not know about it. + (Mit Desai via xgong) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java index 5c93413dc0..6aa43eb65f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/main/java/org/apache/hadoop/yarn/server/webproxy/AppReportFetcher.java @@ -24,11 +24,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.client.AHSProxy; import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -41,38 +45,73 @@ public class AppReportFetcher { private static final Log LOG = LogFactory.getLog(AppReportFetcher.class); private final Configuration conf; private final ApplicationClientProtocol applicationsManager; + private final ApplicationHistoryProtocol historyManager; private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - + private boolean isAHSEnabled; + /** - * Create a new Connection to the RM to fetch Application reports. + * Create a new Connection to the RM/Application History Server + * to fetch Application reports. * @param conf the conf to use to know where the RM is. */ public AppReportFetcher(Configuration conf) { + if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, + YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { + isAHSEnabled = true; + } this.conf = conf; try { applicationsManager = ClientRMProxy.createRMProxy(conf, ApplicationClientProtocol.class); + if (isAHSEnabled) { + historyManager = getAHSProxy(conf); + } else { + this.historyManager = null; + } } catch (IOException e) { throw new YarnRuntimeException(e); } } /** - * Just call directly into the applicationsManager given instead of creating - * a remote connection to it. This is mostly for when the Proxy is running - * as part of the RM already. + * Create a direct connection to RM instead of a remote connection when + * the proxy is running as part of the RM. Also create a remote connection to + * Application History Server if it is enabled. * @param conf the configuration to use * @param applicationsManager what to use to get the RM reports. */ public AppReportFetcher(Configuration conf, ApplicationClientProtocol applicationsManager) { + if (conf.getBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, + YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) { + isAHSEnabled = true; + } this.conf = conf; this.applicationsManager = applicationsManager; + if (isAHSEnabled) { + try { + historyManager = getAHSProxy(conf); + } catch (IOException e) { + throw new YarnRuntimeException(e); + } + } else { + this.historyManager = null; + } } - + + protected ApplicationHistoryProtocol getAHSProxy(Configuration configuration) + throws IOException { + return AHSProxy.createAHSProxy(configuration, + ApplicationHistoryProtocol.class, + configuration.getSocketAddr(YarnConfiguration.TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ADDRESS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_PORT)); + } + /** - * Get a report for the specified app. - * @param appId the id of the application to get. - * @return the ApplicationReport for that app. + * Get an application report for the specified application id from the RM and + * fall back to the Application History Server if not found in RM. + * @param appId id of the application to get. + * @return the ApplicationReport for the appId. * @throws YarnException on any error. * @throws IOException */ @@ -81,9 +120,22 @@ public ApplicationReport getApplicationReport(ApplicationId appId) GetApplicationReportRequest request = recordFactory .newRecordInstance(GetApplicationReportRequest.class); request.setApplicationId(appId); - - GetApplicationReportResponse response = applicationsManager - .getApplicationReport(request); + + GetApplicationReportResponse response; + try { + response = applicationsManager.getApplicationReport(request); + } catch (YarnException e) { + if (!isAHSEnabled) { + // Just throw it as usual if historyService is not enabled. + throw e; + } + // Even if history-service is enabled, treat all exceptions still the same + // except the following + if (!(e.getClass() == ApplicationNotFoundException.class)) { + throw e; + } + response = historyManager.getApplicationReport(request); + } return response.getApplicationReport(); } @@ -91,5 +143,8 @@ public void stop() { if (this.applicationsManager != null) { RPC.stopProxy(this.applicationsManager); } + if (this.historyManager != null) { + RPC.stopProxy(this.historyManager); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java new file mode 100644 index 0000000000..bcab33fd66 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-web-proxy/src/test/java/org/apache/hadoop/yarn/server/webproxy/TestAppReportFetcher.java @@ -0,0 +1,117 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.server.webproxy; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.ApplicationClientProtocol; +import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestAppReportFetcher { + + static ApplicationHistoryProtocol historyManager; + static Configuration conf = new Configuration(); + private static ApplicationClientProtocol appManager; + private static AppReportFetcher fetcher; + private final String appNotFoundExceptionMsg = "APP NOT FOUND"; + + @After + public void cleanUp() { + historyManager = null; + appManager = null; + fetcher = null; + } + + public void testHelper(boolean isAHSEnabled) + throws YarnException, IOException { + conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED, + isAHSEnabled); + appManager = Mockito.mock(ApplicationClientProtocol.class); + Mockito.when(appManager + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class))) + .thenThrow(new ApplicationNotFoundException(appNotFoundExceptionMsg)); + fetcher = new AppReportFetcherForTest(conf, appManager); + ApplicationId appId = ApplicationId.newInstance(0,0); + fetcher.getApplicationReport(appId); + } + + @Test + public void testFetchReportAHSEnabled() throws YarnException, IOException { + testHelper(true); + Mockito.verify(historyManager, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + Mockito.verify(appManager, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + } + + @Test + public void testFetchReportAHSDisabled() throws YarnException, IOException { + try { + testHelper(false); + } catch (ApplicationNotFoundException e) { + Assert.assertTrue(e.getMessage() == appNotFoundExceptionMsg); + /* RM will not know of the app and Application History Service is disabled + * So we will not try to get the report from AHS and RM will throw + * ApplicationNotFoundException + */ + } + Mockito.verify(appManager, Mockito.times(1)) + .getApplicationReport(Mockito.any(GetApplicationReportRequest.class)); + if (historyManager != null) { + Assert.fail("HistoryManager should be null as AHS is disabled"); + } + } + + static class AppReportFetcherForTest extends AppReportFetcher { + + public AppReportFetcherForTest(Configuration conf, + ApplicationClientProtocol acp) { + super(conf, acp); + } + + @Override + protected ApplicationHistoryProtocol getAHSProxy(Configuration conf) + throws IOException + { + GetApplicationReportResponse resp = Mockito. + mock(GetApplicationReportResponse.class); + historyManager = Mockito.mock(ApplicationHistoryProtocol.class); + try { + Mockito.when(historyManager.getApplicationReport(Mockito + .any(GetApplicationReportRequest.class))).thenReturn(resp); + } catch (YarnException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + return historyManager; + } + } +}