From 3234e5eaf36aae1839b2ab5cc43517afe7087a45 Mon Sep 17 00:00:00 2001 From: adamantal Date: Sat, 12 Dec 2020 09:42:22 +0100 Subject: [PATCH] YARN-10031. Create a general purpose log request with additional query parameters. Contributed by Andras Gyori --- .../mapreduce/v2/hs/webapp/HsWebServices.java | 27 ++ .../ExtendedLogMetaRequest.java | 291 +++++++++++++ .../LogAggregationMetaCollector.java | 143 +++++++ .../logaggregation/LogAggregationUtils.java | 105 ++++- .../LogAggregationFileController.java | 46 +++ .../LogAggregationIndexedFileController.java | 76 ++++ .../tfile/LogAggregationTFileController.java | 55 +++ .../TestLogAggregationMetaCollector.java | 391 ++++++++++++++++++ .../FakeLogAggregationFileController.java | 96 +++++ ...stLogAggregationIndexedFileController.java | 106 +++++ .../hadoop/yarn/server/webapp/LogServlet.java | 34 +- .../server/webapp/YarnWebServiceParams.java | 2 + 12 files changed, 1359 insertions(+), 13 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java index 6e175001a3..4ee76369b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsWebServices.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapreduce.v2.hs.webapp; import java.io.IOException; +import java.util.Set; import javax.annotation.Nullable; import javax.servlet.http.HttpServletRequest; @@ -69,6 +70,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.apache.hadoop.yarn.server.webapp.WrappedLogMetaRequest; import org.apache.hadoop.yarn.server.webapp.YarnWebServiceParams; import org.apache.hadoop.yarn.server.webapp.LogServlet; @@ -441,6 +443,31 @@ public class HsWebServices extends WebServices { return logServlet.getRemoteLogDirPath(user, appIdStr); } + @GET + @Path("/extended-log-query") + @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) + @InterfaceAudience.Public + @InterfaceStability.Unstable + public Response getAggregatedLogsMeta(@Context HttpServletRequest hsr, + @QueryParam(YarnWebServiceParams.CONTAINER_LOG_FILE_NAME) String fileName, + @QueryParam(YarnWebServiceParams.FILESIZE) Set fileSize, + @QueryParam(YarnWebServiceParams.MODIFICATION_TIME) Set + modificationTime, + @QueryParam(YarnWebServiceParams.APP_ID) String appIdStr, + @QueryParam(YarnWebServiceParams.CONTAINER_ID) String containerIdStr, + @QueryParam(YarnWebServiceParams.NM_ID) String nmId) throws IOException { + init(); + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder logsRequest = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + logsRequest.setAppId(appIdStr); + logsRequest.setFileName(fileName); + logsRequest.setContainerId(containerIdStr); + logsRequest.setFileSize(fileSize); + logsRequest.setModificationTime(modificationTime); + logsRequest.setNodeId(nmId); + return logServlet.getContainerLogsInfo(hsr, logsRequest); + } + @GET @Path("/aggregatedlogs") @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML }) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java new file mode 100644 index 0000000000..0815e03e32 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/ExtendedLogMetaRequest.java @@ -0,0 +1,291 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; +import java.util.stream.Collectors; + +/** + * Represents a query of log metadata with extended filtering capabilities. + */ +public class ExtendedLogMetaRequest { + private final String user; + private final String appId; + private final String containerId; + private final MatchExpression nodeId; + private final MatchExpression fileName; + private final ComparisonCollection fileSize; + private final ComparisonCollection modificationTime; + + public ExtendedLogMetaRequest( + String user, String appId, String containerId, MatchExpression nodeId, + MatchExpression fileName, ComparisonCollection fileSize, + ComparisonCollection modificationTime) { + this.user = user; + this.appId = appId; + this.containerId = containerId; + this.nodeId = nodeId; + this.fileName = fileName; + this.fileSize = fileSize; + this.modificationTime = modificationTime; + } + + public String getUser() { + return user; + } + + public String getAppId() { + return appId; + } + + public String getContainerId() { + return containerId; + } + + public MatchExpression getNodeId() { + return nodeId; + } + + public MatchExpression getFileName() { + return fileName; + } + + public ComparisonCollection getFileSize() { + return fileSize; + } + + public ComparisonCollection getModificationTime() { + return modificationTime; + } + + public static class ExtendedLogMetaRequestBuilder { + private String user; + private String appId; + private String containerId; + private MatchExpression nodeId = new MatchExpression(null); + private MatchExpression fileName = new MatchExpression(null); + private ComparisonCollection fileSize = new ComparisonCollection(null); + private ComparisonCollection modificationTime = + new ComparisonCollection(null); + + public ExtendedLogMetaRequestBuilder setUser(String userName) { + this.user = userName; + return this; + } + + public ExtendedLogMetaRequestBuilder setAppId(String applicationId) { + this.appId = applicationId; + return this; + } + + public ExtendedLogMetaRequestBuilder setContainerId(String container) { + this.containerId = container; + return this; + } + + public ExtendedLogMetaRequestBuilder setNodeId(String node) { + try { + this.nodeId = new MatchExpression(node); + } catch (PatternSyntaxException e) { + throw new IllegalArgumentException("Node Id expression is invalid", e); + } + return this; + } + + public ExtendedLogMetaRequestBuilder setFileName(String file) { + try { + this.fileName = new MatchExpression(file); + } catch (PatternSyntaxException e) { + throw new IllegalArgumentException("Filename expression is invalid", e); + } + return this; + } + + public ExtendedLogMetaRequestBuilder setFileSize(Set fileSizes) { + this.fileSize = new ComparisonCollection(fileSizes); + return this; + } + + public ExtendedLogMetaRequestBuilder setModificationTime( + Set modificationTimes) { + this.modificationTime = new ComparisonCollection(modificationTimes); + return this; + } + + public boolean isUserSet() { + return user != null; + } + + public ExtendedLogMetaRequest build() { + return new ExtendedLogMetaRequest(user, appId, containerId, nodeId, + fileName, fileSize, modificationTime); + } + } + + /** + * A collection of {@code ComparisonExpression}. + */ + public static class ComparisonCollection { + private List comparisonExpressions; + + public ComparisonCollection(Set expressions) { + if (expressions == null) { + this.comparisonExpressions = Collections.emptyList(); + } else { + List equalExpressions = expressions.stream().filter( + e -> !e.startsWith(ComparisonExpression.GREATER_OPERATOR) && + !e.startsWith(ComparisonExpression.LESSER_OPERATOR)) + .collect(Collectors.toList()); + if (equalExpressions.size() > 1) { + throw new IllegalArgumentException( + "Can not process more, than one exact match. Matches: " + + String.join(" ", equalExpressions)); + } + + this.comparisonExpressions = expressions.stream() + .map(ComparisonExpression::new).collect(Collectors.toList()); + + } + + } + + public boolean match(Long value) { + return match(value, true); + } + + public boolean match(String value) { + if (value == null) { + return true; + } + + return match(Long.valueOf(value), true); + } + + /** + * Checks, if the given value matches all the {@code ComparisonExpression}. + * This implies an AND logic between the expressions. + * @param value given value to match against + * @param defaultValue default value to return when no expression is defined + * @return whether all expressions were matched + */ + public boolean match(Long value, boolean defaultValue) { + if (comparisonExpressions.isEmpty()) { + return defaultValue; + } + + return comparisonExpressions.stream() + .allMatch(expr -> expr.match(value)); + } + + } + + /** + * Wraps a comparison logic based on a stringified expression. + * The format of the expression is: + * >value = is greater than value + * <value = is lower than value + * value = is equal to value + */ + public static class ComparisonExpression { + public static final String GREATER_OPERATOR = ">"; + public static final String LESSER_OPERATOR = "<"; + + private String expression; + private Predicate comparisonFn; + private Long convertedValue; + + public ComparisonExpression(String expression) { + if (expression == null) { + return; + } + + if (expression.startsWith(GREATER_OPERATOR)) { + convertedValue = Long.parseLong(expression.substring(1)); + comparisonFn = a -> a > convertedValue; + } else if (expression.startsWith(LESSER_OPERATOR)) { + convertedValue = Long.parseLong(expression.substring(1)); + comparisonFn = a -> a < convertedValue; + } else { + convertedValue = Long.parseLong(expression); + comparisonFn = a -> a.equals(convertedValue); + } + + this.expression = expression; + } + + public boolean match(String value) { + return match(Long.valueOf(value), true); + } + + public boolean match(Long value) { + return match(value, true); + } + + /** + * Test the given value with the defined comparison functions based on + * stringified expression. + * @param value value to test with + * @param defaultValue value to return when no expression was defined + * @return comparison test result or the given default value + */ + public boolean match(Long value, boolean defaultValue) { + if (expression == null) { + return defaultValue; + } else { + return comparisonFn.test(value); + } + } + + @Override + public String toString() { + return convertedValue != null ? String.valueOf(convertedValue) : ""; + } + } + + /** + * Wraps a regex matcher. + */ + public static class MatchExpression { + private Pattern expression; + + public MatchExpression(String expression) { + this.expression = expression != null ? Pattern.compile(expression) : null; + } + + /** + * Matches the value on the expression. + * @param value value to be matched against + * @return result of the match or true, if no expression was defined + */ + public boolean match(String value) { + return expression == null || expression.matcher(value).matches(); + } + + @Override + public String toString() { + return expression != null ? expression.pattern() : ""; + } + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java new file mode 100644 index 0000000000..9c6e5b34d7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationMetaCollector.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** + * Facilitates an extended query of aggregated log file metadata with + * the help of file controllers. + */ +public class LogAggregationMetaCollector { + + private static final Logger LOG = LoggerFactory.getLogger( + LogAggregationMetaCollector.class); + + private final ExtendedLogMetaRequest logsRequest; + private final Configuration conf; + + public LogAggregationMetaCollector( + ExtendedLogMetaRequest logsRequest, Configuration conf) { + this.logsRequest = logsRequest; + this.conf = conf; + } + + /** + * Collects all log file metadata based on the complex query defined in + * {@code UserLogsRequest}. + * @param fileController log aggregation file format controller + * @return collection of log file metadata grouped by containers + * @throws IOException if node file is not reachable + */ + public List collect( + LogAggregationFileController fileController) throws IOException { + List containersLogMeta = new ArrayList<>(); + RemoteIterator appDirs = fileController. + getApplicationDirectoriesOfUser(logsRequest.getUser()); + + while (appDirs.hasNext()) { + FileStatus currentAppDir = appDirs.next(); + if (logsRequest.getAppId() == null || + logsRequest.getAppId().equals(currentAppDir.getPath().getName())) { + ApplicationId appId = ApplicationId.fromString( + currentAppDir.getPath().getName()); + RemoteIterator nodeFiles = fileController + .getNodeFilesOfApplicationDirectory(currentAppDir); + + while (nodeFiles.hasNext()) { + FileStatus currentNodeFile = nodeFiles.next(); + if (!logsRequest.getNodeId().match(currentNodeFile.getPath() + .getName())) { + continue; + } + + if (currentNodeFile.getPath().getName().equals( + logsRequest.getAppId() + ".har")) { + Path p = new Path("har:///" + + currentNodeFile.getPath().toUri().getRawPath()); + nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p); + continue; + } + + try { + Map> metaFiles = fileController + .getLogMetaFilesOfNode(logsRequest, currentNodeFile, appId); + if (metaFiles == null) { + continue; + } + + metaFiles.entrySet().removeIf(entry -> + !(logsRequest.getContainerId() == null || + logsRequest.getContainerId().equals(entry.getKey()))); + + containersLogMeta.addAll(createContainerLogMetas( + currentNodeFile.getPath().getName(), metaFiles)); + } catch (IOException ioe) { + LOG.warn("Can not get log meta from the log file:" + + currentNodeFile.getPath() + "\n" + ioe.getMessage()); + } + + } + } + + } + return containersLogMeta; + } + + private List createContainerLogMetas( + String nodeId, Map> metaFiles) { + List containerLogMetas = new ArrayList<>(); + for (Map.Entry> containerLogs + : metaFiles.entrySet()) { + ContainerLogMeta containerLogMeta = new ContainerLogMeta( + containerLogs.getKey(), nodeId); + for (ContainerLogFileInfo file : containerLogs.getValue()) { + boolean isFileNameMatches = logsRequest.getFileName() + .match(file.getFileName()); + boolean fileSizeComparison = logsRequest.getFileSize() + .match(file.getFileSize()); + boolean modificationTimeComparison = logsRequest.getModificationTime() + .match(file.getLastModifiedTime()); + + if (!isFileNameMatches || !fileSizeComparison || + !modificationTimeComparison) { + continue; + } + containerLogMeta.getContainerLogMeta().add(file); + } + if (!containerLogMeta.getContainerLogMeta().isEmpty()) { + containerLogMetas.add(containerLogMeta); + } + } + return containerLogMetas; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index 5f9466f386..ec3d3f8509 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -29,10 +29,13 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.NoSuchElementException; @Private public class LogAggregationUtils { @@ -295,19 +298,8 @@ public class LogAggregationUtils { // Return both new and old node files combined RemoteIterator curDir = nodeFilesCur; RemoteIterator prevDir = nodeFilesPrev; - RemoteIterator nodeFilesCombined = new - RemoteIterator() { - @Override - public boolean hasNext() throws IOException { - return prevDir.hasNext() || curDir.hasNext(); - } - @Override - public FileStatus next() throws IOException { - return prevDir.hasNext() ? prevDir.next() : curDir.next(); - } - }; - return nodeFilesCombined; + return combineIterators(prevDir, curDir); } } @@ -368,4 +360,93 @@ public class LogAggregationUtils { return nodeFiles; } + public static RemoteIterator getRemoteFiles( + Configuration conf, Path appPath) throws IOException { + + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(appPath); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(appPath); + } + + public static RemoteIterator getUserRemoteLogDir( + Configuration conf, String user, Path remoteRootLogDir, + String remoteRootLogDirSuffix) throws IOException { + Path userPath = LogAggregationUtils.getRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + final RemoteIterator userRootDirFiles = + getRemoteFiles(conf, userPath); + + RemoteIterator newDirs = new RemoteIterator() { + private RemoteIterator currentBucketDir = + LogAggregationUtils.getSubDir(conf, userRootDirFiles); + @Override + public boolean hasNext() throws IOException { + return currentBucketDir != null && currentBucketDir.hasNext() || + userRootDirFiles.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + FileStatus next = null; + while (next == null) { + if (currentBucketDir != null && currentBucketDir.hasNext()) { + next = currentBucketDir.next(); + } else if (userRootDirFiles.hasNext()) { + currentBucketDir = LogAggregationUtils.getSubDir( + conf, userRootDirFiles); + } else { + throw new NoSuchElementException(); + } + } + return next; + } + }; + + RemoteIterator allDir = newDirs; + if (LogAggregationUtils.isOlderPathEnabled(conf)) { + try { + Path oldPath = LogAggregationUtils.getOlderRemoteLogSuffixedDir( + remoteRootLogDir, user, remoteRootLogDirSuffix); + final RemoteIterator oldUserRootDirFiles = + getRemoteFiles(conf, oldPath); + allDir = combineIterators(oldUserRootDirFiles, newDirs); + } catch (FileNotFoundException e) { + return newDirs; + } + } + + return allDir; + } + + private static RemoteIterator getSubDir( + Configuration conf, RemoteIterator rootDir) + throws IOException { + if (rootDir.hasNext()) { + Path userPath = rootDir.next().getPath(); + Path qualifiedLogDir = + FileContext.getFileContext(conf).makeQualified(userPath); + return FileContext.getFileContext( + qualifiedLogDir.toUri(), conf).listStatus(userPath); + } else { + return null; + } + } + + private static RemoteIterator combineIterators( + RemoteIterator first, RemoteIterator second) { + return new RemoteIterator() { + @Override + public boolean hasNext() throws IOException { + return first.hasNext() || second.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return first.hasNext() ? first.next() : second.next(); + } + }; + + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java index 40ba555c31..c6e34ef6a9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/LogAggregationFileController.java @@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; @@ -53,7 +54,9 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.apache.hadoop.yarn.webapp.View.ViewContext; import org.apache.hadoop.yarn.webapp.view.HtmlBlock.Block; import org.slf4j.Logger; @@ -224,6 +227,49 @@ public abstract class LogAggregationFileController { public abstract List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException; + /** + * Returns log file metadata for a node grouped by containers. + * + * @param logRequest extended query information holder + * @param currentNodeFile file status of a node in an application directory + * @param appId id of the application, which is the same as in node path + * @return log file metadata + * @throws IOException if there is no node file + */ + public Map> getLogMetaFilesOfNode( + ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile, + ApplicationId appId) throws IOException { + LOG.info("User aggregated complex log queries " + + "are not implemented for this file controller"); + return Collections.emptyMap(); + } + + /** + * Gets all application directories of a user. + * + * @param user name of the user + * @return a lazy iterator of directories + * @throws IOException if user directory does not exist + */ + public RemoteIterator getApplicationDirectoriesOfUser( + String user) throws IOException { + return LogAggregationUtils.getUserRemoteLogDir( + conf, user, getRemoteRootLogDir(), getRemoteRootLogDirSuffix()); + } + + /** + * Gets all node files in an application directory. + * + * @param appDir application directory + * @return a lazy iterator of files + * @throws IOException if file context is not reachable + */ + public RemoteIterator getNodeFilesOfApplicationDirectory( + FileStatus appDir) throws IOException { + return LogAggregationUtils + .getRemoteFiles(conf, appDir.getPath()); + } + /** * Render Aggregated Logs block. * @param html the html diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java index 8047f4a519..b02466b9ab 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java @@ -27,6 +27,7 @@ import java.io.InputStream; import java.io.OutputStream; import java.io.Serializable; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.security.NoSuchAlgorithmException; import java.security.PrivilegedExceptionAction; @@ -74,12 +75,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.LogToolUtils; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.util.Clock; @@ -610,6 +613,45 @@ public class LogAggregationIndexedFileController return findLogs; } + @Override + public Map> getLogMetaFilesOfNode( + ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile, + ApplicationId appId) throws IOException { + Map> logMetaFiles = new HashMap<>(); + + Long checkSumIndex = parseChecksum(currentNodeFile); + long endIndex = -1; + if (checkSumIndex != null) { + endIndex = checkSumIndex; + } + IndexedLogsMeta current = loadIndexedLogsMeta( + currentNodeFile.getPath(), endIndex, appId); + if (current != null) { + for (IndexedPerAggregationLogMeta logMeta : + current.getLogMetas()) { + for (Entry> log : logMeta + .getLogMetas().entrySet()) { + String currentContainerId = log.getKey(); + if (!(logRequest.getContainerId() == null || + logRequest.getContainerId().equals(currentContainerId))) { + continue; + } + logMetaFiles.put(currentContainerId, new ArrayList<>()); + for (IndexedFileLogMeta aMeta : log.getValue()) { + ContainerLogFileInfo file = new ContainerLogFileInfo(); + file.setFileName(aMeta.getFileName()); + file.setFileSize(Long.toString(aMeta.getFileSize())); + file.setLastModifiedTime( + Long.toString(aMeta.getLastModifiedTime())); + logMetaFiles.get(currentContainerId).add(file); + } + } + } + } + + return logMetaFiles; + } + @Override public List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException { @@ -743,6 +785,40 @@ public class LogAggregationIndexedFileController return checkSumFiles; } + private Long parseChecksum(FileStatus file) { + if (!file.getPath().getName().endsWith(CHECK_SUM_FILE_SUFFIX)) { + return null; + } + + FSDataInputStream checksumFileInputStream = null; + try { + FileContext fileContext = FileContext + .getFileContext(file.getPath().toUri(), conf); + String nodeName = null; + long index = 0L; + checksumFileInputStream = fileContext.open(file.getPath()); + int nameLength = checksumFileInputStream.readInt(); + byte[] b = new byte[nameLength]; + int actualLength = checksumFileInputStream.read(b); + if (actualLength == nameLength) { + nodeName = new String(b, StandardCharsets.UTF_8); + index = checksumFileInputStream.readLong(); + } else { + return null; + } + if (!nodeName.isEmpty()) { + return index; + } + } catch (IOException ex) { + LOG.warn(ex.getMessage()); + return null; + } finally { + IOUtils.cleanupWithLogger(LOG, checksumFileInputStream); + } + + return null; + } + @Private public List getNodeLogFileToRead( List nodeFiles, String nodeId, ApplicationId appId) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index 2355d30640..b365424de8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -25,10 +25,13 @@ import java.io.OutputStream; import java.nio.charset.Charset; import java.security.PrivilegedExceptionAction; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.logaggregation.ContainerLogFileInfo; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.commons.math3.util.Pair; @@ -258,6 +261,58 @@ public class LogAggregationTFileController return findLogs; } + @Override + public Map> getLogMetaFilesOfNode( + ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile, + ApplicationId appId) throws IOException { + Map> logMetaFiles = new HashMap<>(); + Path nodePath = currentNodeFile.getPath(); + + LogReader reader = + new LogReader(conf, + nodePath); + try { + DataInputStream valueStream; + LogKey key = new LogKey(); + valueStream = reader.next(key); + while (valueStream != null) { + if (logRequest.getContainerId() == null || + logRequest.getContainerId().equals(key.toString())) { + logMetaFiles.put(key.toString(), new ArrayList<>()); + fillMetaFiles(currentNodeFile, valueStream, + logMetaFiles.get(key.toString())); + } + // Next container + key = new LogKey(); + valueStream = reader.next(key); + } + } finally { + reader.close(); + } + return logMetaFiles; + } + + private void fillMetaFiles( + FileStatus currentNodeFile, DataInputStream valueStream, + List logMetaFiles) + throws IOException { + while (true) { + try { + Pair logMeta = + LogReader.readContainerMetaDataAndSkipData( + valueStream); + ContainerLogFileInfo logMetaFile = new ContainerLogFileInfo(); + logMetaFile.setLastModifiedTime( + Long.toString(currentNodeFile.getModificationTime())); + logMetaFile.setFileName(logMeta.getFirst()); + logMetaFile.setFileSize(logMeta.getSecond()); + logMetaFiles.add(logMetaFile); + } catch (EOFException eof) { + break; + } + } + } + @Override public List readAggregatedLogsMeta( ContainerLogsRequest logRequest) throws IOException { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java new file mode 100644 index 0000000000..c60635b0e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestLogAggregationMetaCollector.java @@ -0,0 +1,391 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation; + +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.filecontroller.FakeLogAggregationFileController; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.time.Clock; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.junit.Assert.*; + +public class TestLogAggregationMetaCollector { + private static final String TEST_NODE = "TEST_NODE_1"; + private static final String TEST_NODE_2 = "TEST_NODE_2"; + private static final String BIG_FILE_NAME = "TEST_BIG"; + private static final String SMALL_FILE_NAME = "TEST_SMALL"; + + private static ApplicationId app = ApplicationId.newInstance( + Clock.systemDefaultZone().millis(), 1); + private static ApplicationId app2 = ApplicationId.newInstance( + Clock.systemDefaultZone().millis(), 2); + + private static ApplicationAttemptId appAttempt = + ApplicationAttemptId.newInstance(app, 1); + private static ApplicationAttemptId app2Attempt = + ApplicationAttemptId.newInstance(app2, 1); + + private static ContainerId attemptContainer = + ContainerId.newContainerId(appAttempt, 1); + private static ContainerId attemptContainer2 = + ContainerId.newContainerId(appAttempt, 2); + + private static ContainerId attempt2Container = + ContainerId.newContainerId(app2Attempt, 1); + private static ContainerId attempt2Container2 = + ContainerId.newContainerId(app2Attempt, 2); + + private FakeNodeFileController fileController; + + private static class FakeNodeFileController + extends FakeLogAggregationFileController { + private Map, + Map>> logFiles; + private List appDirs; + private List nodeFiles; + + FakeNodeFileController( + Map, Map>> logFiles, List appDirs, + List nodeFiles) { + this.logFiles = logFiles; + this.appDirs = appDirs; + this.nodeFiles = nodeFiles; + } + + @Override + public RemoteIterator getApplicationDirectoriesOfUser( + String user) throws IOException { + return new RemoteIterator() { + private Iterator iter = appDirs.iterator(); + + @Override + public boolean hasNext() throws IOException { + return iter.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return iter.next(); + } + }; + } + + @Override + public RemoteIterator getNodeFilesOfApplicationDirectory( + FileStatus appDir) throws IOException { + return new RemoteIterator() { + private Iterator iter = nodeFiles.iterator(); + + @Override + public boolean hasNext() throws IOException { + return iter.hasNext(); + } + + @Override + public FileStatus next() throws IOException { + return iter.next(); + } + }; + } + + @Override + public Map> getLogMetaFilesOfNode( + ExtendedLogMetaRequest logRequest, FileStatus currentNodeFile, + ApplicationId appId) throws IOException { + return logFiles.get(new ImmutablePair<>(appId.toString(), + currentNodeFile.getPath().getName())); + } + } + + @Before + public void setUp() throws Exception { + fileController = createFileController(); + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void testAllNull() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + request.setAppId(null); + request.setContainerId(null); + request.setFileName(null); + request.setFileSize(null); + request.setModificationTime(null); + request.setNodeId(null); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(8, allFile.size()); + } + + @Test + public void testAllSet() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + Set fileSizeExpressions = new HashSet<>(); + fileSizeExpressions.add("<51"); + Set modificationTimeExpressions = new HashSet<>(); + modificationTimeExpressions.add("<1000"); + request.setAppId(app.toString()); + request.setContainerId(attemptContainer.toString()); + request.setFileName(String.format("%s.*", SMALL_FILE_NAME)); + request.setFileSize(fileSizeExpressions); + request.setModificationTime(modificationTimeExpressions); + request.setNodeId(TEST_NODE); + request.setUser("TEST"); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(1, allFile.size()); + } + + @Test + public void testSingleNodeRequest() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + request.setAppId(null); + request.setContainerId(null); + request.setFileName(null); + request.setFileSize(null); + request.setModificationTime(null); + request.setNodeId(TEST_NODE); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(4, allFile.stream(). + filter(f -> f.getFileName().contains(TEST_NODE)).count()); + } + + @Test + public void testMultipleNodeRegexRequest() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + request.setAppId(null); + request.setContainerId(null); + request.setFileName(null); + request.setFileSize(null); + request.setModificationTime(null); + request.setNodeId("TEST_NODE_.*"); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(8, allFile.size()); + } + + @Test + public void testMultipleFileRegex() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + request.setAppId(null); + request.setContainerId(null); + request.setFileName(String.format("%s.*", BIG_FILE_NAME)); + request.setFileSize(null); + request.setModificationTime(null); + request.setNodeId(null); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(4, allFile.size()); + assertTrue(allFile.stream().allMatch( + f -> f.getFileName().contains(BIG_FILE_NAME))); + } + + @Test + public void testContainerIdExactMatch() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + request.setAppId(null); + request.setContainerId(attemptContainer.toString()); + request.setFileName(null); + request.setFileSize(null); + request.setModificationTime(null); + request.setNodeId(null); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(2, allFile.size()); + assertTrue(allFile.stream().allMatch( + f -> f.getFileName().contains(attemptContainer.toString()))); + } + + @Test + public void testMultipleFileBetweenSize() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + Set fileSizeExpressions = new HashSet<>(); + fileSizeExpressions.add(">50"); + fileSizeExpressions.add("<101"); + request.setAppId(null); + request.setContainerId(null); + request.setFileName(null); + request.setFileSize(fileSizeExpressions); + request.setModificationTime(null); + request.setNodeId(null); + request.setUser(null); + + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + request.build(), new YarnConfiguration()); + List res = collector.collect(fileController); + + List allFile = res.stream() + .flatMap(m -> m.getContainerLogMeta().stream()) + .collect(Collectors.toList()); + assertEquals(4, allFile.size()); + assertTrue(allFile.stream().allMatch( + f -> f.getFileSize().equals("100"))); + } + + @Test + public void testInvalidQueryStrings() throws IOException { + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder request = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder(); + Set fileSizeExpressions = new HashSet<>(); + fileSizeExpressions.add("50"); + fileSizeExpressions.add("101"); + try { + request.setFileName("*"); + fail("An error should be thrown due to an invalid regex"); + } catch (IllegalArgumentException ignored) { + } + + try { + request.setFileSize(fileSizeExpressions); + fail("An error should be thrown due to multiple exact match expression"); + } catch (IllegalArgumentException ignored) { + } + } + + private FakeNodeFileController createFileController() { + FileStatus appDir = new FileStatus(); + appDir.setPath(new Path(String.format("test/%s", app.toString()))); + FileStatus appDir2 = new FileStatus(); + appDir2.setPath(new Path(String.format("test/%s", app2.toString()))); + List appDirs = new ArrayList<>(); + appDirs.add(appDir); + appDirs.add(appDir2); + + FileStatus nodeFile = new FileStatus(); + nodeFile.setPath(new Path(String.format("test/%s", TEST_NODE))); + FileStatus nodeFile2 = new FileStatus(); + nodeFile2.setPath(new Path(String.format("test/%s", TEST_NODE_2))); + List nodeFiles = new ArrayList<>(); + nodeFiles.add(nodeFile); + nodeFiles.add(nodeFile2); + + Map, Map>> internal = new HashMap<>(); + internal.put(new ImmutablePair<>(app.toString(), TEST_NODE), + createLogFiles(TEST_NODE, attemptContainer)); + internal.put(new ImmutablePair<>(app.toString(), TEST_NODE_2), + createLogFiles(TEST_NODE_2, attemptContainer2)); + internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE), + createLogFiles(TEST_NODE, attempt2Container)); + internal.put(new ImmutablePair<>(app2.toString(), TEST_NODE_2), + createLogFiles(TEST_NODE_2, attempt2Container2)); + return new FakeNodeFileController(internal, appDirs, nodeFiles); + } + + private Map> createLogFiles( + String nodeId, ContainerId... containerId) { + Map> logFiles = new HashMap<>(); + for (ContainerId c : containerId) { + + List files = new ArrayList<>(); + ContainerLogFileInfo bigFile = new ContainerLogFileInfo(); + bigFile.setFileName(generateFileName( + BIG_FILE_NAME, nodeId, c.toString())); + bigFile.setFileSize("100"); + bigFile.setLastModifiedTime("1000"); + ContainerLogFileInfo smallFile = new ContainerLogFileInfo(); + smallFile.setFileName(generateFileName( + SMALL_FILE_NAME, nodeId, c.toString())); + smallFile.setFileSize("50"); + smallFile.setLastModifiedTime("100"); + files.add(bigFile); + files.add(smallFile); + + logFiles.put(c.toString(), files); + } + return logFiles; + } + + private String generateFileName( + String name, String nodeId, String containerId) { + return String.format("%s_%s_%s", name, nodeId, containerId); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java new file mode 100644 index 0000000000..c667d3b4fe --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/FakeLogAggregationFileController.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.logaggregation.filecontroller; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat; +import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; +import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.webapp.View; +import org.apache.hadoop.yarn.webapp.view.HtmlBlock; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.List; +import java.util.Map; + +public class FakeLogAggregationFileController + extends LogAggregationFileController { + + @Override + protected void initInternal(Configuration conf) { + + } + + @Override + public void initializeWriter(LogAggregationFileControllerContext context) + throws IOException { + + } + + @Override + public void closeWriter() throws LogAggregationDFSException { + + } + + @Override + public void write(AggregatedLogFormat.LogKey logKey, + AggregatedLogFormat.LogValue logValue) throws IOException { + + } + + @Override + public void postWrite(LogAggregationFileControllerContext record) + throws Exception { + + } + + @Override + public boolean readAggregatedLogs(ContainerLogsRequest logRequest, + OutputStream os) throws IOException { + return false; + } + + @Override + public List readAggregatedLogsMeta( + ContainerLogsRequest logRequest) throws IOException { + return null; + } + + @Override + public void renderAggregatedLogsBlock(HtmlBlock.Block html, + View.ViewContext context) { + + } + + @Override + public String getApplicationOwner(Path aggregatedLogPath, + ApplicationId appId) throws IOException { + return null; + } + + @Override + public Map getApplicationAcls( + Path aggregatedLogPath, ApplicationId appId) throws IOException { + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java index 73351813e7..2da413d798 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexedFileController.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey; import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue; @@ -497,4 +498,109 @@ public class TestLogAggregationIndexedFileController fileFormat.initialize(conf, fileControllerName); assertThat(fileFormat.getRollOverLogMaxSize(conf)).isZero(); } + + @Test + public void testGetLogMetaFilesOfNode() throws Exception { + if (fs.exists(rootLocalLogDirPath)) { + fs.delete(rootLocalLogDirPath, true); + } + assertTrue(fs.mkdirs(rootLocalLogDirPath)); + + Path appLogsDir = new Path(rootLocalLogDirPath, appId.toString()); + if (fs.exists(appLogsDir)) { + fs.delete(appLogsDir, true); + } + assertTrue(fs.mkdirs(appLogsDir)); + + List logTypes = new ArrayList(); + logTypes.add("syslog"); + logTypes.add("stdout"); + logTypes.add("stderr"); + + Set files = new HashSet<>(); + + LogKey key1 = new LogKey(containerId.toString()); + + for(String logType : logTypes) { + File file = createAndWriteLocalLogFile(containerId, appLogsDir, + logType); + files.add(file); + } + files.add(createZeroLocalLogFile(appLogsDir)); + + LogValue value = mock(LogValue.class); + when(value.getPendingLogFilesToUploadForThisContainer()).thenReturn(files); + + LogAggregationIndexedFileController fileFormat = + new LogAggregationIndexedFileController(); + + fileFormat.initialize(getConf(), "Indexed"); + + Map appAcls = new HashMap<>(); + Path appDir = fileFormat.getRemoteAppLogDir(appId, + USER_UGI.getShortUserName()); + if (fs.exists(appDir)) { + fs.delete(appDir, true); + } + assertTrue(fs.mkdirs(appDir)); + + Path logPath = fileFormat.getRemoteNodeLogFileForApp( + appId, USER_UGI.getShortUserName(), nodeId); + LogAggregationFileControllerContext context = + new LogAggregationFileControllerContext( + logPath, logPath, true, 1000, appId, appAcls, nodeId, USER_UGI); + // initialize the writer + fileFormat.initializeWriter(context); + + fileFormat.write(key1, value); + fileFormat.postWrite(context); + fileFormat.closeWriter(); + + ContainerLogsRequest logRequest = new ContainerLogsRequest(); + logRequest.setAppId(appId); + logRequest.setNodeId(nodeId.toString()); + logRequest.setAppOwner(USER_UGI.getShortUserName()); + logRequest.setContainerId(containerId.toString()); + logRequest.setBytes(Long.MAX_VALUE); + // create a checksum file + final ControlledClock clock = new ControlledClock(); + clock.setTime(System.currentTimeMillis()); + Path checksumFile = new Path(fileFormat.getRemoteAppLogDir( + appId, USER_UGI.getShortUserName()), + LogAggregationUtils.getNodeString(nodeId) + + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX); + FSDataOutputStream fInput = null; + try { + String nodeName = logPath.getName() + "_" + clock.getTime(); + fInput = FileSystem.create(fs, checksumFile, LOG_FILE_UMASK); + fInput.writeInt(nodeName.length()); + fInput.write(nodeName.getBytes( + Charset.forName("UTF-8"))); + fInput.writeLong(0); + } finally { + IOUtils.closeQuietly(fInput); + } + + Path nodePath = LogAggregationUtils.getRemoteAppLogDir( + fileFormat.getRemoteRootLogDir(), appId, USER_UGI.getShortUserName(), + fileFormat.getRemoteRootLogDirSuffix()); + FileStatus[] nodes = fs.listStatus(nodePath); + ExtendedLogMetaRequest req = + new ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder().build(); + for (FileStatus node : nodes) { + Map> metas = + fileFormat.getLogMetaFilesOfNode(req, node, appId); + + if (node.getPath().getName().contains( + LogAggregationIndexedFileController.CHECK_SUM_FILE_SUFFIX)) { + assertTrue("Checksum node files should not contain any logs", + metas.isEmpty()); + } else { + assertFalse("Non-checksum node files should contain log files", + metas.isEmpty()); + assertEquals(4, metas.values().stream().findFirst().get().size()); + } + } + + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java index fb8ad60266..c61391b2bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/LogServlet.java @@ -31,8 +31,10 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl; import org.apache.hadoop.yarn.logaggregation.ContainerLogAggregationType; import org.apache.hadoop.yarn.logaggregation.ContainerLogMeta; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.LogAggregationMetaCollector; +import org.apache.hadoop.yarn.logaggregation.ExtendedLogMetaRequest; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.server.webapp.dao.ContainerLogsInfo; import org.apache.hadoop.yarn.server.webapp.dao.RemoteLogPathEntry; @@ -264,6 +266,36 @@ public class LogServlet extends Configured { redirectedFromNode, null, manualRedirection); } + public Response getContainerLogsInfo( + HttpServletRequest req, + ExtendedLogMetaRequest.ExtendedLogMetaRequestBuilder logsRequest) + throws IOException { + List logs = new ArrayList<>(); + + if (!logsRequest.isUserSet()) { + logsRequest.setUser(UserGroupInformation.getCurrentUser().getUserName()); + } + LogAggregationMetaCollector collector = new LogAggregationMetaCollector( + logsRequest.build(), getConf()); + + for (LogAggregationFileController fc : getOrCreateFactory() + .getConfiguredLogAggregationFileControllerList()) { + logs.addAll(collector.collect(fc)); + } + + List containersLogsInfo = convertToContainerLogsInfo( + logs, false); + GenericEntity> meta = + new GenericEntity>(containersLogsInfo) { + }; + Response.ResponseBuilder response = Response.ok(meta); + // Sending the X-Content-Type-Options response header with the value + // nosniff will prevent Internet Explorer from MIME-sniffing a response + // away from the declared content-type. + response.header("X-Content-Type-Options", "nosniff"); + return response.build(); + } + /** * Returns information about the logs for a specific container. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java index 3aade3faaf..84697a389b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/YarnWebServiceParams.java @@ -40,4 +40,6 @@ public interface YarnWebServiceParams { String CLUSTER_ID = "clusterid"; String MANUAL_REDIRECTION = "manual_redirection"; String REMOTE_USER = "user"; + String FILESIZE = "file_size"; + String MODIFICATION_TIME = "modification_time"; }