diff --git a/MAPREDUCE-6415.003.patch b/MAPREDUCE-6415.003.patch new file mode 100644 index 0000000000..7c14341500 --- /dev/null +++ b/MAPREDUCE-6415.003.patch @@ -0,0 +1,1308 @@ +diff --git hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +index fa55703..3f646e6 100644 +--- hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml ++++ hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +@@ -52,6 +52,13 @@ + + + ++ ../hadoop-archive-logs/target ++ /share/hadoop/${hadoop.component}/sources ++ ++ *-sources.jar ++ ++ ++ + ../hadoop-datajoin/target + /share/hadoop/${hadoop.component}/sources + +diff --git hadoop-mapreduce-project/bin/mapred hadoop-mapreduce-project/bin/mapred +index 426af80..2d56a8d 100755 +--- hadoop-mapreduce-project/bin/mapred ++++ hadoop-mapreduce-project/bin/mapred +@@ -20,6 +20,7 @@ MYNAME="${BASH_SOURCE-$0}" + function hadoop_usage + { + hadoop_add_subcommand "archive" "create a hadoop archive" ++ hadoop_add_subcommand "archive-logs" "combine aggregated logs into hadoop archives" + hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands" + hadoop_add_subcommand "distcp" "copy file or directories recursively" + hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon" +@@ -72,6 +73,13 @@ case ${COMMAND} in + hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" + HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" + ;; ++ archive-logs) ++ CLASS=org.apache.hadoop.tools.HadoopArchiveLogs ++ hadoop_debug "Injecting TOOL_PATH into CLASSPATH" ++ hadoop_add_classpath "${TOOL_PATH}" ++ hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" ++ HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" ++ ;; + classpath) + hadoop_do_classpath_subcommand CLASS "$@" + ;; +diff --git hadoop-project/pom.xml hadoop-project/pom.xml +index 9863475..636e063 100644 +--- hadoop-project/pom.xml ++++ hadoop-project/pom.xml +@@ -324,6 +324,11 @@ + + + org.apache.hadoop ++ hadoop-archive-logs ++ ${project.version} ++ ++ ++ org.apache.hadoop + hadoop-distcp + ${project.version} + +diff --git hadoop-tools/hadoop-archive-logs/pom.xml hadoop-tools/hadoop-archive-logs/pom.xml +new file mode 100644 +index 0000000..2a480a8 +--- /dev/null ++++ hadoop-tools/hadoop-archive-logs/pom.xml +@@ -0,0 +1,171 @@ ++ ++ ++ ++ 4.0.0 ++ ++ org.apache.hadoop ++ hadoop-project ++ 3.0.0-SNAPSHOT ++ ../../hadoop-project ++ ++ org.apache.hadoop ++ hadoop-archive-logs ++ 3.0.0-SNAPSHOT ++ Apache Hadoop Archive Logs ++ Apache Hadoop Archive Logs ++ jar ++ ++ ++ ${project.build.directory}/log ++ ++ ++ ++ ++ junit ++ junit ++ test ++ ++ ++ org.apache.hadoop ++ hadoop-mapreduce-client-core ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-applications-distributedshell ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-common ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-hdfs ++ test ++ test-jar ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-server-tests ++ test-jar ++ test ++ ++ ++ org.apache.hadoop ++ hadoop-archives ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-common ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-api ++ provided ++ ++ ++ com.google.guava ++ guava ++ provided ++ ++ ++ commons-io ++ commons-io ++ provided ++ ++ ++ commons-logging ++ commons-logging ++ provided ++ ++ ++ commons-cli ++ commons-cli ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-client ++ provided ++ ++ ++ org.apache.hadoop ++ hadoop-yarn-server-resourcemanager ++ provided ++ ++ ++ ++ org.apache.hadoop ++ hadoop-hdfs ++ test ++ ++ ++ ++ org.apache.hadoop ++ hadoop-common ++ test ++ test-jar ++ ++ ++ ++ org.apache.hadoop ++ hadoop-mapreduce-client-jobclient ++ test ++ test-jar ++ ++ ++ ++ ++ ++ ++ org.apache.maven.plugins ++ maven-antrun-plugin ++ ++ ++ create-log-dir ++ process-test-resources ++ ++ run ++ ++ ++ ++ ++ ++ ++ ++ ++ ++ ++ ++ ++ org.apache.maven.plugins ++ maven-jar-plugin ++ ++ ++ ++ org.apache.hadoop.tools.HadoopArchiveLogs ++ ++ ++ ++ ++ ++ ++ +diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java +new file mode 100644 +index 0000000..4778dcb +--- /dev/null ++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java +@@ -0,0 +1,403 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.tools; ++ ++import com.google.common.annotations.VisibleForTesting; ++import org.apache.commons.cli.CommandLine; ++import org.apache.commons.cli.CommandLineParser; ++import org.apache.commons.cli.GnuParser; ++import org.apache.commons.cli.HelpFormatter; ++import org.apache.commons.cli.Option; ++import org.apache.commons.cli.Options; ++import org.apache.commons.cli.ParseException; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.fs.permission.FsAction; ++import org.apache.hadoop.fs.permission.FsPermission; ++import org.apache.hadoop.mapred.JobConf; ++import org.apache.hadoop.util.Tool; ++import org.apache.hadoop.util.ToolRunner; ++import org.apache.hadoop.yarn.api.records.ApplicationReport; ++import org.apache.hadoop.yarn.api.records.LogAggregationStatus; ++import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; ++import org.apache.hadoop.yarn.applications.distributedshell.Client; ++import org.apache.hadoop.yarn.client.api.YarnClient; ++import org.apache.hadoop.yarn.conf.YarnConfiguration; ++import org.apache.hadoop.yarn.exceptions.YarnException; ++import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; ++ ++import java.io.File; ++import java.io.FileWriter; ++import java.io.IOException; ++import java.util.ArrayList; ++import java.util.Collections; ++import java.util.Comparator; ++import java.util.HashSet; ++import java.util.Iterator; ++import java.util.List; ++import java.util.Set; ++ ++ ++/** ++ * This tool moves Aggregated Log files into HAR archives using the ++ * {@link HadoopArchives} tool and the Distributed Shell via the ++ * {@link HadoopArchiveLogsRunner}. ++ */ ++public class HadoopArchiveLogs implements Tool { ++ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class); ++ ++ private static final String HELP_OPTION = "help"; ++ private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps"; ++ private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles"; ++ private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize"; ++ private static final String MEMORY_OPTION = "memory"; ++ ++ private static final int DEFAULT_MAX_ELIGIBLE = -1; ++ private static final int DEFAULT_MIN_NUM_LOG_FILES = 20; ++ private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L; ++ private static final long DEFAULT_MEMORY = 1024L; ++ ++ @VisibleForTesting ++ int maxEligible = DEFAULT_MAX_ELIGIBLE; ++ @VisibleForTesting ++ int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES; ++ @VisibleForTesting ++ long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L; ++ @VisibleForTesting ++ long memory = DEFAULT_MEMORY; ++ ++ @VisibleForTesting ++ Set eligibleApplications; ++ ++ private JobConf conf; ++ ++ public HadoopArchiveLogs(Configuration conf) { ++ setConf(conf); ++ eligibleApplications = new HashSet<>(); ++ } ++ ++ public static void main(String[] args) { ++ JobConf job = new JobConf(HadoopArchiveLogs.class); ++ ++ HadoopArchiveLogs hal = new HadoopArchiveLogs(job); ++ int ret = 0; ++ ++ try{ ++ ret = ToolRunner.run(hal, args); ++ } catch(Exception e) { ++ LOG.debug("Exception", e); ++ System.err.println(e.getClass().getSimpleName()); ++ final String s = e.getLocalizedMessage(); ++ if (s != null) { ++ System.err.println(s); ++ } else { ++ e.printStackTrace(System.err); ++ } ++ System.exit(1); ++ } ++ System.exit(ret); ++ } ++ ++ @Override ++ public int run(String[] args) throws Exception { ++ handleOpts(args); ++ ++ findAggregatedApps(); ++ ++ FileSystem fs = null; ++ Path remoteRootLogDir = new Path(conf.get( ++ YarnConfiguration.NM_REMOTE_APP_LOG_DIR, ++ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); ++ String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); ++ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); ++ try { ++ fs = FileSystem.get(conf); ++ checkFiles(fs, remoteRootLogDir, suffix); ++ ++ // Prepare working directory ++ if (fs.exists(workingDir)) { ++ fs.delete(workingDir, true); ++ } ++ fs.mkdirs(workingDir); ++ fs.setPermission(workingDir, ++ new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); ++ } finally { ++ if (fs != null) { ++ fs.close(); ++ } ++ } ++ ++ checkMaxEligible(); ++ ++ if (eligibleApplications.isEmpty()) { ++ LOG.info("No eligible applications to process"); ++ System.exit(0); ++ } ++ ++ StringBuilder sb = ++ new StringBuilder("Will process the following applications:"); ++ for (ApplicationReport report : eligibleApplications) { ++ sb.append("\n\t").append(report.getApplicationId()); ++ } ++ LOG.info(sb.toString()); ++ ++ File localScript = File.createTempFile("hadoop-archive-logs-", ".sh"); ++ generateScript(localScript, workingDir, remoteRootLogDir, suffix); ++ ++ if (runDistributedShell(localScript)) { ++ return 0; ++ } ++ return -1; ++ } ++ ++ private void handleOpts(String[] args) throws ParseException { ++ Options opts = new Options(); ++ Option helpOpt = new Option(HELP_OPTION, false, "Prints this message"); ++ Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true, ++ "The maximum number of eligible apps to process (default: " ++ + DEFAULT_MAX_ELIGIBLE + " (all))"); ++ maxEligibleOpt.setArgName("n"); ++ Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true, ++ "The minimum number of log files required to be eligible (default: " ++ + DEFAULT_MIN_NUM_LOG_FILES + ")"); ++ minNumLogFilesOpt.setArgName("n"); ++ Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true, ++ "The maximum total logs size (in megabytes) required to be eligible" + ++ " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")"); ++ maxTotalLogsSizeOpt.setArgName("megabytes"); ++ Option memoryOpt = new Option(MEMORY_OPTION, true, ++ "The amount of memory (in megabytes) for each container (default: " ++ + DEFAULT_MEMORY + ")"); ++ memoryOpt.setArgName("megabytes"); ++ opts.addOption(helpOpt); ++ opts.addOption(maxEligibleOpt); ++ opts.addOption(minNumLogFilesOpt); ++ opts.addOption(maxTotalLogsSizeOpt); ++ opts.addOption(memoryOpt); ++ ++ try { ++ CommandLineParser parser = new GnuParser(); ++ CommandLine commandLine = parser.parse(opts, args); ++ if (commandLine.hasOption(HELP_OPTION)) { ++ HelpFormatter formatter = new HelpFormatter(); ++ formatter.printHelp("yarn archive-logs", opts); ++ System.exit(0); ++ } ++ if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) { ++ maxEligible = Integer.parseInt( ++ commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION)); ++ if (maxEligible == 0) { ++ LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes " ++ + "nothing. Please either set it to a negative value " ++ + "(default, all) or a more reasonable value."); ++ System.exit(0); ++ } ++ } ++ if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) { ++ minNumLogFiles = Integer.parseInt( ++ commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION)); ++ } ++ if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) { ++ maxTotalLogsSize = Long.parseLong( ++ commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION)); ++ maxTotalLogsSize *= 1024L * 1024L; ++ } ++ if (commandLine.hasOption(MEMORY_OPTION)) { ++ memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION)); ++ } ++ } catch (ParseException pe) { ++ HelpFormatter formatter = new HelpFormatter(); ++ formatter.printHelp("yarn archive-logs", opts); ++ throw pe; ++ } ++ } ++ ++ @VisibleForTesting ++ void findAggregatedApps() throws IOException, YarnException { ++ YarnClient client = YarnClient.createYarnClient(); ++ try { ++ client.init(getConf()); ++ client.start(); ++ List reports = client.getApplications(); ++ for (ApplicationReport report : reports) { ++ LogAggregationStatus aggStatus = report.getLogAggregationStatus(); ++ if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) || ++ aggStatus.equals(LogAggregationStatus.FAILED)) { ++ eligibleApplications.add(report); ++ } ++ } ++ } finally { ++ if (client != null) { ++ client.stop(); ++ } ++ } ++ } ++ ++ @VisibleForTesting ++ void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) { ++ for (Iterator reportIt = eligibleApplications.iterator(); ++ reportIt.hasNext(); ) { ++ ApplicationReport report = reportIt.next(); ++ long totalFileSize = 0L; ++ try { ++ FileStatus[] files = fs.listStatus( ++ LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, ++ report.getApplicationId(), report.getUser(), suffix)); ++ if (files.length < minNumLogFiles) { ++ reportIt.remove(); ++ } else { ++ for (FileStatus file : files) { ++ if (file.getPath().getName().equals(report.getApplicationId() ++ + ".har")) { ++ reportIt.remove(); ++ break; ++ } ++ totalFileSize += file.getLen(); ++ } ++ if (totalFileSize > maxTotalLogsSize) { ++ reportIt.remove(); ++ } ++ } ++ } catch (IOException ioe) { ++ // If the user doesn't have permission or it doesn't exist, then skip it ++ reportIt.remove(); ++ } ++ } ++ } ++ ++ @VisibleForTesting ++ void checkMaxEligible() { ++ // If we have too many eligible apps, remove the newest ones first ++ if (maxEligible > 0 && eligibleApplications.size() > maxEligible) { ++ List sortedApplications = ++ new ArrayList(eligibleApplications); ++ Collections.sort(sortedApplications, new Comparator() { ++ @Override ++ public int compare(ApplicationReport o1, ApplicationReport o2) { ++ return Long.compare(o1.getFinishTime(), o2.getFinishTime()); ++ } ++ }); ++ for (int i = maxEligible; i < sortedApplications.size(); i++) { ++ eligibleApplications.remove(sortedApplications.get(i)); ++ } ++ } ++ } ++ ++ /* ++ The generated script looks like this: ++ #!/bin/bash ++ set -e ++ set -x ++ if [ "$YARN_SHELL_ID" == "1" ]; then ++ appId="application_1440448768987_0001" ++ user="rkanter" ++ elif [ "$YARN_SHELL_ID" == "2" ]; then ++ appId="application_1440448768987_0002" ++ user="rkanter" ++ else ++ echo "Unknown Mapping!" ++ exit 1 ++ fi ++ export HADOOP_CLIENT_OPTS="-Xmx1024m" ++ export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar ++ "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs ++ */ ++ @VisibleForTesting ++ void generateScript(File localScript, Path workingDir, ++ Path remoteRootLogDir, String suffix) throws IOException { ++ LOG.info("Generating script at: " + localScript.getAbsolutePath()); ++ String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain() ++ .getCodeSource().getLocation().getPath(); ++ String harJarPath = HadoopArchives.class.getProtectionDomain() ++ .getCodeSource().getLocation().getPath(); ++ String classpath = halrJarPath + File.pathSeparator + harJarPath; ++ FileWriter fw = null; ++ try { ++ fw = new FileWriter(localScript); ++ fw.write("#!/bin/bash\nset -e\nset -x\n"); ++ int containerCount = 1; ++ for (ApplicationReport report : eligibleApplications) { ++ fw.write("if [ \"$YARN_SHELL_ID\" == \""); ++ fw.write(Integer.toString(containerCount)); ++ fw.write("\" ]; then\n\tappId=\""); ++ fw.write(report.getApplicationId().toString()); ++ fw.write("\"\n\tuser=\""); ++ fw.write(report.getUser()); ++ fw.write("\"\nel"); ++ containerCount++; ++ } ++ fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n"); ++ fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx"); ++ fw.write(Long.toString(memory)); ++ fw.write("m\"\n"); ++ fw.write("export HADOOP_CLASSPATH="); ++ fw.write(classpath); ++ fw.write("\n\"$HADOOP_HOME\"/bin/hadoop "); ++ fw.write(HadoopArchiveLogsRunner.class.getName()); ++ fw.write(" -appId \"$appId\" -user \"$user\" -workingDir "); ++ fw.write(workingDir.toString()); ++ fw.write(" -remoteRootLogDir "); ++ fw.write(remoteRootLogDir.toString()); ++ fw.write(" -suffix "); ++ fw.write(suffix); ++ fw.write("\n"); ++ } finally { ++ if (fw != null) { ++ fw.close(); ++ } ++ } ++ } ++ ++ private boolean runDistributedShell(File localScript) throws Exception { ++ String[] dsArgs = { ++ "--appname", ++ "ArchiveLogs", ++ "--jar", ++ ApplicationMaster.class.getProtectionDomain().getCodeSource() ++ .getLocation().getPath(), ++ "--num_containers", ++ Integer.toString(eligibleApplications.size()), ++ "--container_memory", ++ Long.toString(memory), ++ "--shell_script", ++ localScript.getAbsolutePath() ++ }; ++ final Client dsClient = new Client(new Configuration(conf)); ++ dsClient.init(dsArgs); ++ return dsClient.run(); ++ } ++ ++ @Override ++ public void setConf(Configuration conf) { ++ if (conf instanceof JobConf) { ++ this.conf = (JobConf) conf; ++ } else { ++ this.conf = new JobConf(conf, HadoopArchiveLogs.class); ++ } ++ } ++ ++ @Override ++ public Configuration getConf() { ++ return this.conf; ++ } ++} +diff --git hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java +new file mode 100644 +index 0000000..347e5fb +--- /dev/null ++++ hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java +@@ -0,0 +1,180 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.tools; ++ ++import org.apache.commons.cli.CommandLine; ++import org.apache.commons.cli.CommandLineParser; ++import org.apache.commons.cli.GnuParser; ++import org.apache.commons.cli.Option; ++import org.apache.commons.cli.Options; ++import org.apache.commons.cli.ParseException; ++import org.apache.commons.logging.Log; ++import org.apache.commons.logging.LogFactory; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.fs.PathFilter; ++import org.apache.hadoop.mapred.JobConf; ++import org.apache.hadoop.util.Tool; ++import org.apache.hadoop.util.ToolRunner; ++ ++import java.io.File; ++ ++/** ++ * This is a child program designed to be used by the {@link HadoopArchiveLogs} ++ * tool via the Distributed Shell. It's not meant to be run directly. ++ */ ++public class HadoopArchiveLogsRunner implements Tool { ++ private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class); ++ ++ private static final String APP_ID_OPTION = "appId"; ++ private static final String USER_OPTION = "user"; ++ private static final String WORKING_DIR_OPTION = "workingDir"; ++ private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir"; ++ private static final String SUFFIX_OPTION = "suffix"; ++ ++ private String appId; ++ private String user; ++ private String workingDir; ++ private String remoteLogDir; ++ private String suffix; ++ ++ private JobConf conf; ++ ++ public HadoopArchiveLogsRunner(Configuration conf) { ++ setConf(conf); ++ } ++ ++ public static void main(String[] args) { ++ JobConf job = new JobConf(HadoopArchiveLogsRunner.class); ++ ++ HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job); ++ int ret = 0; ++ ++ try{ ++ ret = ToolRunner.run(halr, args); ++ } catch(Exception e) { ++ LOG.debug("Exception", e); ++ System.err.println(e.getClass().getSimpleName()); ++ final String s = e.getLocalizedMessage(); ++ if (s != null) { ++ System.err.println(s); ++ } else { ++ e.printStackTrace(System.err); ++ } ++ System.exit(1); ++ } ++ System.exit(ret); ++ } ++ ++ @Override ++ public int run(String[] args) throws Exception { ++ handleOpts(args); ++ String remoteAppLogDir = remoteLogDir + File.separator + user ++ + File.separator + suffix + File.separator + appId; ++ ++ // Run 'hadoop archives' command in local mode ++ Configuration haConf = new Configuration(getConf()); ++ haConf.set("mapreduce.framework.name", "local"); ++ HadoopArchives ha = new HadoopArchives(haConf); ++ String[] haArgs = { ++ "-archiveName", ++ appId + ".har", ++ "-p", ++ remoteAppLogDir, ++ "*", ++ workingDir ++ }; ++ StringBuilder sb = new StringBuilder("Executing 'hadoop archives'"); ++ for (String haArg : haArgs) { ++ sb.append("\n\t").append(haArg); ++ } ++ LOG.info(sb.toString()); ++ ha.run(haArgs); ++ ++ FileSystem fs = null; ++ // Move har file to correct location and delete original logs ++ try { ++ fs = FileSystem.get(conf); ++ LOG.info("Moving har to original location"); ++ fs.rename(new Path(workingDir, appId + ".har"), ++ new Path(remoteAppLogDir, appId + ".har")); ++ LOG.info("Deleting original logs"); ++ for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir), ++ new PathFilter() { ++ @Override ++ public boolean accept(Path path) { ++ return !path.getName().endsWith(".har"); ++ } ++ })) { ++ fs.delete(original.getPath(), false); ++ } ++ } finally { ++ if (fs != null) { ++ fs.close(); ++ } ++ } ++ ++ return 0; ++ } ++ ++ private void handleOpts(String[] args) throws ParseException { ++ Options opts = new Options(); ++ Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID"); ++ appIdOpt.setRequired(true); ++ Option userOpt = new Option(USER_OPTION, true, "User"); ++ userOpt.setRequired(true); ++ Option workingDirOpt = new Option(WORKING_DIR_OPTION, true, ++ "Working Directory"); ++ workingDirOpt.setRequired(true); ++ Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true, ++ "Remote Root Log Directory"); ++ remoteLogDirOpt.setRequired(true); ++ Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix"); ++ suffixOpt.setRequired(true); ++ opts.addOption(appIdOpt); ++ opts.addOption(userOpt); ++ opts.addOption(workingDirOpt); ++ opts.addOption(remoteLogDirOpt); ++ opts.addOption(suffixOpt); ++ ++ CommandLineParser parser = new GnuParser(); ++ CommandLine commandLine = parser.parse(opts, args); ++ appId = commandLine.getOptionValue(APP_ID_OPTION); ++ user = commandLine.getOptionValue(USER_OPTION); ++ workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION); ++ remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR); ++ suffix = commandLine.getOptionValue(SUFFIX_OPTION); ++ } ++ ++ @Override ++ public void setConf(Configuration conf) { ++ if (conf instanceof JobConf) { ++ this.conf = (JobConf) conf; ++ } else { ++ this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class); ++ } ++ } ++ ++ @Override ++ public Configuration getConf() { ++ return this.conf; ++ } ++} +diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java +new file mode 100644 +index 0000000..c8ff201 +--- /dev/null ++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java +@@ -0,0 +1,293 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.tools; ++ ++import org.apache.commons.io.IOUtils; ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FSDataOutputStream; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; ++import org.apache.hadoop.yarn.api.records.ApplicationId; ++import org.apache.hadoop.yarn.api.records.ApplicationReport; ++import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; ++import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; ++import org.apache.hadoop.yarn.api.records.LogAggregationStatus; ++import org.apache.hadoop.yarn.api.records.Priority; ++import org.apache.hadoop.yarn.api.records.Resource; ++import org.apache.hadoop.yarn.api.records.YarnApplicationState; ++import org.apache.hadoop.yarn.conf.YarnConfiguration; ++import org.apache.hadoop.yarn.server.MiniYARNCluster; ++import org.apache.hadoop.yarn.server.resourcemanager.RMContext; ++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; ++import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; ++import org.junit.Assert; ++import org.junit.Test; ++ ++import java.io.File; ++import java.io.IOException; ++import java.util.Random; ++ ++public class TestHadoopArchiveLogs { ++ ++ private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis(); ++ private static final int FILE_SIZE_INCREMENT = 4096; ++ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT]; ++ static { ++ new Random().nextBytes(DUMMY_DATA); ++ } ++ ++ @Test(timeout = 10000) ++ public void testCheckFiles() throws Exception { ++ Configuration conf = new Configuration(); ++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); ++ FileSystem fs = FileSystem.getLocal(conf); ++ Path rootLogDir = new Path("target", "logs"); ++ String suffix = "logs"; ++ Path logDir = new Path(rootLogDir, ++ new Path(System.getProperty("user.name"), suffix)); ++ fs.mkdirs(logDir); ++ ++ Assert.assertEquals(0, hal.eligibleApplications.size()); ++ ApplicationReport app1 = createAppReport(1); // no files found ++ ApplicationReport app2 = createAppReport(2); // too few files ++ Path app2Path = new Path(logDir, app2.getApplicationId().toString()); ++ fs.mkdirs(app2Path); ++ createFile(fs, new Path(app2Path, "file1"), 1); ++ hal.minNumLogFiles = 2; ++ ApplicationReport app3 = createAppReport(3); // too large ++ Path app3Path = new Path(logDir, app3.getApplicationId().toString()); ++ fs.mkdirs(app3Path); ++ createFile(fs, new Path(app3Path, "file1"), 2); ++ createFile(fs, new Path(app3Path, "file2"), 5); ++ hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6; ++ ApplicationReport app4 = createAppReport(4); // has har already ++ Path app4Path = new Path(logDir, app4.getApplicationId().toString()); ++ fs.mkdirs(app4Path); ++ createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1); ++ ApplicationReport app5 = createAppReport(5); // just right ++ Path app5Path = new Path(logDir, app5.getApplicationId().toString()); ++ fs.mkdirs(app5Path); ++ createFile(fs, new Path(app5Path, "file1"), 2); ++ createFile(fs, new Path(app5Path, "file2"), 3); ++ hal.eligibleApplications.add(app1); ++ hal.eligibleApplications.add(app2); ++ hal.eligibleApplications.add(app3); ++ hal.eligibleApplications.add(app4); ++ hal.eligibleApplications.add(app5); ++ ++ hal.checkFiles(fs, rootLogDir, suffix); ++ Assert.assertEquals(1, hal.eligibleApplications.size()); ++ Assert.assertEquals(app5, hal.eligibleApplications.iterator().next()); ++ } ++ ++ @Test(timeout = 10000) ++ public void testCheckMaxEligible() throws Exception { ++ Configuration conf = new Configuration(); ++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); ++ ApplicationReport app1 = createAppReport(1); ++ app1.setFinishTime(CLUSTER_TIMESTAMP - 5); ++ ApplicationReport app2 = createAppReport(2); ++ app2.setFinishTime(CLUSTER_TIMESTAMP - 10); ++ ApplicationReport app3 = createAppReport(3); ++ app3.setFinishTime(CLUSTER_TIMESTAMP + 5); ++ ApplicationReport app4 = createAppReport(4); ++ app4.setFinishTime(CLUSTER_TIMESTAMP + 10); ++ ApplicationReport app5 = createAppReport(5); ++ app5.setFinishTime(CLUSTER_TIMESTAMP); ++ Assert.assertEquals(0, hal.eligibleApplications.size()); ++ hal.eligibleApplications.add(app1); ++ hal.eligibleApplications.add(app2); ++ hal.eligibleApplications.add(app3); ++ hal.eligibleApplications.add(app4); ++ hal.eligibleApplications.add(app5); ++ hal.maxEligible = -1; ++ hal.checkMaxEligible(); ++ Assert.assertEquals(5, hal.eligibleApplications.size()); ++ ++ hal.maxEligible = 4; ++ hal.checkMaxEligible(); ++ Assert.assertEquals(4, hal.eligibleApplications.size()); ++ Assert.assertFalse(hal.eligibleApplications.contains(app4)); ++ ++ hal.maxEligible = 3; ++ hal.checkMaxEligible(); ++ Assert.assertEquals(3, hal.eligibleApplications.size()); ++ Assert.assertFalse(hal.eligibleApplications.contains(app3)); ++ ++ hal.maxEligible = 2; ++ hal.checkMaxEligible(); ++ Assert.assertEquals(2, hal.eligibleApplications.size()); ++ Assert.assertFalse(hal.eligibleApplications.contains(app5)); ++ ++ hal.maxEligible = 1; ++ hal.checkMaxEligible(); ++ Assert.assertEquals(1, hal.eligibleApplications.size()); ++ Assert.assertFalse(hal.eligibleApplications.contains(app1)); ++ } ++ ++ @Test(timeout = 10000) ++ public void testFindAggregatedApps() throws Exception { ++ MiniYARNCluster yarnCluster = null; ++ try { ++ Configuration conf = new Configuration(); ++ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); ++ yarnCluster = ++ new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1, ++ 1, 1, 1); ++ yarnCluster.init(conf); ++ yarnCluster.start(); ++ conf = yarnCluster.getConfig(); ++ ++ RMContext rmContext = yarnCluster.getResourceManager().getRMContext(); ++ RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext, ++ LogAggregationStatus.DISABLED); ++ RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext, ++ LogAggregationStatus.FAILED); ++ RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext, ++ LogAggregationStatus.NOT_START); ++ RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext, ++ LogAggregationStatus.SUCCEEDED); ++ RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext, ++ LogAggregationStatus.RUNNING); ++ RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext, ++ LogAggregationStatus.RUNNING_WITH_FAILURE); ++ RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext, ++ LogAggregationStatus.TIME_OUT); ++ rmContext.getRMApps().put(app1.getApplicationId(), app1); ++ rmContext.getRMApps().put(app2.getApplicationId(), app2); ++ rmContext.getRMApps().put(app3.getApplicationId(), app3); ++ rmContext.getRMApps().put(app4.getApplicationId(), app4); ++ rmContext.getRMApps().put(app5.getApplicationId(), app5); ++ rmContext.getRMApps().put(app6.getApplicationId(), app6); ++ rmContext.getRMApps().put(app7.getApplicationId(), app7); ++ ++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); ++ Assert.assertEquals(0, hal.eligibleApplications.size()); ++ hal.findAggregatedApps(); ++ Assert.assertEquals(2, hal.eligibleApplications.size()); ++ } finally { ++ if (yarnCluster != null) { ++ yarnCluster.stop(); ++ } ++ } ++ } ++ ++ @Test(timeout = 10000) ++ public void testGenerateScript() throws Exception { ++ Configuration conf = new Configuration(); ++ HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); ++ ApplicationReport app1 = createAppReport(1); ++ ApplicationReport app2 = createAppReport(2); ++ hal.eligibleApplications.add(app1); ++ hal.eligibleApplications.add(app2); ++ ++ File localScript = new File("target", "script.sh"); ++ Path workingDir = new Path("/tmp", "working"); ++ Path remoteRootLogDir = new Path("/tmp", "logs"); ++ String suffix = "logs"; ++ localScript.delete(); ++ Assert.assertFalse(localScript.exists()); ++ hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix); ++ Assert.assertTrue(localScript.exists()); ++ String script = IOUtils.toString(localScript.toURI()); ++ String[] lines = script.split(System.lineSeparator()); ++ Assert.assertEquals(16, lines.length); ++ Assert.assertEquals("#!/bin/bash", lines[0]); ++ Assert.assertEquals("set -e", lines[1]); ++ Assert.assertEquals("set -x", lines[2]); ++ Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]); ++ if (lines[4].contains(app1.getApplicationId().toString())) { ++ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString() ++ + "\"", lines[4]); ++ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString() ++ + "\"", lines[7]); ++ } else { ++ Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString() ++ + "\"", lines[4]); ++ Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString() ++ + "\"", lines[7]); ++ } ++ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", ++ lines[5]); ++ Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]); ++ Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", ++ lines[8]); ++ Assert.assertEquals("else", lines[9]); ++ Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]); ++ Assert.assertEquals("\texit 1", lines[11]); ++ Assert.assertEquals("fi", lines[12]); ++ Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]); ++ Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH=")); ++ Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." + ++ "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir " ++ + workingDir.toString() + " -remoteRootLogDir " + ++ remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]); ++ } ++ ++ private static ApplicationReport createAppReport(int id) { ++ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id); ++ return ApplicationReport.newInstance( ++ appId, ++ ApplicationAttemptId.newInstance(appId, 1), ++ System.getProperty("user.name"), ++ null, null, null, 0, null, YarnApplicationState.FINISHED, null, ++ null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f, ++ null, null); ++ } ++ ++ private static void createFile(FileSystem fs, Path p, long sizeMultiple) ++ throws IOException { ++ FSDataOutputStream out = null; ++ try { ++ out = fs.create(p); ++ for (int i = 0 ; i < sizeMultiple; i++) { ++ out.write(DUMMY_DATA); ++ } ++ } finally { ++ if (out != null) { ++ out.close(); ++ } ++ } ++ } ++ ++ private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext, ++ final LogAggregationStatus aggStatus) { ++ ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id); ++ ApplicationSubmissionContext submissionContext = ++ ApplicationSubmissionContext.newInstance(appId, "test", "default", ++ Priority.newInstance(0), null, false, true, ++ 2, Resource.newInstance(10, 2), "test"); ++ return new RMAppImpl(appId, rmContext, conf, "test", ++ System.getProperty("user.name"), "default", submissionContext, ++ rmContext.getScheduler(), ++ rmContext.getApplicationMasterService(), ++ System.currentTimeMillis(), "test", ++ null, null) { ++ @Override ++ public ApplicationReport createAndGetApplicationReport( ++ String clientUserName, boolean allowAccess) { ++ ApplicationReport report = ++ super.createAndGetApplicationReport(clientUserName, allowAccess); ++ report.setLogAggregationStatus(aggStatus); ++ return report; ++ } ++ }; ++ } ++} +diff --git hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java +new file mode 100644 +index 0000000..af66f14 +--- /dev/null ++++ hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java +@@ -0,0 +1,143 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++ ++package org.apache.hadoop.tools; ++ ++import org.apache.hadoop.conf.Configuration; ++import org.apache.hadoop.fs.FSDataOutputStream; ++import org.apache.hadoop.fs.FileStatus; ++import org.apache.hadoop.fs.FileSystem; ++import org.apache.hadoop.fs.HarFs; ++import org.apache.hadoop.fs.Path; ++import org.apache.hadoop.hdfs.MiniDFSCluster; ++import org.apache.hadoop.util.ToolRunner; ++import org.apache.hadoop.yarn.api.records.ApplicationId; ++import org.apache.hadoop.yarn.conf.YarnConfiguration; ++import org.apache.hadoop.yarn.server.MiniYARNCluster; ++import org.junit.Assert; ++import org.junit.Test; ++ ++import java.io.IOException; ++import java.util.Arrays; ++import java.util.Comparator; ++import java.util.Random; ++ ++import static org.junit.Assert.assertEquals; ++ ++public class TestHadoopArchiveLogsRunner { ++ ++ private static final int FILE_SIZE_INCREMENT = 4096; ++ private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT]; ++ static { ++ new Random().nextBytes(DUMMY_DATA); ++ } ++ ++ @Test(timeout = 30000) ++ public void testHadoopArchiveLogs() throws Exception { ++ MiniYARNCluster yarnCluster = null; ++ MiniDFSCluster dfsCluster = null; ++ FileSystem fs = null; ++ try { ++ Configuration conf = new YarnConfiguration(); ++ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); ++ conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); ++ yarnCluster = ++ new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(), ++ 1, 2, 1, 1); ++ yarnCluster.init(conf); ++ yarnCluster.start(); ++ conf = yarnCluster.getConfig(); ++ dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); ++ ++ ApplicationId app1 = ++ ApplicationId.newInstance(System.currentTimeMillis(), 1); ++ fs = FileSystem.get(conf); ++ Path remoteRootLogDir = new Path(conf.get( ++ YarnConfiguration.NM_REMOTE_APP_LOG_DIR, ++ YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); ++ Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); ++ String suffix = "logs"; ++ Path logDir = new Path(remoteRootLogDir, ++ new Path(System.getProperty("user.name"), suffix)); ++ fs.mkdirs(logDir); ++ Path app1Path = new Path(logDir, app1.toString()); ++ fs.mkdirs(app1Path); ++ createFile(fs, new Path(app1Path, "log1"), 3); ++ createFile(fs, new Path(app1Path, "log2"), 4); ++ createFile(fs, new Path(app1Path, "log3"), 2); ++ FileStatus[] app1Files = fs.listStatus(app1Path); ++ Assert.assertEquals(3, app1Files.length); ++ ++ String[] args = new String[]{ ++ "-appId", app1.toString(), ++ "-user", System.getProperty("user.name"), ++ "-workingDir", workingDir.toString(), ++ "-remoteRootLogDir", remoteRootLogDir.toString(), ++ "-suffix", suffix}; ++ final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf); ++ assertEquals(0, ToolRunner.run(halr, args)); ++ ++ fs = FileSystem.get(conf); ++ app1Files = fs.listStatus(app1Path); ++ Assert.assertEquals(1, app1Files.length); ++ FileStatus harFile = app1Files[0]; ++ Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName()); ++ Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath()); ++ FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath); ++ Assert.assertEquals(3, harLogs.length); ++ Arrays.sort(harLogs, new Comparator() { ++ @Override ++ public int compare(FileStatus o1, FileStatus o2) { ++ return o1.getPath().getName().compareTo(o2.getPath().getName()); ++ } ++ }); ++ Assert.assertEquals("log1", harLogs[0].getPath().getName()); ++ Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen()); ++ Assert.assertEquals("log2", harLogs[1].getPath().getName()); ++ Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen()); ++ Assert.assertEquals("log3", harLogs[2].getPath().getName()); ++ Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen()); ++ Assert.assertEquals(0, fs.listStatus(workingDir).length); ++ } finally { ++ if (yarnCluster != null) { ++ yarnCluster.stop(); ++ } ++ if (fs != null) { ++ fs.close(); ++ } ++ if (dfsCluster != null) { ++ dfsCluster.shutdown(); ++ } ++ } ++ } ++ ++ private static void createFile(FileSystem fs, Path p, long sizeMultiple) ++ throws IOException { ++ FSDataOutputStream out = null; ++ try { ++ out = fs.create(p); ++ for (int i = 0 ; i < sizeMultiple; i++) { ++ out.write(DUMMY_DATA); ++ } ++ } finally { ++ if (out != null) { ++ out.close(); ++ } ++ } ++ } ++} +diff --git hadoop-tools/hadoop-tools-dist/pom.xml hadoop-tools/hadoop-tools-dist/pom.xml +index 540401d..e6c458f 100644 +--- hadoop-tools/hadoop-tools-dist/pom.xml ++++ hadoop-tools/hadoop-tools-dist/pom.xml +@@ -52,6 +52,11 @@ + + + org.apache.hadoop ++ hadoop-archive-logs ++ compile ++ ++ ++ org.apache.hadoop + hadoop-rumen + compile + +diff --git hadoop-tools/pom.xml hadoop-tools/pom.xml +index 5b35f46..0061bf0 100644 +--- hadoop-tools/pom.xml ++++ hadoop-tools/pom.xml +@@ -34,6 +34,7 @@ + hadoop-streaming + hadoop-distcp + hadoop-archives ++ hadoop-archive-logs + hadoop-rumen + hadoop-gridmix + hadoop-datajoin diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml index fa557034fb..3f646e6979 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-tools.xml @@ -51,6 +51,13 @@ *-sources.jar + + ../hadoop-archive-logs/target + /share/hadoop/${hadoop.component}/sources + + *-sources.jar + + ../hadoop-datajoin/target /share/hadoop/${hadoop.component}/sources diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 5b5724b577..428d37e909 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -290,6 +290,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6304. Specifying node labels when submitting MR jobs. (Naganarasimha G R via wangda) + MAPREDUCE-6415. Create a tool to combine aggregated logs into HAR files. + (Robert Kanter via kasha) + IMPROVEMENTS MAPREDUCE-6291. Correct mapred queue usage command. diff --git a/hadoop-mapreduce-project/bin/mapred b/hadoop-mapreduce-project/bin/mapred index 426af8005a..2d56a8dc92 100755 --- a/hadoop-mapreduce-project/bin/mapred +++ b/hadoop-mapreduce-project/bin/mapred @@ -20,6 +20,7 @@ MYNAME="${BASH_SOURCE-$0}" function hadoop_usage { hadoop_add_subcommand "archive" "create a hadoop archive" + hadoop_add_subcommand "archive-logs" "combine aggregated logs into hadoop archives" hadoop_add_subcommand "classpath" "prints the class path needed for running mapreduce subcommands" hadoop_add_subcommand "distcp" "copy file or directories recursively" hadoop_add_subcommand "historyserver" "run job history servers as a standalone daemon" @@ -72,6 +73,13 @@ case ${COMMAND} in hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" ;; + archive-logs) + CLASS=org.apache.hadoop.tools.HadoopArchiveLogs + hadoop_debug "Injecting TOOL_PATH into CLASSPATH" + hadoop_add_classpath "${TOOL_PATH}" + hadoop_debug "Appending HADOOP_CLIENT_OPTS onto HADOOP_OPTS" + HADOOP_OPTS="${HADOOP_OPTS} ${HADOOP_CLIENT_OPTS}" + ;; classpath) hadoop_do_classpath_subcommand CLASS "$@" ;; diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 986347593f..636e063179 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -322,6 +322,11 @@ hadoop-archives ${project.version} + + org.apache.hadoop + hadoop-archive-logs + ${project.version} + org.apache.hadoop hadoop-distcp diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml new file mode 100644 index 0000000000..2a480a8b64 --- /dev/null +++ b/hadoop-tools/hadoop-archive-logs/pom.xml @@ -0,0 +1,171 @@ + + + + 4.0.0 + + org.apache.hadoop + hadoop-project + 3.0.0-SNAPSHOT + ../../hadoop-project + + org.apache.hadoop + hadoop-archive-logs + 3.0.0-SNAPSHOT + Apache Hadoop Archive Logs + Apache Hadoop Archive Logs + jar + + + ${project.build.directory}/log + + + + + junit + junit + test + + + org.apache.hadoop + hadoop-mapreduce-client-core + provided + + + org.apache.hadoop + hadoop-yarn-applications-distributedshell + provided + + + org.apache.hadoop + hadoop-common + provided + + + org.apache.hadoop + hadoop-hdfs + test + test-jar + + + org.apache.hadoop + hadoop-yarn-server-tests + test-jar + test + + + org.apache.hadoop + hadoop-archives + + + org.apache.hadoop + hadoop-yarn-common + provided + + + org.apache.hadoop + hadoop-yarn-api + provided + + + com.google.guava + guava + provided + + + commons-io + commons-io + provided + + + commons-logging + commons-logging + provided + + + commons-cli + commons-cli + provided + + + org.apache.hadoop + hadoop-yarn-client + provided + + + org.apache.hadoop + hadoop-yarn-server-resourcemanager + provided + + + + org.apache.hadoop + hadoop-hdfs + test + + + + org.apache.hadoop + hadoop-common + test + test-jar + + + + org.apache.hadoop + hadoop-mapreduce-client-jobclient + test + test-jar + + + + + + + org.apache.maven.plugins + maven-antrun-plugin + + + create-log-dir + process-test-resources + + run + + + + + + + + + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + org.apache.hadoop.tools.HadoopArchiveLogs + + + + + + + diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java new file mode 100644 index 0000000000..4778dcbdce --- /dev/null +++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java @@ -0,0 +1,403 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.HelpFormatter; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster; +import org.apache.hadoop.yarn.applications.distributedshell.Client; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; + +import java.io.File; +import java.io.FileWriter; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + + +/** + * This tool moves Aggregated Log files into HAR archives using the + * {@link HadoopArchives} tool and the Distributed Shell via the + * {@link HadoopArchiveLogsRunner}. + */ +public class HadoopArchiveLogs implements Tool { + private static final Log LOG = LogFactory.getLog(HadoopArchiveLogs.class); + + private static final String HELP_OPTION = "help"; + private static final String MAX_ELIGIBLE_APPS_OPTION = "maxEligibleApps"; + private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles"; + private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize"; + private static final String MEMORY_OPTION = "memory"; + + private static final int DEFAULT_MAX_ELIGIBLE = -1; + private static final int DEFAULT_MIN_NUM_LOG_FILES = 20; + private static final long DEFAULT_MAX_TOTAL_LOGS_SIZE = 1024L; + private static final long DEFAULT_MEMORY = 1024L; + + @VisibleForTesting + int maxEligible = DEFAULT_MAX_ELIGIBLE; + @VisibleForTesting + int minNumLogFiles = DEFAULT_MIN_NUM_LOG_FILES; + @VisibleForTesting + long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L; + @VisibleForTesting + long memory = DEFAULT_MEMORY; + + @VisibleForTesting + Set eligibleApplications; + + private JobConf conf; + + public HadoopArchiveLogs(Configuration conf) { + setConf(conf); + eligibleApplications = new HashSet<>(); + } + + public static void main(String[] args) { + JobConf job = new JobConf(HadoopArchiveLogs.class); + + HadoopArchiveLogs hal = new HadoopArchiveLogs(job); + int ret = 0; + + try{ + ret = ToolRunner.run(hal, args); + } catch(Exception e) { + LOG.debug("Exception", e); + System.err.println(e.getClass().getSimpleName()); + final String s = e.getLocalizedMessage(); + if (s != null) { + System.err.println(s); + } else { + e.printStackTrace(System.err); + } + System.exit(1); + } + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + handleOpts(args); + + findAggregatedApps(); + + FileSystem fs = null; + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); + Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); + try { + fs = FileSystem.get(conf); + checkFiles(fs, remoteRootLogDir, suffix); + + // Prepare working directory + if (fs.exists(workingDir)) { + fs.delete(workingDir, true); + } + fs.mkdirs(workingDir); + fs.setPermission(workingDir, + new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE)); + } finally { + if (fs != null) { + fs.close(); + } + } + + checkMaxEligible(); + + if (eligibleApplications.isEmpty()) { + LOG.info("No eligible applications to process"); + System.exit(0); + } + + StringBuilder sb = + new StringBuilder("Will process the following applications:"); + for (ApplicationReport report : eligibleApplications) { + sb.append("\n\t").append(report.getApplicationId()); + } + LOG.info(sb.toString()); + + File localScript = File.createTempFile("hadoop-archive-logs-", ".sh"); + generateScript(localScript, workingDir, remoteRootLogDir, suffix); + + if (runDistributedShell(localScript)) { + return 0; + } + return -1; + } + + private void handleOpts(String[] args) throws ParseException { + Options opts = new Options(); + Option helpOpt = new Option(HELP_OPTION, false, "Prints this message"); + Option maxEligibleOpt = new Option(MAX_ELIGIBLE_APPS_OPTION, true, + "The maximum number of eligible apps to process (default: " + + DEFAULT_MAX_ELIGIBLE + " (all))"); + maxEligibleOpt.setArgName("n"); + Option minNumLogFilesOpt = new Option(MIN_NUM_LOG_FILES_OPTION, true, + "The minimum number of log files required to be eligible (default: " + + DEFAULT_MIN_NUM_LOG_FILES + ")"); + minNumLogFilesOpt.setArgName("n"); + Option maxTotalLogsSizeOpt = new Option(MAX_TOTAL_LOGS_SIZE_OPTION, true, + "The maximum total logs size (in megabytes) required to be eligible" + + " (default: " + DEFAULT_MAX_TOTAL_LOGS_SIZE + ")"); + maxTotalLogsSizeOpt.setArgName("megabytes"); + Option memoryOpt = new Option(MEMORY_OPTION, true, + "The amount of memory (in megabytes) for each container (default: " + + DEFAULT_MEMORY + ")"); + memoryOpt.setArgName("megabytes"); + opts.addOption(helpOpt); + opts.addOption(maxEligibleOpt); + opts.addOption(minNumLogFilesOpt); + opts.addOption(maxTotalLogsSizeOpt); + opts.addOption(memoryOpt); + + try { + CommandLineParser parser = new GnuParser(); + CommandLine commandLine = parser.parse(opts, args); + if (commandLine.hasOption(HELP_OPTION)) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("yarn archive-logs", opts); + System.exit(0); + } + if (commandLine.hasOption(MAX_ELIGIBLE_APPS_OPTION)) { + maxEligible = Integer.parseInt( + commandLine.getOptionValue(MAX_ELIGIBLE_APPS_OPTION)); + if (maxEligible == 0) { + LOG.info("Setting " + MAX_ELIGIBLE_APPS_OPTION + " to 0 accomplishes " + + "nothing. Please either set it to a negative value " + + "(default, all) or a more reasonable value."); + System.exit(0); + } + } + if (commandLine.hasOption(MIN_NUM_LOG_FILES_OPTION)) { + minNumLogFiles = Integer.parseInt( + commandLine.getOptionValue(MIN_NUM_LOG_FILES_OPTION)); + } + if (commandLine.hasOption(MAX_TOTAL_LOGS_SIZE_OPTION)) { + maxTotalLogsSize = Long.parseLong( + commandLine.getOptionValue(MAX_TOTAL_LOGS_SIZE_OPTION)); + maxTotalLogsSize *= 1024L * 1024L; + } + if (commandLine.hasOption(MEMORY_OPTION)) { + memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION)); + } + } catch (ParseException pe) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("yarn archive-logs", opts); + throw pe; + } + } + + @VisibleForTesting + void findAggregatedApps() throws IOException, YarnException { + YarnClient client = YarnClient.createYarnClient(); + try { + client.init(getConf()); + client.start(); + List reports = client.getApplications(); + for (ApplicationReport report : reports) { + LogAggregationStatus aggStatus = report.getLogAggregationStatus(); + if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) || + aggStatus.equals(LogAggregationStatus.FAILED)) { + eligibleApplications.add(report); + } + } + } finally { + if (client != null) { + client.stop(); + } + } + } + + @VisibleForTesting + void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) { + for (Iterator reportIt = eligibleApplications.iterator(); + reportIt.hasNext(); ) { + ApplicationReport report = reportIt.next(); + long totalFileSize = 0L; + try { + FileStatus[] files = fs.listStatus( + LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, + report.getApplicationId(), report.getUser(), suffix)); + if (files.length < minNumLogFiles) { + reportIt.remove(); + } else { + for (FileStatus file : files) { + if (file.getPath().getName().equals(report.getApplicationId() + + ".har")) { + reportIt.remove(); + break; + } + totalFileSize += file.getLen(); + } + if (totalFileSize > maxTotalLogsSize) { + reportIt.remove(); + } + } + } catch (IOException ioe) { + // If the user doesn't have permission or it doesn't exist, then skip it + reportIt.remove(); + } + } + } + + @VisibleForTesting + void checkMaxEligible() { + // If we have too many eligible apps, remove the newest ones first + if (maxEligible > 0 && eligibleApplications.size() > maxEligible) { + List sortedApplications = + new ArrayList(eligibleApplications); + Collections.sort(sortedApplications, new Comparator() { + @Override + public int compare(ApplicationReport o1, ApplicationReport o2) { + return Long.compare(o1.getFinishTime(), o2.getFinishTime()); + } + }); + for (int i = maxEligible; i < sortedApplications.size(); i++) { + eligibleApplications.remove(sortedApplications.get(i)); + } + } + } + + /* + The generated script looks like this: + #!/bin/bash + set -e + set -x + if [ "$YARN_SHELL_ID" == "1" ]; then + appId="application_1440448768987_0001" + user="rkanter" + elif [ "$YARN_SHELL_ID" == "2" ]; then + appId="application_1440448768987_0002" + user="rkanter" + else + echo "Unknown Mapping!" + exit 1 + fi + export HADOOP_CLIENT_OPTS="-Xmx1024m" + export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar + "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs + */ + @VisibleForTesting + void generateScript(File localScript, Path workingDir, + Path remoteRootLogDir, String suffix) throws IOException { + LOG.info("Generating script at: " + localScript.getAbsolutePath()); + String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain() + .getCodeSource().getLocation().getPath(); + String harJarPath = HadoopArchives.class.getProtectionDomain() + .getCodeSource().getLocation().getPath(); + String classpath = halrJarPath + File.pathSeparator + harJarPath; + FileWriter fw = null; + try { + fw = new FileWriter(localScript); + fw.write("#!/bin/bash\nset -e\nset -x\n"); + int containerCount = 1; + for (ApplicationReport report : eligibleApplications) { + fw.write("if [ \"$YARN_SHELL_ID\" == \""); + fw.write(Integer.toString(containerCount)); + fw.write("\" ]; then\n\tappId=\""); + fw.write(report.getApplicationId().toString()); + fw.write("\"\n\tuser=\""); + fw.write(report.getUser()); + fw.write("\"\nel"); + containerCount++; + } + fw.write("se\n\techo \"Unknown Mapping!\"\n\texit 1\nfi\n"); + fw.write("export HADOOP_CLIENT_OPTS=\"-Xmx"); + fw.write(Long.toString(memory)); + fw.write("m\"\n"); + fw.write("export HADOOP_CLASSPATH="); + fw.write(classpath); + fw.write("\n\"$HADOOP_HOME\"/bin/hadoop "); + fw.write(HadoopArchiveLogsRunner.class.getName()); + fw.write(" -appId \"$appId\" -user \"$user\" -workingDir "); + fw.write(workingDir.toString()); + fw.write(" -remoteRootLogDir "); + fw.write(remoteRootLogDir.toString()); + fw.write(" -suffix "); + fw.write(suffix); + fw.write("\n"); + } finally { + if (fw != null) { + fw.close(); + } + } + } + + private boolean runDistributedShell(File localScript) throws Exception { + String[] dsArgs = { + "--appname", + "ArchiveLogs", + "--jar", + ApplicationMaster.class.getProtectionDomain().getCodeSource() + .getLocation().getPath(), + "--num_containers", + Integer.toString(eligibleApplications.size()), + "--container_memory", + Long.toString(memory), + "--shell_script", + localScript.getAbsolutePath() + }; + final Client dsClient = new Client(new Configuration(conf)); + dsClient.init(dsArgs); + return dsClient.run(); + } + + @Override + public void setConf(Configuration conf) { + if (conf instanceof JobConf) { + this.conf = (JobConf) conf; + } else { + this.conf = new JobConf(conf, HadoopArchiveLogs.class); + } + } + + @Override + public Configuration getConf() { + return this.conf; + } +} diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java new file mode 100644 index 0000000000..347e5fb48a --- /dev/null +++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogsRunner.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +import java.io.File; + +/** + * This is a child program designed to be used by the {@link HadoopArchiveLogs} + * tool via the Distributed Shell. It's not meant to be run directly. + */ +public class HadoopArchiveLogsRunner implements Tool { + private static final Log LOG = LogFactory.getLog(HadoopArchiveLogsRunner.class); + + private static final String APP_ID_OPTION = "appId"; + private static final String USER_OPTION = "user"; + private static final String WORKING_DIR_OPTION = "workingDir"; + private static final String REMOTE_ROOT_LOG_DIR = "remoteRootLogDir"; + private static final String SUFFIX_OPTION = "suffix"; + + private String appId; + private String user; + private String workingDir; + private String remoteLogDir; + private String suffix; + + private JobConf conf; + + public HadoopArchiveLogsRunner(Configuration conf) { + setConf(conf); + } + + public static void main(String[] args) { + JobConf job = new JobConf(HadoopArchiveLogsRunner.class); + + HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(job); + int ret = 0; + + try{ + ret = ToolRunner.run(halr, args); + } catch(Exception e) { + LOG.debug("Exception", e); + System.err.println(e.getClass().getSimpleName()); + final String s = e.getLocalizedMessage(); + if (s != null) { + System.err.println(s); + } else { + e.printStackTrace(System.err); + } + System.exit(1); + } + System.exit(ret); + } + + @Override + public int run(String[] args) throws Exception { + handleOpts(args); + String remoteAppLogDir = remoteLogDir + File.separator + user + + File.separator + suffix + File.separator + appId; + + // Run 'hadoop archives' command in local mode + Configuration haConf = new Configuration(getConf()); + haConf.set("mapreduce.framework.name", "local"); + HadoopArchives ha = new HadoopArchives(haConf); + String[] haArgs = { + "-archiveName", + appId + ".har", + "-p", + remoteAppLogDir, + "*", + workingDir + }; + StringBuilder sb = new StringBuilder("Executing 'hadoop archives'"); + for (String haArg : haArgs) { + sb.append("\n\t").append(haArg); + } + LOG.info(sb.toString()); + ha.run(haArgs); + + FileSystem fs = null; + // Move har file to correct location and delete original logs + try { + fs = FileSystem.get(conf); + LOG.info("Moving har to original location"); + fs.rename(new Path(workingDir, appId + ".har"), + new Path(remoteAppLogDir, appId + ".har")); + LOG.info("Deleting original logs"); + for (FileStatus original : fs.listStatus(new Path(remoteAppLogDir), + new PathFilter() { + @Override + public boolean accept(Path path) { + return !path.getName().endsWith(".har"); + } + })) { + fs.delete(original.getPath(), false); + } + } finally { + if (fs != null) { + fs.close(); + } + } + + return 0; + } + + private void handleOpts(String[] args) throws ParseException { + Options opts = new Options(); + Option appIdOpt = new Option(APP_ID_OPTION, true, "Application ID"); + appIdOpt.setRequired(true); + Option userOpt = new Option(USER_OPTION, true, "User"); + userOpt.setRequired(true); + Option workingDirOpt = new Option(WORKING_DIR_OPTION, true, + "Working Directory"); + workingDirOpt.setRequired(true); + Option remoteLogDirOpt = new Option(REMOTE_ROOT_LOG_DIR, true, + "Remote Root Log Directory"); + remoteLogDirOpt.setRequired(true); + Option suffixOpt = new Option(SUFFIX_OPTION, true, "Suffix"); + suffixOpt.setRequired(true); + opts.addOption(appIdOpt); + opts.addOption(userOpt); + opts.addOption(workingDirOpt); + opts.addOption(remoteLogDirOpt); + opts.addOption(suffixOpt); + + CommandLineParser parser = new GnuParser(); + CommandLine commandLine = parser.parse(opts, args); + appId = commandLine.getOptionValue(APP_ID_OPTION); + user = commandLine.getOptionValue(USER_OPTION); + workingDir = commandLine.getOptionValue(WORKING_DIR_OPTION); + remoteLogDir = commandLine.getOptionValue(REMOTE_ROOT_LOG_DIR); + suffix = commandLine.getOptionValue(SUFFIX_OPTION); + } + + @Override + public void setConf(Configuration conf) { + if (conf instanceof JobConf) { + this.conf = (JobConf) conf; + } else { + this.conf = new JobConf(conf, HadoopArchiveLogsRunner.class); + } + } + + @Override + public Configuration getConf() { + return this.conf; + } +} diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java new file mode 100644 index 0000000000..c8ff201946 --- /dev/null +++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java @@ -0,0 +1,293 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.LogAggregationStatus; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.junit.Assert; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Random; + +public class TestHadoopArchiveLogs { + + private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis(); + private static final int FILE_SIZE_INCREMENT = 4096; + private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT]; + static { + new Random().nextBytes(DUMMY_DATA); + } + + @Test(timeout = 10000) + public void testCheckFiles() throws Exception { + Configuration conf = new Configuration(); + HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); + FileSystem fs = FileSystem.getLocal(conf); + Path rootLogDir = new Path("target", "logs"); + String suffix = "logs"; + Path logDir = new Path(rootLogDir, + new Path(System.getProperty("user.name"), suffix)); + fs.mkdirs(logDir); + + Assert.assertEquals(0, hal.eligibleApplications.size()); + ApplicationReport app1 = createAppReport(1); // no files found + ApplicationReport app2 = createAppReport(2); // too few files + Path app2Path = new Path(logDir, app2.getApplicationId().toString()); + fs.mkdirs(app2Path); + createFile(fs, new Path(app2Path, "file1"), 1); + hal.minNumLogFiles = 2; + ApplicationReport app3 = createAppReport(3); // too large + Path app3Path = new Path(logDir, app3.getApplicationId().toString()); + fs.mkdirs(app3Path); + createFile(fs, new Path(app3Path, "file1"), 2); + createFile(fs, new Path(app3Path, "file2"), 5); + hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6; + ApplicationReport app4 = createAppReport(4); // has har already + Path app4Path = new Path(logDir, app4.getApplicationId().toString()); + fs.mkdirs(app4Path); + createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1); + ApplicationReport app5 = createAppReport(5); // just right + Path app5Path = new Path(logDir, app5.getApplicationId().toString()); + fs.mkdirs(app5Path); + createFile(fs, new Path(app5Path, "file1"), 2); + createFile(fs, new Path(app5Path, "file2"), 3); + hal.eligibleApplications.add(app1); + hal.eligibleApplications.add(app2); + hal.eligibleApplications.add(app3); + hal.eligibleApplications.add(app4); + hal.eligibleApplications.add(app5); + + hal.checkFiles(fs, rootLogDir, suffix); + Assert.assertEquals(1, hal.eligibleApplications.size()); + Assert.assertEquals(app5, hal.eligibleApplications.iterator().next()); + } + + @Test(timeout = 10000) + public void testCheckMaxEligible() throws Exception { + Configuration conf = new Configuration(); + HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); + ApplicationReport app1 = createAppReport(1); + app1.setFinishTime(CLUSTER_TIMESTAMP - 5); + ApplicationReport app2 = createAppReport(2); + app2.setFinishTime(CLUSTER_TIMESTAMP - 10); + ApplicationReport app3 = createAppReport(3); + app3.setFinishTime(CLUSTER_TIMESTAMP + 5); + ApplicationReport app4 = createAppReport(4); + app4.setFinishTime(CLUSTER_TIMESTAMP + 10); + ApplicationReport app5 = createAppReport(5); + app5.setFinishTime(CLUSTER_TIMESTAMP); + Assert.assertEquals(0, hal.eligibleApplications.size()); + hal.eligibleApplications.add(app1); + hal.eligibleApplications.add(app2); + hal.eligibleApplications.add(app3); + hal.eligibleApplications.add(app4); + hal.eligibleApplications.add(app5); + hal.maxEligible = -1; + hal.checkMaxEligible(); + Assert.assertEquals(5, hal.eligibleApplications.size()); + + hal.maxEligible = 4; + hal.checkMaxEligible(); + Assert.assertEquals(4, hal.eligibleApplications.size()); + Assert.assertFalse(hal.eligibleApplications.contains(app4)); + + hal.maxEligible = 3; + hal.checkMaxEligible(); + Assert.assertEquals(3, hal.eligibleApplications.size()); + Assert.assertFalse(hal.eligibleApplications.contains(app3)); + + hal.maxEligible = 2; + hal.checkMaxEligible(); + Assert.assertEquals(2, hal.eligibleApplications.size()); + Assert.assertFalse(hal.eligibleApplications.contains(app5)); + + hal.maxEligible = 1; + hal.checkMaxEligible(); + Assert.assertEquals(1, hal.eligibleApplications.size()); + Assert.assertFalse(hal.eligibleApplications.contains(app1)); + } + + @Test(timeout = 10000) + public void testFindAggregatedApps() throws Exception { + MiniYARNCluster yarnCluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + yarnCluster = + new MiniYARNCluster(TestHadoopArchiveLogs.class.getSimpleName(), 1, + 1, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + conf = yarnCluster.getConfig(); + + RMContext rmContext = yarnCluster.getResourceManager().getRMContext(); + RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext, + LogAggregationStatus.DISABLED); + RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext, + LogAggregationStatus.FAILED); + RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext, + LogAggregationStatus.NOT_START); + RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext, + LogAggregationStatus.SUCCEEDED); + RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext, + LogAggregationStatus.RUNNING); + RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext, + LogAggregationStatus.RUNNING_WITH_FAILURE); + RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext, + LogAggregationStatus.TIME_OUT); + rmContext.getRMApps().put(app1.getApplicationId(), app1); + rmContext.getRMApps().put(app2.getApplicationId(), app2); + rmContext.getRMApps().put(app3.getApplicationId(), app3); + rmContext.getRMApps().put(app4.getApplicationId(), app4); + rmContext.getRMApps().put(app5.getApplicationId(), app5); + rmContext.getRMApps().put(app6.getApplicationId(), app6); + rmContext.getRMApps().put(app7.getApplicationId(), app7); + + HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); + Assert.assertEquals(0, hal.eligibleApplications.size()); + hal.findAggregatedApps(); + Assert.assertEquals(2, hal.eligibleApplications.size()); + } finally { + if (yarnCluster != null) { + yarnCluster.stop(); + } + } + } + + @Test(timeout = 10000) + public void testGenerateScript() throws Exception { + Configuration conf = new Configuration(); + HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); + ApplicationReport app1 = createAppReport(1); + ApplicationReport app2 = createAppReport(2); + hal.eligibleApplications.add(app1); + hal.eligibleApplications.add(app2); + + File localScript = new File("target", "script.sh"); + Path workingDir = new Path("/tmp", "working"); + Path remoteRootLogDir = new Path("/tmp", "logs"); + String suffix = "logs"; + localScript.delete(); + Assert.assertFalse(localScript.exists()); + hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix); + Assert.assertTrue(localScript.exists()); + String script = IOUtils.toString(localScript.toURI()); + String[] lines = script.split(System.lineSeparator()); + Assert.assertEquals(16, lines.length); + Assert.assertEquals("#!/bin/bash", lines[0]); + Assert.assertEquals("set -e", lines[1]); + Assert.assertEquals("set -x", lines[2]); + Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]); + if (lines[4].contains(app1.getApplicationId().toString())) { + Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString() + + "\"", lines[4]); + Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString() + + "\"", lines[7]); + } else { + Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString() + + "\"", lines[4]); + Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString() + + "\"", lines[7]); + } + Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", + lines[5]); + Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]); + Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"", + lines[8]); + Assert.assertEquals("else", lines[9]); + Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]); + Assert.assertEquals("\texit 1", lines[11]); + Assert.assertEquals("fi", lines[12]); + Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]); + Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH=")); + Assert.assertEquals("\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." + + "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" -workingDir " + + workingDir.toString() + " -remoteRootLogDir " + + remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]); + } + + private static ApplicationReport createAppReport(int id) { + ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id); + return ApplicationReport.newInstance( + appId, + ApplicationAttemptId.newInstance(appId, 1), + System.getProperty("user.name"), + null, null, null, 0, null, YarnApplicationState.FINISHED, null, + null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f, + null, null); + } + + private static void createFile(FileSystem fs, Path p, long sizeMultiple) + throws IOException { + FSDataOutputStream out = null; + try { + out = fs.create(p); + for (int i = 0 ; i < sizeMultiple; i++) { + out.write(DUMMY_DATA); + } + } finally { + if (out != null) { + out.close(); + } + } + } + + private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext, + final LogAggregationStatus aggStatus) { + ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id); + ApplicationSubmissionContext submissionContext = + ApplicationSubmissionContext.newInstance(appId, "test", "default", + Priority.newInstance(0), null, false, true, + 2, Resource.newInstance(10, 2), "test"); + return new RMAppImpl(appId, rmContext, conf, "test", + System.getProperty("user.name"), "default", submissionContext, + rmContext.getScheduler(), + rmContext.getApplicationMasterService(), + System.currentTimeMillis(), "test", + null, null) { + @Override + public ApplicationReport createAndGetApplicationReport( + String clientUserName, boolean allowAccess) { + ApplicationReport report = + super.createAndGetApplicationReport(clientUserName, allowAccess); + report.setLogAggregationStatus(aggStatus); + return report; + } + }; + } +} diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java new file mode 100644 index 0000000000..af66f14bc8 --- /dev/null +++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogsRunner.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.tools; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.HarFs; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.MiniYARNCluster; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Comparator; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class TestHadoopArchiveLogsRunner { + + private static final int FILE_SIZE_INCREMENT = 4096; + private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT]; + static { + new Random().nextBytes(DUMMY_DATA); + } + + @Test(timeout = 30000) + public void testHadoopArchiveLogs() throws Exception { + MiniYARNCluster yarnCluster = null; + MiniDFSCluster dfsCluster = null; + FileSystem fs = null; + try { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_FIXED_PORTS, true); + yarnCluster = + new MiniYARNCluster(TestHadoopArchiveLogsRunner.class.getSimpleName(), + 1, 2, 1, 1); + yarnCluster.init(conf); + yarnCluster.start(); + conf = yarnCluster.getConfig(); + dfsCluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + + ApplicationId app1 = + ApplicationId.newInstance(System.currentTimeMillis(), 1); + fs = FileSystem.get(conf); + Path remoteRootLogDir = new Path(conf.get( + YarnConfiguration.NM_REMOTE_APP_LOG_DIR, + YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); + String suffix = "logs"; + Path logDir = new Path(remoteRootLogDir, + new Path(System.getProperty("user.name"), suffix)); + fs.mkdirs(logDir); + Path app1Path = new Path(logDir, app1.toString()); + fs.mkdirs(app1Path); + createFile(fs, new Path(app1Path, "log1"), 3); + createFile(fs, new Path(app1Path, "log2"), 4); + createFile(fs, new Path(app1Path, "log3"), 2); + FileStatus[] app1Files = fs.listStatus(app1Path); + Assert.assertEquals(3, app1Files.length); + + String[] args = new String[]{ + "-appId", app1.toString(), + "-user", System.getProperty("user.name"), + "-workingDir", workingDir.toString(), + "-remoteRootLogDir", remoteRootLogDir.toString(), + "-suffix", suffix}; + final HadoopArchiveLogsRunner halr = new HadoopArchiveLogsRunner(conf); + assertEquals(0, ToolRunner.run(halr, args)); + + fs = FileSystem.get(conf); + app1Files = fs.listStatus(app1Path); + Assert.assertEquals(1, app1Files.length); + FileStatus harFile = app1Files[0]; + Assert.assertEquals(app1.toString() + ".har", harFile.getPath().getName()); + Path harPath = new Path("har:///" + harFile.getPath().toUri().getRawPath()); + FileStatus[] harLogs = HarFs.get(harPath.toUri(), conf).listStatus(harPath); + Assert.assertEquals(3, harLogs.length); + Arrays.sort(harLogs, new Comparator() { + @Override + public int compare(FileStatus o1, FileStatus o2) { + return o1.getPath().getName().compareTo(o2.getPath().getName()); + } + }); + Assert.assertEquals("log1", harLogs[0].getPath().getName()); + Assert.assertEquals(3 * FILE_SIZE_INCREMENT, harLogs[0].getLen()); + Assert.assertEquals("log2", harLogs[1].getPath().getName()); + Assert.assertEquals(4 * FILE_SIZE_INCREMENT, harLogs[1].getLen()); + Assert.assertEquals("log3", harLogs[2].getPath().getName()); + Assert.assertEquals(2 * FILE_SIZE_INCREMENT, harLogs[2].getLen()); + Assert.assertEquals(0, fs.listStatus(workingDir).length); + } finally { + if (yarnCluster != null) { + yarnCluster.stop(); + } + if (fs != null) { + fs.close(); + } + if (dfsCluster != null) { + dfsCluster.shutdown(); + } + } + } + + private static void createFile(FileSystem fs, Path p, long sizeMultiple) + throws IOException { + FSDataOutputStream out = null; + try { + out = fs.create(p); + for (int i = 0 ; i < sizeMultiple; i++) { + out.write(DUMMY_DATA); + } + } finally { + if (out != null) { + out.close(); + } + } + } +} diff --git a/hadoop-tools/hadoop-tools-dist/pom.xml b/hadoop-tools/hadoop-tools-dist/pom.xml index 540401ddfd..e6c458fb3c 100644 --- a/hadoop-tools/hadoop-tools-dist/pom.xml +++ b/hadoop-tools/hadoop-tools-dist/pom.xml @@ -50,6 +50,11 @@ hadoop-archives compile + + org.apache.hadoop + hadoop-archive-logs + compile + org.apache.hadoop hadoop-rumen diff --git a/hadoop-tools/pom.xml b/hadoop-tools/pom.xml index 5b35f46efe..0061bf079f 100644 --- a/hadoop-tools/pom.xml +++ b/hadoop-tools/pom.xml @@ -34,6 +34,7 @@ hadoop-streaming hadoop-distcp hadoop-archives + hadoop-archive-logs hadoop-rumen hadoop-gridmix hadoop-datajoin