YARN-6011. Add a new web service to list the files on a container in AHSWebService. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-01-16 16:20:24 -08:00
parent d1d0b3e1fd
commit cf695577aa
3 changed files with 415 additions and 0 deletions

View File

@ -24,6 +24,8 @@
import java.io.OutputStream;
import java.nio.charset.Charset;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import javax.servlet.http.HttpServletRequest;
@ -40,6 +42,7 @@
import javax.ws.rs.core.StreamingOutput;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.Response.Status;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
@ -57,12 +60,14 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.server.webapp.WebServices;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
@ -213,6 +218,115 @@ public ContainerInfo getContainer(@Context HttpServletRequest req,
}
}
// TODO: YARN-6080: Create WebServiceUtils to have common functions used in
// RMWebService, NMWebService and AHSWebService.
/**
* Returns log file's name as well as current file size for a container.
*
* @param req
* HttpServletRequest
* @param res
* HttpServletResponse
* @param containerIdStr
* The container ID
* @return
* The log file's name and current file size
*/
@GET
@Path("/containers/{containerid}/logs")
@Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
public Response getContainerLogsInfo(
@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr) {
ContainerId containerId = null;
init(res);
try {
containerId = ContainerId.fromString(containerIdStr);
} catch (Exception e) {
throw new BadRequestException("invalid container id, " + containerIdStr);
}
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
AppInfo appInfo;
try {
appInfo = super.getApp(req, res, appId.toString());
} catch (Exception ex) {
// directly find logs from HDFS.
return getContainerLogMeta(appId, null, null, containerIdStr);
}
String appOwner = appInfo.getUser();
ContainerInfo containerInfo;
try {
containerInfo = super.getContainer(
req, res, appId.toString(),
containerId.getApplicationAttemptId().toString(),
containerId.toString());
} catch (Exception ex) {
if (isFinishedState(appInfo.getAppState())) {
// directly find logs from HDFS.
return getContainerLogMeta(appId, appOwner, null, containerIdStr);
}
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get ContainerInfo for the container: " + containerId);
}
String nodeId = containerInfo.getNodeId();
if (isRunningState(appInfo.getAppState())) {
String nodeHttpAddress = containerInfo.getNodeHttpAddress();
String uri = "/" + containerId.toString() + "/logs";
String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
String query = req.getQueryString();
if (query != null && !query.isEmpty()) {
resURI += "?" + query;
}
ResponseBuilder response = Response.status(
HttpServletResponse.SC_TEMPORARY_REDIRECT);
response.header("Location", resURI);
return response.build();
} else if (isFinishedState(appInfo.getAppState())) {
return getContainerLogMeta(appId, appOwner, nodeId,
containerIdStr);
} else {
return createBadResponse(Status.NOT_FOUND,
"The application is not at Running or Finished State.");
}
}
/**
* Returns the contents of a container's log file in plain text.
*
* @param req
* HttpServletRequest
* @param res
* HttpServletResponse
* @param containerIdStr
* The container ID
* @param filename
* The name of the log file
* @param format
* The content type
* @param size
* the size of the log file
* @return
* The contents of the container's log file
*/
@GET
@Path("/containers/{containerid}/logs/{filename}")
@Produces({ MediaType.TEXT_PLAIN })
@Public
@Unstable
public Response getContainerLogFile(@Context HttpServletRequest req,
@Context HttpServletResponse res,
@PathParam("containerid") String containerIdStr,
@PathParam("filename") String filename,
@QueryParam("format") String format,
@QueryParam("size") String size) {
return getLogs(req, res, containerIdStr, filename, format, size);
}
//TODO: YARN-4993: Refactory ContainersLogsBlock, AggregatedLogsBlock and
// container log webservice introduced in AHS to minimize
// the duplication.
@GET
@Path("/containerlogs/{containerid}/{filename}")
@Produces({ MediaType.TEXT_PLAIN + "; " + JettyUtils.UTF_8 })
@ -486,4 +600,92 @@ private long parseLongParam(String bytes) {
}
return Long.parseLong(bytes);
}
private Response getContainerLogMeta(ApplicationId appId, String appOwner,
final String nodeId, final String containerIdStr) {
Map<String, String> containerLogMeta = new HashMap<>();
try {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir =
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
FileContext fc = FileContext.getFileContext(
qualifiedRemoteRootLogDir.toUri(), conf);
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
.getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log meta for container: " + containerIdStr);
}
remoteAppDir = matching[0].getPath();
} else {
remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, appOwner, suffix);
}
final RemoteIterator<FileStatus> nodeFiles;
nodeFiles = fc.listStatus(remoteAppDir);
if (!nodeFiles.hasNext()) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
"Can not get log meta for container: " + containerIdStr);
}
String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId);
while (nodeFiles.hasNext()) {
FileStatus thisNodeFile = nodeFiles.next();
if (nodeIdStr != null) {
if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
continue;
}
}
if (!thisNodeFile.getPath().getName().endsWith(
LogAggregationUtils.TMP_FILE_SUFFIX)) {
AggregatedLogFormat.LogReader reader =
new AggregatedLogFormat.LogReader(conf,
thisNodeFile.getPath());
try {
DataInputStream valueStream;
LogKey key = new LogKey();
valueStream = reader.next(key);
while (valueStream != null) {
if (key.toString().equals(containerIdStr)) {
while (true) {
try {
Pair<String, String> logMeta =
LogReader.readContainerMetaDataAndSkipData(
valueStream, null);
containerLogMeta.put(logMeta.getFirst(),
logMeta.getSecond());
} catch (EOFException eof) {
break;
}
}
break;
}
// Next container
key = new LogKey();
valueStream = reader.next(key);
}
} finally {
reader.close();
}
}
}
ResponseBuilder response = Response.ok(new ContainerLogsInfo(
containerLogMeta));
// Sending the X-Content-Type-Options response header with the value
// nosniff will prevent Internet Explorer from MIME-sniffing a response
// away from the declared content-type.
response.header("X-Content-Type-Options", "nosniff");
return response.build();
} catch (Exception ex) {
return createBadResponse(Status.INTERNAL_SERVER_ERROR,
ex.getMessage());
}
}
}

View File

@ -66,6 +66,7 @@
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
@ -563,6 +564,17 @@ public void testContainerLogsForFinishedApps() throws Exception {
String responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1));
// Do the same test with new API
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId1));
// test whether we can find container log from remote diretory if
// the containerInfo for this container could not be fetched from AHS.
r = resource();
@ -575,6 +587,17 @@ public void testContainerLogsForFinishedApps() throws Exception {
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId100));
// Do the same test with new API
r = resource();
response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId100.toString()).path("logs").path(fileName)
.queryParam("user.name", user)
.accept(MediaType.TEXT_PLAIN)
.get(ClientResponse.class);
responseText = response.getEntity(String.class);
assertTrue(responseText.contains("Hello." + containerId100));
// create an application which can not be found from AHS
ApplicationId appId100 = ApplicationId.newInstance(0, 100);
appLogsDir = new Path(rootLogDirPath, appId100.toString());
@ -723,6 +746,95 @@ public void testContainerLogsForRunningApps() throws Exception {
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
// Test with new API
requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs").path(fileName)
.queryParam("user.name", user).getURI();
redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs/" + fileName));
assertTrue(redirectURL.contains("user.name=" + user));
}
@Test(timeout = 10000)
public void testContainerLogsMetaForRunningApps() throws Exception {
String user = "user1";
ApplicationId appId = ApplicationId.newInstance(
1234, 1);
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
WebResource r = resource();
URI requestURI = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user).getURI();
String redirectURL = getRedirectURL(requestURI.toString());
assertTrue(redirectURL != null);
assertTrue(redirectURL.contains("test:1234"));
assertTrue(redirectURL.contains("ws/v1/node/containers"));
assertTrue(redirectURL.contains(containerId1.toString()));
assertTrue(redirectURL.contains("/logs"));
}
@Test(timeout = 10000)
public void testContainerLogsMetaForFinishedApps() throws Exception {
String fileName = "syslog";
String user = "user1";
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
NodeId nodeId = NodeId.newInstance("test host", 100);
//prepare the logs for remote directory
ApplicationId appId = ApplicationId.newInstance(0, 1);
// create local logs
List<String> rootLogDirList = new ArrayList<String>();
rootLogDirList.add(rootLogDir);
Path rootLogDirPath = new Path(rootLogDir);
if (fs.exists(rootLogDirPath)) {
fs.delete(rootLogDirPath, true);
}
assertTrue(fs.mkdirs(rootLogDirPath));
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
if (fs.exists(appLogsDir)) {
fs.delete(appLogsDir, true);
}
assertTrue(fs.mkdirs(appLogsDir));
ApplicationAttemptId appAttemptId =
ApplicationAttemptId.newInstance(appId, 1);
ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
String content = "Hello." + containerId1;
createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
content);
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
user + "/logs/" + appId.toString());
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
containerId1, path, fs);
WebResource r = resource();
ClientResponse response = r.path("ws").path("v1")
.path("applicationhistory").path("containers")
.path(containerId1.toString()).path("logs")
.queryParam("user.name", user)
.accept(MediaType.APPLICATION_JSON)
.get(ClientResponse.class);
ContainerLogsInfo responseText = response.getEntity(
ContainerLogsInfo.class);
assertEquals(responseText.getContainerLogsInfo().size(), 1);
assertEquals(responseText.getContainerLogsInfo().get(0).getFileName(),
fileName);
assertEquals(responseText.getContainerLogsInfo().get(0).getFileSize(),
String.valueOf(content.length()));
}
private static String getRedirectURL(String url) {

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.webapp.dao;
import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* {@code ContainerLogsInfo} includes the log meta-data of containers.
* <p>
* The container log meta-data includes details such as:
* <ul>
* <li>The filename of the container log.</li>
* <li>The size of the container log.</li>
* </ul>
*/
@XmlRootElement(name = "containerLogsInfo")
@XmlAccessorType(XmlAccessType.FIELD)
public class ContainerLogsInfo {
@XmlElement(name = "containerLogInfo")
protected List<ContainerLogInfo> containerLogsInfo;
//JAXB needs this
public ContainerLogsInfo() {}
public ContainerLogsInfo(Map<String, String> containerLogMeta)
throws YarnException {
this.containerLogsInfo = new ArrayList<ContainerLogInfo>();
for (Entry<String, String> meta : containerLogMeta.entrySet()) {
ContainerLogInfo info = new ContainerLogInfo(meta.getKey(),
meta.getValue());
containerLogsInfo.add(info);
}
}
public List<ContainerLogInfo> getContainerLogsInfo() {
return this.containerLogsInfo;
}
/**
* It includes the log meta-data of a container.
*
*/
@Private
@VisibleForTesting
public static class ContainerLogInfo {
private String fileName;
private String fileSize;
//JAXB needs this
public ContainerLogInfo() {}
public ContainerLogInfo(String fileName, String fileSize) {
this.setFileName(fileName);
this.setFileSize(fileSize);
}
public String getFileName() {
return fileName;
}
public void setFileName(String fileName) {
this.fileName = fileName;
}
public String getFileSize() {
return fileSize;
}
public void setFileSize(String fileSize) {
this.fileSize = fileSize;
}
}
}