MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files. (Robert Kanter via kasha)

This commit is contained in:
Karthik Kambatla 2015-09-09 17:41:27 -07:00
parent 4014ce5990
commit 119cc75e7e
12 changed files with 2527 additions and 0 deletions

1308
MAPREDUCE-6415.003.patch Normal file

File diff suppressed because it is too large Load Diff

View File

@ -51,6 +51,13 @@
<include>*-sources.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>../hadoop-archive-logs/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>
<includes>
<include>*-sources.jar</include>
</includes>
</fileSet>
<fileSet>
<directory>../hadoop-datajoin/target</directory>
<outputDirectory>/share/hadoop/${hadoop.component}/sources</outputDirectory>

View File

@ -290,6 +290,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6304. Specifying node labels when submitting MR jobs.
(Naganarasimha G R via wangda)
MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files.
(Robert Kanter via kasha)
IMPROVEMENTS
MAPREDUCE-6291. Correct mapred queue usage command.

View File

@ -20,6 +20,7 @@ MYNAME="${BASH_SOURCE-$0}"
function hadoop_usage
{
hadoop_add_subcommand "archive" "create a hadoop archive"
hadoop_add_subcommand "archive-logs" "combine aggregated logs into hadoop archives"
hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands"
hadoop_add_subcommand "distcp" "copy file or directories recursively"
hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon"
@ -72,6 +73,13 @@ case ${COMMAND} in
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
archive-logs)
CLASS=org.apache.hadoop.tools.HadoopArchiveLogs
hadoop_debug "Injecting TOOL_PATH into CLASSPATH"
hadoop_add_classpath "${TOOL_PATH}"
hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS"
HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}"
;;
classpath)
hadoop_do_classpath_subcommand CLASS "$@"
;;

View File

@ -322,6 +322,11 @@
<artifactId>hadoop-archives</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archive-logs</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-distcp</artifactId>

View File

@ -0,0 +1,171 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archive-logs</artifactId>
<version>3.0.0-SNAPSHOT</version>
<description>Apache Hadoop Archive Logs</description>
<name>Apache Hadoop Archive Logs</name>
<packaging>jar</packaging>
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-applications-distributedshell</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archives</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-cli</groupId>
<artifactId>commons-cli</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-resourcemanager</artifactId>
<scope>provided</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-log-dir</id>
<phase>process-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
<mkdir dir="${hadoop.log.dir}"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.tools.HadoopArchiveLogs</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,403 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster;
import org.apache.hadoop.yarn.applications.distributedshell.Client;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
/**
* This tool moves Aggregated Log files into HAR archives using the
* {@link HadoopArchives} tool and the Distributed Shell via the
* {@link HadoopArchiveLogsRunner}.
*/
public class HadoopArchiveLogs implements Tool {
private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class);
private static final String HELP_OPTION = "help";
private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps";
private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
private static final String MEMORY_OPTION = "memory";
private static final int DEFAULT_MAX_ELIGIBLE = -1;
private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L;
private static final long DEFAULT_MEMORY = 1024L;
@VisibleForTesting
int maxEligible = DEFAULT_MAX_ELIGIBLE;
@VisibleForTesting
int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES;
@VisibleForTesting
long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
@VisibleForTesting
long memory = DEFAULT_MEMORY;
@VisibleForTesting
Set<ApplicationReport> eligibleApplications;
private JobConf conf;
public HadoopArchiveLogs(Configuration conf) {
setConf(conf);
eligibleApplications = new HashSet<>();
}
public static void main(String[] args) {
JobConf job = new JobConf(HadoopArchiveLogs.class);
HadoopArchiveLogs hal = new HadoopArchiveLogs(job);
int ret = 0;
try{
ret = ToolRunner.run(hal, args);
} catch(Exception e) {
LOG.debug("Exception", e);
System.err.println(e.getClass().getSimpleName());
final String s = e.getLocalizedMessage();
if (s != null) {
System.err.println(s);
} else {
e.printStackTrace(System.err);
}
System.exit(1);
}
System.exit(ret);
}
@Override
public int run(String[] args) throws Exception {
handleOpts(args);
findAggregatedApps();
FileSystem fs = null;
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
try {
fs = FileSystem.get(conf);
checkFiles(fs, remoteRootLogDir, suffix);
// Prepare working directory
if (fs.exists(workingDir)) {
fs.delete(workingDir, true);
}
fs.mkdirs(workingDir);
fs.setPermission(workingDir,
new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
} finally {
if (fs != null) {
fs.close();
}
}
checkMaxEligible();
if (eligibleApplications.isEmpty()) {
LOG.info("No eligible applications to process");
System.exit(0);
}
StringBuilder sb =
new StringBuilder("Will process the following applications:");
for (ApplicationReport report : eligibleApplications) {
sb.append("\n\t").append(report.getApplicationId());
}
LOG.info(sb.toString());
File localScript = File.createTempFile("hadoop-archive-logs-", ".sh");
generateScript(localScript, workingDir, remoteRootLogDir, suffix);
if (runDistributedShell(localScript)) {
return 0;
}
return -1;
}
private void handleOpts(String[] args) throws ParseException {
Options opts = new Options();
Option helpOpt = new Option(HELP_OPTION, false, "Prints this message");
Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true,
"The maximum number of eligible apps to process (default: "
+ DEFAULT_MAX_ELIGIBLE + " (all))");
maxEligibleOpt.setArgName("n");
Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true,
"The minimum number of log files required to be eligible (default: "
+ DEFAULT_MIN_NUM_LOG_FILES + ")");
minNumLogFilesOpt.setArgName("n");
Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true,
"The maximum total logs size (in megabytes) required to be eligible" +
" (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")");
maxTotalLogsSizeOpt.setArgName("megabytes");
Option memoryOpt = new Option(MEMORY_OPTION, true,
"The amount of memory (in megabytes) for each container (default: "
+ DEFAULT_MEMORY + ")");
memoryOpt.setArgName("megabytes");
opts.addOption(helpOpt);
opts.addOption(maxEligibleOpt);
opts.addOption(minNumLogFilesOpt);
opts.addOption(maxTotalLogsSizeOpt);
opts.addOption(memoryOpt);
try {
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args);
if (commandLine.hasOption(HELP_OPTION)) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("yarn archive-logs", opts);
System.exit(0);
}
if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) {
maxEligible = Integer.parseInt(
commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION));
if (maxEligible == 0) {
LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes "
+ "nothing. Please either set it to a negative value "
+ "(default, all) or a more reasonable value.");
System.exit(0);
}
}
if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) {
minNumLogFiles = Integer.parseInt(
commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION));
}
if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) {
maxTotalLogsSize = Long.parseLong(
commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION));
maxTotalLogsSize *= 1024L * 1024L;
}
if (commandLine.hasOption(MEMORY_OPTION)) {
memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
}
} catch (ParseException pe) {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("yarn archive-logs", opts);
throw pe;
}
}
@VisibleForTesting
void findAggregatedApps() throws IOException, YarnException {
YarnClient client = YarnClient.createYarnClient();
try {
client.init(getConf());
client.start();
List<ApplicationReport> reports = client.getApplications();
for (ApplicationReport report : reports) {
LogAggregationStatus aggStatus = report.getLogAggregationStatus();
if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
aggStatus.equals(LogAggregationStatus.FAILED)) {
eligibleApplications.add(report);
}
}
} finally {
if (client != null) {
client.stop();
}
}
}
@VisibleForTesting
void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
reportIt.hasNext(); ) {
ApplicationReport report = reportIt.next();
long totalFileSize = 0L;
try {
FileStatus[] files = fs.listStatus(
LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
report.getApplicationId(), report.getUser(), suffix));
if (files.length < minNumLogFiles) {
reportIt.remove();
} else {
for (FileStatus file : files) {
if (file.getPath().getName().equals(report.getApplicationId()
+ ".har")) {
reportIt.remove();
break;
}
totalFileSize += file.getLen();
}
if (totalFileSize > maxTotalLogsSize) {
reportIt.remove();
}
}
} catch (IOException ioe) {
// If the user doesn't have permission or it doesn't exist, then skip it
reportIt.remove();
}
}
}
@VisibleForTesting
void checkMaxEligible() {
// If we have too many eligible apps, remove the newest ones first
if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
List<ApplicationReport> sortedApplications =
new ArrayList<ApplicationReport>(eligibleApplications);
Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
@Override
public int compare(ApplicationReport o1, ApplicationReport o2) {
return Long.compare(o1.getFinishTime(), o2.getFinishTime());
}
});
for (int i = maxEligible; i < sortedApplications.size(); i++) {
eligibleApplications.remove(sortedApplications.get(i));
}
}
}
/*
The generated script looks like this:
#!/bin/bash
set -e
set -x
if [ "$YARN_SHELL_ID" == "1" ]; then
appId="application_1440448768987_0001"
user="rkanter"
elif [ "$YARN_SHELL_ID" == "2" ]; then
appId="application_1440448768987_0002"
user="rkanter"
else
echo "Unknown Mapping!"
exit 1
fi
export HADOOP_CLIENT_OPTS="-Xmx1024m"
export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar
"$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs
*/
@VisibleForTesting
void generateScript(File localScript, Path workingDir,
Path remoteRootLogDir, String suffix) throws IOException {
LOG.info("Generating script at: " + localScript.getAbsolutePath());
String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String harJarPath = HadoopArchives.class.getProtectionDomain()
.getCodeSource().getLocation().getPath();
String classpath = halrJarPath + File.pathSeparator + harJarPath;
FileWriter fw = null;
try {
fw = new FileWriter(localScript);
fw.write("#!/bin/bash\nset -e\nset -x\n");
int containerCount = 1;
for (ApplicationReport report : eligibleApplications) {
fw.write("if [ \"$YARN_SHELL_ID\" == \"");
fw.write(Integer.toString(containerCount));
fw.write("\" ]; then\n\tappId=\"");
fw.write(report.getApplicationId().toString());
fw.write("\"\n\tuser=\"");
fw.write(report.getUser());
fw.write("\"\nel");
containerCount++;
}
fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n");
fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx");
fw.write(Long.toString(memory));
fw.write("m\"\n");
fw.write("export HADOOP_CLASSPATH=");
fw.write(classpath);
fw.write("\n\"$HADOOP_HOME\"/bin/hadoop ");
fw.write(HadoopArchiveLogsRunner.class.getName());
fw.write(" -appId \"$appId\" -user \"$user\" -workingDir ");
fw.write(workingDir.toString());
fw.write(" -remoteRootLogDir ");
fw.write(remoteRootLogDir.toString());
fw.write(" -suffix ");
fw.write(suffix);
fw.write("\n");
} finally {
if (fw != null) {
fw.close();
}
}
}
private boolean runDistributedShell(File localScript) throws Exception {
String[] dsArgs = {
"--appname",
"ArchiveLogs",
"--jar",
ApplicationMaster.class.getProtectionDomain().getCodeSource()
.getLocation().getPath(),
"--num_containers",
Integer.toString(eligibleApplications.size()),
"--container_memory",
Long.toString(memory),
"--shell_script",
localScript.getAbsolutePath()
};
final Client dsClient = new Client(new Configuration(conf));
dsClient.init(dsArgs);
return dsClient.run();
}
@Override
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
} else {
this.conf = new JobConf(conf, HadoopArchiveLogs.class);
}
}
@Override
public Configuration getConf() {
return this.conf;
}
}

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.File;
/**
* This is a child program designed to be used by the {@link HadoopArchiveLogs}
* tool via the Distributed Shell. It's not meant to be run directly.
*/
public class HadoopArchiveLogsRunner implements Tool {
private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class);
private static final String APP_ID_OPTION = "appId";
private static final String USER_OPTION = "user";
private static final String WORKING_DIR_OPTION = "workingDir";
private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir";
private static final String SUFFIX_OPTION = "suffix";
private String appId;
private String user;
private String workingDir;
private String remoteLogDir;
private String suffix;
private JobConf conf;
public HadoopArchiveLogsRunner(Configuration conf) {
setConf(conf);
}
public static void main(String[] args) {
JobConf job = new JobConf(HadoopArchiveLogsRunner.class);
HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job);
int ret = 0;
try{
ret = ToolRunner.run(halr, args);
} catch(Exception e) {
LOG.debug("Exception", e);
System.err.println(e.getClass().getSimpleName());
final String s = e.getLocalizedMessage();
if (s != null) {
System.err.println(s);
} else {
e.printStackTrace(System.err);
}
System.exit(1);
}
System.exit(ret);
}
@Override
public int run(String[] args) throws Exception {
handleOpts(args);
String remoteAppLogDir = remoteLogDir + File.separator + user
+ File.separator + suffix + File.separator + appId;
// Run 'hadoop archives' command in local mode
Configuration haConf = new Configuration(getConf());
haConf.set("mapreduce.framework.name", "local");
HadoopArchives ha = new HadoopArchives(haConf);
String[] haArgs = {
"-archiveName",
appId + ".har",
"-p",
remoteAppLogDir,
"*",
workingDir
};
StringBuilder sb = new StringBuilder("Executing 'hadoop archives'");
for (String haArg : haArgs) {
sb.append("\n\t").append(haArg);
}
LOG.info(sb.toString());
ha.run(haArgs);
FileSystem fs = null;
// Move har file to correct location and delete original logs
try {
fs = FileSystem.get(conf);
LOG.info("Moving har to original location");
fs.rename(new Path(workingDir, appId + ".har"),
new Path(remoteAppLogDir, appId + ".har"));
LOG.info("Deleting original logs");
for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir),
new PathFilter() {
@Override
public boolean accept(Path path) {
return !path.getName().endsWith(".har");
}
})) {
fs.delete(original.getPath(), false);
}
} finally {
if (fs != null) {
fs.close();
}
}
return 0;
}
private void handleOpts(String[] args) throws ParseException {
Options opts = new Options();
Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID");
appIdOpt.setRequired(true);
Option userOpt = new Option(USER_OPTION, true, "User");
userOpt.setRequired(true);
Option workingDirOpt = new Option(WORKING_DIR_OPTION, true,
"Working Directory");
workingDirOpt.setRequired(true);
Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true,
"Remote Root Log Directory");
remoteLogDirOpt.setRequired(true);
Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix");
suffixOpt.setRequired(true);
opts.addOption(appIdOpt);
opts.addOption(userOpt);
opts.addOption(workingDirOpt);
opts.addOption(remoteLogDirOpt);
opts.addOption(suffixOpt);
CommandLineParser parser = new GnuParser();
CommandLine commandLine = parser.parse(opts, args);
appId = commandLine.getOptionValue(APP_ID_OPTION);
user = commandLine.getOptionValue(USER_OPTION);
workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION);
remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR);
suffix = commandLine.getOptionValue(SUFFIX_OPTION);
}
@Override
public void setConf(Configuration conf) {
if (conf instanceof JobConf) {
this.conf = (JobConf) conf;
} else {
this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class);
}
}
@Override
public Configuration getConf() {
return this.conf;
}
}

View File

@ -0,0 +1,293 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.Random;
public class TestHadoopArchiveLogs {
private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
private static final int FILE_SIZE_INCREMENT = 4096;
private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
static {
new Random().nextBytes(DUMMY_DATA);
}
@Test(timeout = 10000)
public void testCheckFiles() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
FileSystem fs = FileSystem.getLocal(conf);
Path rootLogDir = new Path("target", "logs");
String suffix = "logs";
Path logDir = new Path(rootLogDir,
new Path(System.getProperty("user.name"), suffix));
fs.mkdirs(logDir);
Assert.assertEquals(0, hal.eligibleApplications.size());
ApplicationReport app1 = createAppReport(1); // no files found
ApplicationReport app2 = createAppReport(2); // too few files
Path app2Path = new Path(logDir, app2.getApplicationId().toString());
fs.mkdirs(app2Path);
createFile(fs, new Path(app2Path, "file1"), 1);
hal.minNumLogFiles = 2;
ApplicationReport app3 = createAppReport(3); // too large
Path app3Path = new Path(logDir, app3.getApplicationId().toString());
fs.mkdirs(app3Path);
createFile(fs, new Path(app3Path, "file1"), 2);
createFile(fs, new Path(app3Path, "file2"), 5);
hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
ApplicationReport app4 = createAppReport(4); // has har already
Path app4Path = new Path(logDir, app4.getApplicationId().toString());
fs.mkdirs(app4Path);
createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
ApplicationReport app5 = createAppReport(5); // just right
Path app5Path = new Path(logDir, app5.getApplicationId().toString());
fs.mkdirs(app5Path);
createFile(fs, new Path(app5Path, "file1"), 2);
createFile(fs, new Path(app5Path, "file2"), 3);
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
hal.eligibleApplications.add(app3);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(app5);
hal.checkFiles(fs, rootLogDir, suffix);
Assert.assertEquals(1, hal.eligibleApplications.size());
Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
}
@Test(timeout = 10000)
public void testCheckMaxEligible() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
ApplicationReport app1 = createAppReport(1);
app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
ApplicationReport app2 = createAppReport(2);
app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
ApplicationReport app3 = createAppReport(3);
app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
ApplicationReport app4 = createAppReport(4);
app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
ApplicationReport app5 = createAppReport(5);
app5.setFinishTime(CLUSTER_TIMESTAMP);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
hal.eligibleApplications.add(app3);
hal.eligibleApplications.add(app4);
hal.eligibleApplications.add(app5);
hal.maxEligible = -1;
hal.checkMaxEligible();
Assert.assertEquals(5, hal.eligibleApplications.size());
hal.maxEligible = 4;
hal.checkMaxEligible();
Assert.assertEquals(4, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app4));
hal.maxEligible = 3;
hal.checkMaxEligible();
Assert.assertEquals(3, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app3));
hal.maxEligible = 2;
hal.checkMaxEligible();
Assert.assertEquals(2, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app5));
hal.maxEligible = 1;
hal.checkMaxEligible();
Assert.assertEquals(1, hal.eligibleApplications.size());
Assert.assertFalse(hal.eligibleApplications.contains(app1));
}
@Test(timeout = 10000)
public void testFindAggregatedApps() throws Exception {
MiniYARNCluster yarnCluster = null;
try {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
yarnCluster =
new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1,
1, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
conf = yarnCluster.getConfig();
RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
LogAggregationStatus.DISABLED);
RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
LogAggregationStatus.FAILED);
RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
LogAggregationStatus.NOT_START);
RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
LogAggregationStatus.SUCCEEDED);
RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
LogAggregationStatus.RUNNING);
RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
LogAggregationStatus.RUNNING_WITH_FAILURE);
RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
LogAggregationStatus.TIME_OUT);
rmContext.getRMApps().put(app1.getApplicationId(), app1);
rmContext.getRMApps().put(app2.getApplicationId(), app2);
rmContext.getRMApps().put(app3.getApplicationId(), app3);
rmContext.getRMApps().put(app4.getApplicationId(), app4);
rmContext.getRMApps().put(app5.getApplicationId(), app5);
rmContext.getRMApps().put(app6.getApplicationId(), app6);
rmContext.getRMApps().put(app7.getApplicationId(), app7);
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
Assert.assertEquals(0, hal.eligibleApplications.size());
hal.findAggregatedApps();
Assert.assertEquals(2, hal.eligibleApplications.size());
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
}
}
}
@Test(timeout = 10000)
public void testGenerateScript() throws Exception {
Configuration conf = new Configuration();
HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
ApplicationReport app1 = createAppReport(1);
ApplicationReport app2 = createAppReport(2);
hal.eligibleApplications.add(app1);
hal.eligibleApplications.add(app2);
File localScript = new File("target", "script.sh");
Path workingDir = new Path("/tmp", "working");
Path remoteRootLogDir = new Path("/tmp", "logs");
String suffix = "logs";
localScript.delete();
Assert.assertFalse(localScript.exists());
hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix);
Assert.assertTrue(localScript.exists());
String script = IOUtils.toString(localScript.toURI());
String[] lines = script.split(System.lineSeparator());
Assert.assertEquals(16, lines.length);
Assert.assertEquals("#!/bin/bash", lines[0]);
Assert.assertEquals("set -e", lines[1]);
Assert.assertEquals("set -x", lines[2]);
Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
if (lines[4].contains(app1.getApplicationId().toString())) {
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ "\"", lines[7]);
} else {
Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
+ "\"", lines[4]);
Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
+ "\"", lines[7]);
}
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
lines[5]);
Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
lines[8]);
Assert.assertEquals("else", lines[9]);
Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
Assert.assertEquals("\texit 1", lines[11]);
Assert.assertEquals("fi", lines[12]);
Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]);
Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH="));
Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." +
"HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir "
+ workingDir.toString() + " -remoteRootLogDir " +
remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
}
private static ApplicationReport createAppReport(int id) {
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
return ApplicationReport.newInstance(
appId,
ApplicationAttemptId.newInstance(appId, 1),
System.getProperty("user.name"),
null, null, null, 0, null, YarnApplicationState.FINISHED, null,
null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
null, null);
}
private static void createFile(FileSystem fs, Path p, long sizeMultiple)
throws IOException {
FSDataOutputStream out = null;
try {
out = fs.create(p);
for (int i = 0 ; i < sizeMultiple; i++) {
out.write(DUMMY_DATA);
}
} finally {
if (out != null) {
out.close();
}
}
}
private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
final LogAggregationStatus aggStatus) {
ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
ApplicationSubmissionContext submissionContext =
ApplicationSubmissionContext.newInstance(appId, "test", "default",
Priority.newInstance(0), null, false, true,
2, Resource.newInstance(10, 2), "test");
return new RMAppImpl(appId, rmContext, conf, "test",
System.getProperty("user.name"), "default", submissionContext,
rmContext.getScheduler(),
rmContext.getApplicationMasterService(),
System.currentTimeMillis(), "test",
null, null) {
@Override
public ApplicationReport createAndGetApplicationReport(
String clientUserName, boolean allowAccess) {
ApplicationReport report =
super.createAndGetApplicationReport(clientUserName, allowAccess);
report.setLogAggregationStatus(aggStatus);
return report;
}
};
}
}

View File

@ -0,0 +1,143 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import static org.junit.Assert.assertEquals;
public class TestHadoopArchiveLogsRunner {
private static final int FILE_SIZE_INCREMENT = 4096;
private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
static {
new Random().nextBytes(DUMMY_DATA);
}
@Test(timeout = 30000)
public void testHadoopArchiveLogs() throws Exception {
MiniYARNCluster yarnCluster = null;
MiniDFSCluster dfsCluster = null;
FileSystem fs = null;
try {
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true);
yarnCluster =
new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(),
1, 2, 1, 1);
yarnCluster.init(conf);
yarnCluster.start();
conf = yarnCluster.getConfig();
dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
ApplicationId app1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
fs = FileSystem.get(conf);
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
String suffix = "logs";
Path logDir = new Path(remoteRootLogDir,
new Path(System.getProperty("user.name"), suffix));
fs.mkdirs(logDir);
Path app1Path = new Path(logDir, app1.toString());
fs.mkdirs(app1Path);
createFile(fs, new Path(app1Path, "log1"), 3);
createFile(fs, new Path(app1Path, "log2"), 4);
createFile(fs, new Path(app1Path, "log3"), 2);
FileStatus[] app1Files = fs.listStatus(app1Path);
Assert.assertEquals(3, app1Files.length);
String[] args = new String[]{
"-appId", app1.toString(),
"-user", System.getProperty("user.name"),
"-workingDir", workingDir.toString(),
"-remoteRootLogDir", remoteRootLogDir.toString(),
"-suffix", suffix};
final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf);
assertEquals(0, ToolRunner.run(halr, args));
fs = FileSystem.get(conf);
app1Files = fs.listStatus(app1Path);
Assert.assertEquals(1, app1Files.length);
FileStatus harFile = app1Files[0];
Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName());
Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath());
FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath);
Assert.assertEquals(3, harLogs.length);
Arrays.sort(harLogs, new Comparator<FileStatus>() {
@Override
public int compare(FileStatus o1, FileStatus o2) {
return o1.getPath().getName().compareTo(o2.getPath().getName());
}
});
Assert.assertEquals("log1", harLogs[0].getPath().getName());
Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen());
Assert.assertEquals("log2", harLogs[1].getPath().getName());
Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen());
Assert.assertEquals("log3", harLogs[2].getPath().getName());
Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen());
Assert.assertEquals(0, fs.listStatus(workingDir).length);
} finally {
if (yarnCluster != null) {
yarnCluster.stop();
}
if (fs != null) {
fs.close();
}
if (dfsCluster != null) {
dfsCluster.shutdown();
}
}
}
private static void createFile(FileSystem fs, Path p, long sizeMultiple)
throws IOException {
FSDataOutputStream out = null;
try {
out = fs.create(p);
for (int i = 0 ; i < sizeMultiple; i++) {
out.write(DUMMY_DATA);
}
} finally {
if (out != null) {
out.close();
}
}
}
}

View File

@ -50,6 +50,11 @@
<artifactId>hadoop-archives</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-archive-logs</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-rumen</artifactId>

View File

@ -34,6 +34,7 @@
<module>hadoop-streaming</module>
<module>hadoop-distcp</module>
<module>hadoop-archives</module>
<module>hadoop-archive-logs</module>
<module>hadoop-rumen</module>
<module>hadoop-gridmix</module>
<module>hadoop-datajoin</module>