diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index d9918d3f36..5e77718102 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -32,25 +32,18 @@
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.ResponseBuilder;
-import javax.ws.rs.core.Response.Status;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.sun.jersey.api.client.ClientHandlerException;
-import com.sun.jersey.api.client.UniformInterfaceException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.http.JettyUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
-import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
-import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
+import org.apache.hadoop.yarn.server.webapp.LogServlet;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
@@ -61,33 +54,20 @@
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
 
-import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
-import org.codehaus.jettison.json.JSONException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @Singleton
 @Path("/ws/v1/applicationhistory")
 public class AHSWebServices extends WebServices {
-  private static final Logger LOG = LoggerFactory
-      .getLogger(AHSWebServices.class);
-  private static final String NM_DOWNLOAD_URI_STR =
-      "/ws/v1/node/containers";
-  private static final Joiner JOINER = Joiner.on("");
-  private static final Joiner DOT_JOINER = Joiner.on(". ");
-  private final Configuration conf;
-  private final LogAggregationFileControllerFactory factory;
+  private LogServlet logServlet;
 
   @Inject
   public AHSWebServices(ApplicationBaseProtocol appBaseProt,
       Configuration conf) {
     super(appBaseProt);
-    this.conf = conf;
-    this.factory = new LogAggregationFileControllerFactory(conf);
+    this.logServlet = new LogServlet(conf, this);
   }
 
   @GET
@@ -244,87 +224,9 @@ public Response getContainerLogsInfo(
       @QueryParam(YarnWebServiceParams.NM_ID) String nmId,
       @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
       @DefaultValue("false") boolean redirected_from_node) {
-    ContainerId containerId = null;
     initForReadableEndpoints(res);
-    try {
-      containerId = ContainerId.fromString(containerIdStr);
-    } catch (IllegalArgumentException e) {
-      throw new BadRequestException("invalid container id, " + containerIdStr);
-    }
-
-    ApplicationId appId = containerId.getApplicationAttemptId()
-        .getApplicationId();
-    AppInfo appInfo;
-    try {
-      appInfo = super.getApp(req, res, appId.toString());
-    } catch (Exception ex) {
-      // directly find logs from HDFS.
-      return LogWebServiceUtils
-          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
-              false);
-    }
-    // if the application finishes, directly find logs
-    // from HDFS.
-    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
-      return LogWebServiceUtils
-          .getContainerLogMeta(factory, appId, null, null, containerIdStr,
-              false);
-    }
-    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
-      String appOwner = appInfo.getUser();
-      String nodeHttpAddress = null;
-      if (nmId != null && !nmId.isEmpty()) {
-        try {
-          nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
-        } catch (Exception ex) {
-          LOG.debug("{}", ex);
-        }
-      }
-      if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
-        ContainerInfo containerInfo;
-        try {
-          containerInfo = super.getContainer(
-              req, res, appId.toString(),
-              containerId.getApplicationAttemptId().toString(),
-              containerId.toString());
-        } catch (Exception ex) {
-          // return log meta for the aggregated logs if exists.
-          // It will also return empty log meta for the local logs.
-          return LogWebServiceUtils
-              .getContainerLogMeta(factory, appId, appOwner, null,
-                  containerIdStr, true);
-        }
-        nodeHttpAddress = containerInfo.getNodeHttpAddress();
-        // make sure nodeHttpAddress is not null and not empty. Otherwise,
-        // we would only get log meta for aggregated logs instead of
-        // re-directing the request
-        if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
-            || redirected_from_node) {
-          // return log meta for the aggregated logs if exists.
-          // It will also return empty log meta for the local logs.
-          // If this is the redirect request from NM, we should not
-          // re-direct the request back. Simply output the aggregated log meta.
-          return LogWebServiceUtils
-              .getContainerLogMeta(factory, appId, appOwner, null,
-                  containerIdStr, true);
-        }
-      }
-      String uri = "/" + containerId.toString() + "/logs";
-      String resURI = JOINER.join(
-          LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
-          NM_DOWNLOAD_URI_STR, uri);
-      String query = req.getQueryString();
-      if (query != null && !query.isEmpty()) {
-        resURI += "?" + query;
-      }
-      ResponseBuilder response = Response.status(
-          HttpServletResponse.SC_TEMPORARY_REDIRECT);
-      response.header("Location", resURI);
-      return response.build();
-    } else {
-      throw new NotFoundException(
-          "The application is not at Running or Finished State.");
-    }
+    return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
+        redirected_from_node, null);
   }
 
   /**
@@ -385,93 +287,19 @@ public Response getLogs(@Context HttpServletRequest req,
       @QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
       @DefaultValue("false") boolean redirected_from_node) {
     initForReadableEndpoints(res);
-    ContainerId containerId;
-    try {
-      containerId = ContainerId.fromString(containerIdStr);
-    } catch (IllegalArgumentException ex) {
-      return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
-          "Invalid ContainerId: " + containerIdStr);
-    }
-
-    final long length = LogWebServiceUtils.parseLongParam(size);
-
-    ApplicationId appId = containerId.getApplicationAttemptId()
-        .getApplicationId();
-    AppInfo appInfo;
-    try {
-      appInfo = super.getApp(req, res, appId.toString());
-    } catch (Exception ex) {
-      // directly find logs from HDFS.
-      return LogWebServiceUtils
-          .sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
-              filename, format, length, false);
-    }
-    String appOwner = appInfo.getUser();
-    if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
-      // directly find logs from HDFS.
-      return LogWebServiceUtils
-          .sendStreamOutputResponse(factory, appId, appOwner, null,
-              containerIdStr, filename, format, length, false);
-    }
-
-    if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
-      String nodeHttpAddress = null;
-      if (nmId != null && !nmId.isEmpty()) {
-        try {
-          nodeHttpAddress = getNMWebAddressFromRM(conf, nmId);
-        } catch (Exception ex) {
-          LOG.debug("{}", ex);
-        }
-      }
-      if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
-        ContainerInfo containerInfo;
-        try {
-          containerInfo = super.getContainer(
-              req, res, appId.toString(),
-              containerId.getApplicationAttemptId().toString(),
-              containerId.toString());
-        } catch (Exception ex) {
-          // output the aggregated logs
-          return LogWebServiceUtils
-              .sendStreamOutputResponse(factory, appId, appOwner, null,
-                  containerIdStr, filename, format, length, true);
-        }
-        nodeHttpAddress = containerInfo.getNodeHttpAddress();
-        // make sure nodeHttpAddress is not null and not empty. Otherwise,
-        // we would only get aggregated logs instead of re-directing the
-        // request.
-        // If this is the redirect request from NM, we should not re-direct the
-        // request back. Simply output the aggregated logs.
-        if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
-            || redirected_from_node) {
-          // output the aggregated logs
-          return LogWebServiceUtils
-              .sendStreamOutputResponse(factory, appId, appOwner, null,
-                  containerIdStr, filename, format, length, true);
-        }
-      }
-      String uri = "/" + containerId.toString() + "/logs/" + filename;
-      String resURI = JOINER.join(
-          LogWebServiceUtils.getAbsoluteNMWebAddress(conf, nodeHttpAddress),
-          NM_DOWNLOAD_URI_STR, uri);
-      String query = req.getQueryString();
-      if (query != null && !query.isEmpty()) {
-        resURI += "?" + query;
-      }
-      ResponseBuilder response = Response.status(
-          HttpServletResponse.SC_TEMPORARY_REDIRECT);
-      response.header("Location", resURI);
-      return response.build();
-    } else {
-      return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
-          "The application is not at Running or Finished State.");
-    }
+    return logServlet.getLogFile(req, containerIdStr, filename, format, size,
+        nmId, redirected_from_node, null);
   }
 
-  @VisibleForTesting @InterfaceAudience.Private
-  public String getNMWebAddressFromRM(Configuration configuration,
-      String nodeId)
-      throws ClientHandlerException, UniformInterfaceException, JSONException {
-    return LogWebServiceUtils.getNMWebAddressFromRM(configuration, nodeId);
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  LogServlet getLogServlet() {
+    return this.logServlet;
+  }
+
+  @VisibleForTesting
+  @InterfaceAudience.Private
+  void setLogServlet(LogServlet logServlet) {
+    this.logServlet = logServlet;
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index 610f7e562c..16ce7e2999 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -24,6 +24,9 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
 
 import java.net.HttpURLConnection;
 import java.net.URI;
@@ -64,6 +67,7 @@
 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.webapp.LogServlet;
 import org.apache.hadoop.yarn.server.webapp.LogWebServiceUtils;
 import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
@@ -88,7 +92,6 @@
 import com.google.inject.Guice;
 import com.google.inject.Singleton;
 import com.google.inject.servlet.ServletModule;
-import com.sun.jersey.api.client.ClientHandlerException;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.ClientResponse.Status;
 import com.sun.jersey.api.client.GenericType;
@@ -137,17 +140,13 @@ protected void serviceStart() throws Exception {
     };
     historyClientService.init(conf);
     historyClientService.start();
-    ahsWebservice = new AHSWebServices(historyClientService, conf) {
-      @Override
-      public String getNMWebAddressFromRM(Configuration configuration,
-          String nodeId) throws ClientHandlerException,
-          UniformInterfaceException, JSONException {
-        if (nodeId.equals(NM_ID)) {
-          return NM_WEBADDRESS;
-        }
-        return null;
-      }
-    };
+
+    ahsWebservice = new AHSWebServices(historyClientService, conf);
+    LogServlet logServlet = spy(ahsWebservice.getLogServlet());
+    doReturn(null).when(logServlet).getNMWebAddressFromRM(any());
+    doReturn(NM_WEBADDRESS).when(logServlet).getNMWebAddressFromRM(NM_ID);
+    ahsWebservice.setLogServlet(logServlet);
+
     fs = FileSystem.get(conf);
     GuiceServletConfig.setInjector(
         Guice.createInjector(new WebServletModule()));
@@ -171,7 +170,7 @@ private static class WebServletModule extends ServletModule {
     @Override
     protected void configureServlets() {
       bind(JAXBContextResolver.class);
-      bind(AHSWebServices.class).toInstance(ahsWebservice);;
+      bind(AHSWebServices.class).toInstance(ahsWebservice);
       bind(GenericExceptionHandler.class);
       bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
       serve("/*").with(GuiceContainer.class);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppInfoProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppInfoProvider.java
new file mode 100644
index 0000000000..945135ce7e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/AppInfoProvider.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.webapp;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import javax.servlet.http.HttpServletRequest;
+
+/**
+ * Classes implementing this interface are used in the {@link LogServlet}
+ * for providing various application related information.
+ */
+@InterfaceAudience.LimitedPrivate({"YARN"})
+public interface AppInfoProvider {
+
+ /**
+ * Returns the node HTTP address.
+ *
+ * @param req {@link HttpServletRequest} associated with the request
+ * @param appId the id of the application
+ * @param appAttemptId the id of the application attempt
+ * @param containerId the container id
+ * @param clusterId the id of the cluster
+ * @return the node HTTP address
+ */
+ String getNodeHttpAddress(HttpServletRequest req,
+ String appId, String appAttemptId, String containerId, String clusterId);
+
+ /**
+ * Returns {@link BasicAppInfo} object that wraps the collected information
+ * about the application.
+ *
+ * @param req {@link HttpServletRequest} associated with the request
+ * @param appId the id of the application
+ * @param clusterId the id of the cluster
+ * @return {@link BasicAppInfo} object
+ */
+ BasicAppInfo getApp(HttpServletRequest req, String appId, String clusterId);
+}
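
The contract above is small enough to sketch in a few lines. The class below is illustrative only and is not part of the patch (its name and return values are invented); it shows what LogServlet expects from a provider. In the patch itself, LogWebService implements the interface directly and AHSWebServices passes itself to its LogServlet. Note that because BasicAppInfo is package-private, implementations have to live in org.apache.hadoop.yarn.server.webapp.

package org.apache.hadoop.yarn.server.webapp;

import javax.servlet.http.HttpServletRequest;

import org.apache.hadoop.yarn.api.records.YarnApplicationState;

/**
 * Illustrative sketch: a provider that answers from fixed values, e.g. for a
 * unit test. Real providers resolve these answers from the timeline service
 * or the application history service.
 */
public class FixedAppInfoProvider implements AppInfoProvider {

  @Override
  public String getNodeHttpAddress(HttpServletRequest req, String appId,
      String appAttemptId, String containerId, String clusterId) {
    // Pretend every container ran on the same NodeManager.
    return "localhost:8042";
  }

  @Override
  public BasicAppInfo getApp(HttpServletRequest req, String appId,
      String clusterId) {
    // Pretend the application is still running and owned by a fixed user.
    return new BasicAppInfo(YarnApplicationState.RUNNING, "test-user");
  }
}
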
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/BasicAppInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/BasicAppInfo.java
new file mode 100644
index 0000000000..0181fd182f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/BasicAppInfo.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.webapp;
+
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
+
+/**
+ * Utility class that wraps application information
+ * required by the {@link LogServlet} class.
+ */
+class BasicAppInfo {
+ private final YarnApplicationState appState;
+ private final String user;
+
+ BasicAppInfo(YarnApplicationState appState, String user) {
+ this.appState = appState;
+ this.user = user;
+ }
+
+ static BasicAppInfo fromAppInfo(AppInfo report) {
+ return new BasicAppInfo(report.getAppState(), report.getUser());
+ }
+
+ YarnApplicationState getAppState() {
+ return this.appState;
+ }
+
+ String getUser() {
+ return this.user;
+ }
+}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java
new file mode 100644
index 0000000000..1bddf6dfda
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java
@@ -0,0 +1,260 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.webapp;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.sun.jersey.api.client.ClientHandlerException;
+import com.sun.jersey.api.client.UniformInterfaceException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+import org.codehaus.jettison.json.JSONException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.Status;
+
+/**
+ * Extracts aggregated logs and related information.
+ * Used by various WebServices (AHS, ATS).
+ */
+public class LogServlet extends Configured {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(LogServlet.class);
+
+ private static final Joiner JOINER = Joiner.on("");
+ private static final String NM_DOWNLOAD_URI_STR = "/ws/v1/node/containers";
+
+ private final LogAggregationFileControllerFactory factory;
+ private final AppInfoProvider appInfoProvider;
+
+ public LogServlet(Configuration conf, AppInfoProvider appInfoProvider) {
+ super(conf);
+ this.factory = new LogAggregationFileControllerFactory(conf);
+ this.appInfoProvider = appInfoProvider;
+ }
+
+ @VisibleForTesting
+ public String getNMWebAddressFromRM(String nodeId)
+ throws ClientHandlerException, UniformInterfaceException, JSONException {
+ return LogWebServiceUtils.getNMWebAddressFromRM(getConf(), nodeId);
+ }
+
+ /**
+ * Returns information about the logs for a specific container.
+ *
+ * @param req the {@link HttpServletRequest}
+ * @param containerIdStr container id
+ * @param nmId NodeManager id
+ * @param redirectedFromNode whether the request was redirected
+ * @param clusterId the id of the cluster
+ * @return {@link Response} object containing information about the logs
+ */
+ public Response getContainerLogsInfo(HttpServletRequest req,
+ String containerIdStr, String nmId, boolean redirectedFromNode,
+ String clusterId) {
+ ContainerId containerId = null;
+ try {
+ containerId = ContainerId.fromString(containerIdStr);
+ } catch (IllegalArgumentException e) {
+ throw new BadRequestException("invalid container id, " + containerIdStr);
+ }
+
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ BasicAppInfo appInfo;
+ try {
+ appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId);
+ } catch (Exception ex) {
+ // directly find logs from HDFS.
+ return LogWebServiceUtils
+ .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+ false);
+ }
+ // if the application finishes, directly find logs
+ // from HDFS.
+ if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
+ return LogWebServiceUtils
+ .getContainerLogMeta(factory, appId, null, null, containerIdStr,
+ false);
+ }
+ if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
+ String appOwner = appInfo.getUser();
+ String nodeHttpAddress = null;
+ if (nmId != null && !nmId.isEmpty()) {
+ try {
+ nodeHttpAddress = getNMWebAddressFromRM(nmId);
+ } catch (Exception ex) {
+ LOG.info("Exception during getting NM web address.", ex);
+ }
+ }
+ if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
+ try {
+ nodeHttpAddress = appInfoProvider.getNodeHttpAddress(
+ req, appId.toString(),
+ containerId.getApplicationAttemptId().toString(),
+ containerId.toString(), clusterId);
+ } catch (Exception ex) {
+ // return log meta for the aggregated logs if exists.
+ // It will also return empty log meta for the local logs.
+ return LogWebServiceUtils
+ .getContainerLogMeta(factory, appId, appOwner, null,
+ containerIdStr, true);
+ }
+ // make sure nodeHttpAddress is not null and not empty. Otherwise,
+ // we would only get log meta for aggregated logs instead of
+ // re-directing the request
+ if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
+ || redirectedFromNode) {
+ // return log meta for the aggregated logs if exists.
+ // It will also return empty log meta for the local logs.
+ // If this is the redirect request from NM, we should not
+ // re-direct the request back. Simply output the aggregated log meta.
+ return LogWebServiceUtils
+ .getContainerLogMeta(factory, appId, appOwner, null,
+ containerIdStr, true);
+ }
+ }
+ String uri = "/" + containerId.toString() + "/logs";
+ String resURI = JOINER.join(
+ LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(),
+ nodeHttpAddress),
+ NM_DOWNLOAD_URI_STR, uri);
+ String query = req.getQueryString();
+ if (query != null && !query.isEmpty()) {
+ resURI += "?" + query;
+ }
+ Response.ResponseBuilder response = Response.status(
+ HttpServletResponse.SC_TEMPORARY_REDIRECT);
+ response.header("Location", resURI);
+ return response.build();
+ } else {
+ throw new NotFoundException(
+ "The application is not at Running or Finished State.");
+ }
+ }
+
+
+ /**
+ * Returns an aggregated log file belonging to a container.
+ *
+ * @param req the {@link HttpServletRequest}
+ * @param containerIdStr container id
+ * @param filename the name of the file
+ * @param format the format of the response
+ * @param size the number of bytes of the log file that should be returned
+ * @param nmId NodeManager id
+ * @param redirectedFromNode whether the request was redirected
+ * @param clusterId the id of the cluster
+ * @return {@link Response} object containing information about the logs
+ */
+ public Response getLogFile(HttpServletRequest req, String containerIdStr,
+ String filename, String format, String size, String nmId,
+ boolean redirectedFromNode, String clusterId) {
+ ContainerId containerId;
+ try {
+ containerId = ContainerId.fromString(containerIdStr);
+ } catch (IllegalArgumentException ex) {
+ return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
+ "Invalid ContainerId: " + containerIdStr);
+ }
+
+ final long length = LogWebServiceUtils.parseLongParam(size);
+
+ ApplicationId appId = containerId.getApplicationAttemptId()
+ .getApplicationId();
+ BasicAppInfo appInfo;
+ try {
+ appInfo = appInfoProvider.getApp(req, appId.toString(), clusterId);
+ } catch (Exception ex) {
+ // directly find logs from HDFS.
+ return LogWebServiceUtils
+ .sendStreamOutputResponse(factory, appId, null, null, containerIdStr,
+ filename, format, length, false);
+ }
+ String appOwner = appInfo.getUser();
+ if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
+ // directly find logs from HDFS.
+ return LogWebServiceUtils
+ .sendStreamOutputResponse(factory, appId, appOwner, null,
+ containerIdStr, filename, format, length, false);
+ }
+
+ if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
+ String nodeHttpAddress = null;
+ if (nmId != null && !nmId.isEmpty()) {
+ try {
+ nodeHttpAddress = getNMWebAddressFromRM(nmId);
+ } catch (Exception ex) {
+ LOG.debug("Exception happened during obtaining NM web address " +
+ "from RM.", ex);
+ }
+ }
+ if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
+ try {
+ nodeHttpAddress = appInfoProvider.getNodeHttpAddress(
+ req, appId.toString(),
+ containerId.getApplicationAttemptId().toString(),
+ containerId.toString(), clusterId);
+ } catch (Exception ex) {
+ // output the aggregated logs
+ return LogWebServiceUtils
+ .sendStreamOutputResponse(factory, appId, appOwner, null,
+ containerIdStr, filename, format, length, true);
+ }
+ // make sure nodeHttpAddress is not null and not empty. Otherwise,
+ // we would only get aggregated logs instead of re-directing the
+ // request.
+ // If this is the redirect request from NM, we should not re-direct the
+ // request back. Simply output the aggregated logs.
+ if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
+ || redirectedFromNode) {
+ // output the aggregated logs
+ return LogWebServiceUtils
+ .sendStreamOutputResponse(factory, appId, appOwner, null,
+ containerIdStr, filename, format, length, true);
+ }
+ }
+ String uri = "/" + containerId.toString() + "/logs/" + filename;
+ String resURI = JOINER.join(
+ LogWebServiceUtils.getAbsoluteNMWebAddress(getConf(),
+ nodeHttpAddress),
+ NM_DOWNLOAD_URI_STR, uri);
+ String query = req.getQueryString();
+ if (query != null && !query.isEmpty()) {
+ resURI += "?" + query;
+ }
+ Response.ResponseBuilder response = Response.status(
+ HttpServletResponse.SC_TEMPORARY_REDIRECT);
+ response.header("Location", resURI);
+ return response.build();
+ } else {
+ return LogWebServiceUtils.createBadResponse(Status.NOT_FOUND,
+ "The application is not at Running or Finished State.");
+ }
+ }
+}
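
As a usage note, the pattern the patch applies in AHSWebServices above (and in LogWebService below) is simply to own a LogServlet and forward requests to it. The condensed resource below is hypothetical (class name, paths and the "size" parameter name are invented for illustration); only the LogServlet and AppInfoProvider calls are the API introduced by this patch.

package org.apache.hadoop.yarn.server.webapp;

import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;

import org.apache.hadoop.conf.Configuration;

/**
 * Illustrative sketch: a minimal JAX-RS resource that delegates log requests
 * to LogServlet, mirroring what AHSWebServices and LogWebService do.
 */
@Path("/ws/v1/example")
public class ExampleLogResource {

  private final LogServlet logServlet;

  public ExampleLogResource(Configuration conf, AppInfoProvider provider) {
    // The resource owns the servlet and chooses which AppInfoProvider backs it.
    this.logServlet = new LogServlet(conf, provider);
  }

  @GET
  @Path("/containers/{containerid}/logs")
  public Response getContainerLogsInfo(@Context HttpServletRequest req,
      @PathParam("containerid") String containerIdStr,
      @QueryParam(YarnWebServiceParams.NM_ID) String nmId) {
    // Not redirected from a NodeManager; null selects the default cluster id.
    return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
        false, null);
  }

  @GET
  @Path("/containers/{containerid}/logs/{filename}")
  public Response getLogFile(@Context HttpServletRequest req,
      @PathParam("containerid") String containerIdStr,
      @PathParam("filename") String filename,
      @QueryParam("size") String size) {
    // format and nmId are omitted (null) for brevity.
    return logServlet.getLogFile(req, containerIdStr, filename, null, size,
        null, false, null);
  }
}
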
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
index b51375128d..1ad6b61e9b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogWebService.java
@@ -35,8 +35,6 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
@@ -44,8 +42,6 @@
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
-import org.apache.hadoop.yarn.webapp.BadRequestException;
-import org.apache.hadoop.yarn.webapp.NotFoundException;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.slf4j.Logger;
@@ -71,7 +67,9 @@
/**
* Support only ATSv2 client only.
*/
-@Singleton @Path("/ws/v2/applicationlog") public class LogWebService {
+@Singleton
+@Path("/ws/v2/applicationlog")
+public class LogWebService implements AppInfoProvider {
private static final Logger LOG =
LoggerFactory.getLogger(LogWebService.class);
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
@@ -81,6 +79,8 @@
private static LogAggregationFileControllerFactory factory;
private static String base;
private static String defaultClusterid;
+
+ private final LogServlet logServlet;
private volatile Client webTimelineClient;
static {
@@ -99,6 +99,10 @@ private static void init() {
+ " for URI: " + base);
}
+ public LogWebService() {
+ this.logServlet = new LogServlet(yarnConf, this);
+ }
+
private Client createTimelineWebClient() {
ClientConfig cfg = new DefaultClientConfig();
cfg.getClasses().add(YarnJacksonJaxbJsonProvider.class);
@@ -136,7 +140,8 @@ private void initForReadableEndpoints(HttpServletResponse response) {
* @param redirectedFromNode Whether this is a redirected request from NM
* @return The log file's name and current file size
*/
- @GET @Path("/containers/{containerid}/logs")
+ @GET
+ @Path("/containers/{containerid}/logs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response getContainerLogsInfo(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@@ -145,91 +150,14 @@ public Response getContainerLogsInfo(@Context HttpServletRequest req,
@QueryParam(YarnWebServiceParams.REDIRECTED_FROM_NODE)
@DefaultValue("false") boolean redirectedFromNode,
@QueryParam(YarnWebServiceParams.CLUSTER_ID) String clusterId) {
- ContainerId containerId = null;
initForReadableEndpoints(res);
- try {
- containerId = ContainerId.fromString(containerIdStr);
- } catch (IllegalArgumentException e) {
- throw new BadRequestException("invalid container id, " + containerIdStr);
- }
-
- ApplicationId appId =
- containerId.getApplicationAttemptId().getApplicationId();
- AppInfo appInfo;
- try {
- appInfo = getApp(req, appId.toString(), clusterId);
- } catch (Exception ex) {
- // directly find logs from HDFS.
- return LogWebServiceUtils
- .getContainerLogMeta(factory, appId, null, null, containerIdStr,
- false);
- }
- // if the application finishes, directly find logs
- // from HDFS.
- if (LogWebServiceUtils.isFinishedState(appInfo.getAppState())) {
- return LogWebServiceUtils
- .getContainerLogMeta(factory, appId, null, null, containerIdStr,
- false);
- }
- if (LogWebServiceUtils.isRunningState(appInfo.getAppState())) {
- String appOwner = appInfo.getUser();
- String nodeHttpAddress = null;
- if (nmId != null && !nmId.isEmpty()) {
- try {
- nodeHttpAddress =
- LogWebServiceUtils.getNMWebAddressFromRM(yarnConf, nmId);
- } catch (Exception ex) {
- LOG.debug("{}", ex);
- }
- }
- if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()) {
- ContainerInfo containerInfo;
- try {
- containerInfo =
- getContainer(req, appId.toString(), containerId.toString(),
- clusterId);
- } catch (Exception ex) {
- // return log meta for the aggregated logs if exists.
- // It will also return empty log meta for the local logs.
- return LogWebServiceUtils
- .getContainerLogMeta(factory, appId, appOwner, null,
- containerIdStr, true);
- }
- nodeHttpAddress = containerInfo.getNodeHttpAddress();
- // make sure nodeHttpAddress is not null and not empty. Otherwise,
- // we would only get log meta for aggregated logs instead of
- // re-directing the request
- if (nodeHttpAddress == null || nodeHttpAddress.isEmpty()
- || redirectedFromNode) {
- // return log meta for the aggregated logs if exists.
- // It will also return empty log meta for the local logs.
- // If this is the redirect request from NM, we should not
- // re-direct the request back. Simply output the aggregated log meta.
- return LogWebServiceUtils
- .getContainerLogMeta(factory, appId, appOwner, null,
- containerIdStr, true);
- }
- }
- String uri = "/" + containerId.toString() + "/logs";
- String resURI = JOINER.join(
- LogWebServiceUtils.getAbsoluteNMWebAddress(yarnConf, nodeHttpAddress),
- NM_DOWNLOAD_URI_STR, uri);
- String query = req.getQueryString();
- if (query != null && !query.isEmpty()) {
- resURI += "?" + query;
- }
- Response.ResponseBuilder response =
- Response.status(HttpServletResponse.SC_TEMPORARY_REDIRECT);
- response.header("Location", resURI);
- return response.build();
- } else {
- throw new NotFoundException(
- "The application is not at Running or Finished State.");
- }
+ return logServlet.getContainerLogsInfo(req, containerIdStr, nmId,
+ redirectedFromNode, clusterId);
}
- protected ContainerInfo getContainer(HttpServletRequest req, String appId,
- String containerId, String clusterId) {
+ @Override
+ public String getNodeHttpAddress(HttpServletRequest req, String appId,
+ String appAttemptId, String containerId, String clusterId) {
UserGroupInformation callerUGI = LogWebServiceUtils.getUser(req);
String cId = clusterId != null ? clusterId : defaultClusterid;
MultivaluedMap