From d3c49e76624b7e42a1321c649a1d7bb9906b3073 Mon Sep 17 00:00:00 2001
From: Robert Kanter
Date: Fri, 25 Sep 2015 14:59:49 -0700
Subject: [PATCH] MAPREDUCE-6480. archive-logs tool may miss applications
 (rkanter)

---
 hadoop-mapreduce-project/CHANGES.txt          |   2 +
 .../dev-support/findbugs-exclude.xml          |  32 +++
 hadoop-tools/hadoop-archive-logs/pom.xml      |  18 ++
 .../hadoop/tools/HadoopArchiveLogs.java       | 243 ++++++++++++++----
 .../hadoop/tools/TestHadoopArchiveLogs.java   | 231 ++++++++++-------
 5 files changed, 392 insertions(+), 134 deletions(-)
 create mode 100644 hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 4bcd5d89a5..b7e901609d 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -580,6 +580,8 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6484. Yarn Client uses local address instead of RM address as
     token renewer in a secure cluster when RM HA is enabled. (Zhihai Xu)
 
+    MAPREDUCE-6480. archive-logs tool may miss applications (rkanter)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
new file mode 100644
index 0000000000..7f2064ea24
--- /dev/null
+++ b/hadoop-tools/hadoop-archive-logs/dev-support/findbugs-exclude.xml
@@ -0,0 +1,32 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/hadoop-tools/hadoop-archive-logs/pom.xml b/hadoop-tools/hadoop-archive-logs/pom.xml
index 2a480a8b64..7e7da77efa 100644
--- a/hadoop-tools/hadoop-archive-logs/pom.xml
+++ b/hadoop-tools/hadoop-archive-logs/pom.xml
@@ -118,6 +118,12 @@
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
@@ -166,6 +172,18 @@
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>findbugs-maven-plugin</artifactId>
+        <configuration>
+          <findbugsXmlOutput>true</findbugsXmlOutput>
+          <xmlOutput>true</xmlOutput>
+          <excludeFilterFile>
+            ${basedir}/dev-support/findbugs-exclude.xml
+          </excludeFilterFile>
+          <effort>Max</effort>
+        </configuration>
+      </plugin>
     </plugins>
   </build>
 
 </project>
diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
index 4778dcbdce..0879d41f7f 100644
--- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java
@@ -26,12 +26,14 @@
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
+import org.apache.commons.io.output.FileWriterWithEncoding;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapred.JobConf;
@@ -43,13 +45,15 @@
 import org.apache.hadoop.yarn.applications.distributedshell.Client;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import java.io.File;
-import java.io.FileWriter;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -71,6 +75,7 @@ public class HadoopArchiveLogs implements Tool {
   private static final String MIN_NUM_LOG_FILES_OPTION = "minNumberLogFiles";
   private static final String MAX_TOTAL_LOGS_SIZE_OPTION = "maxTotalLogsSize";
   private static final String MEMORY_OPTION = "memory";
+  private static final String VERBOSE_OPTION = "verbose";
 
   private static final int DEFAULT_MAX_ELIGIBLE = -1;
   private static final int DEFAULT_MIN_NUM_LOG_FILES = 20;
@@ -85,9 +90,10 @@ public class HadoopArchiveLogs implements Tool {
   long maxTotalLogsSize = DEFAULT_MAX_TOTAL_LOGS_SIZE * 1024L * 1024L;
   @VisibleForTesting
   long memory = DEFAULT_MEMORY;
+  private boolean verbose = false;
 
   @VisibleForTesting
-  Set<ApplicationReport> eligibleApplications;
+  Set<AppInfo> eligibleApplications;
 
   private JobConf conf;
 
@@ -122,17 +128,20 @@ public static void main(String[] args) {
   public int run(String[] args) throws Exception {
     handleOpts(args);
 
-    findAggregatedApps();
-
     FileSystem fs = null;
     Path remoteRootLogDir = new Path(conf.get(
         YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
     String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
     Path workingDir = new Path(remoteRootLogDir, "archive-logs-work");
+    if (verbose) {
+      LOG.info("Remote Log Dir Root: " + remoteRootLogDir);
+      LOG.info("Log Suffix: " + suffix);
+      LOG.info("Working Dir: " + workingDir);
+    }
     try {
       fs = FileSystem.get(conf);
-      checkFiles(fs, remoteRootLogDir, suffix);
+      checkFilesAndSeedApps(fs, remoteRootLogDir, suffix);
 
       // Prepare working directory
       if (fs.exists(workingDir)) {
@@ -147,6 +156,8 @@ public int run(String[] args) throws Exception {
       }
     }
 
+    filterAppsByAggregatedStatus();
+
     checkMaxEligible();
 
     if (eligibleApplications.isEmpty()) {
@@ -156,8 +167,8 @@ public int run(String[] args) throws Exception {
 
     StringBuilder sb =
         new StringBuilder("Will process the following applications:");
-    for (ApplicationReport report : eligibleApplications) {
-      sb.append("\n\t").append(report.getApplicationId());
+    for (AppInfo app : eligibleApplications) {
+      sb.append("\n\t").append(app.getAppId());
     }
     LOG.info(sb.toString());
 
@@ -189,11 +200,14 @@ private void handleOpts(String[] args) throws ParseException {
         "The amount of memory (in megabytes) for each container (default: "
             + DEFAULT_MEMORY + ")");
     memoryOpt.setArgName("megabytes");
+    Option verboseOpt = new Option(VERBOSE_OPTION, false,
+        "Print more details.");
     opts.addOption(helpOpt);
     opts.addOption(maxEligibleOpt);
     opts.addOption(minNumLogFilesOpt);
     opts.addOption(maxTotalLogsSizeOpt);
     opts.addOption(memoryOpt);
+    opts.addOption(verboseOpt);
 
     try {
       CommandLineParser parser = new GnuParser();
@@ -225,6 +239,9 @@ private void handleOpts(String[] args) throws ParseException {
       if (commandLine.hasOption(MEMORY_OPTION)) {
         memory = Long.parseLong(commandLine.getOptionValue(MEMORY_OPTION));
       }
+      if (commandLine.hasOption(VERBOSE_OPTION)) {
+        verbose = true;
+      }
     } catch (ParseException pe) {
       HelpFormatter formatter = new HelpFormatter();
       formatter.printHelp("yarn archive-logs", opts);
@@ -233,17 +250,39 @@ private void handleOpts(String[] args) throws ParseException {
   }
 
   @VisibleForTesting
-  void findAggregatedApps() throws IOException, YarnException {
+  void filterAppsByAggregatedStatus() throws IOException, YarnException {
     YarnClient client = YarnClient.createYarnClient();
     try {
       client.init(getConf());
       client.start();
-      List<ApplicationReport> reports = client.getApplications();
-      for (ApplicationReport report : reports) {
-        LogAggregationStatus aggStatus = report.getLogAggregationStatus();
-        if (aggStatus.equals(LogAggregationStatus.SUCCEEDED) ||
-            aggStatus.equals(LogAggregationStatus.FAILED)) {
-          eligibleApplications.add(report);
+      for (Iterator<AppInfo> it = eligibleApplications.iterator();
+           it.hasNext();) {
+        AppInfo app = it.next();
+        try {
+          ApplicationReport report = client.getApplicationReport(
+              ConverterUtils.toApplicationId(app.getAppId()));
+          LogAggregationStatus aggStatus = report.getLogAggregationStatus();
+          if (aggStatus.equals(LogAggregationStatus.RUNNING) ||
+              aggStatus.equals(LogAggregationStatus.RUNNING_WITH_FAILURE) ||
+              aggStatus.equals(LogAggregationStatus.NOT_START) ||
+              aggStatus.equals(LogAggregationStatus.DISABLED) ||
+              aggStatus.equals(LogAggregationStatus.FAILED)) {
+            if (verbose) {
+              LOG.info("Skipping " + app.getAppId()
+                  + " due to aggregation status being " + aggStatus);
+            }
+            it.remove();
+          } else {
+            if (verbose) {
+              LOG.info(app.getAppId() + " has aggregation status " + aggStatus);
+            }
+            app.setFinishTime(report.getFinishTime());
+          }
+        } catch (ApplicationNotFoundException e) {
+          // Assume the aggregation has finished
+          if (verbose) {
+            LOG.info(app.getAppId() + " not in the ResourceManager");
+          }
         }
       }
     } finally {
@@ -254,33 +293,71 @@
   }
 
   @VisibleForTesting
-  void checkFiles(FileSystem fs, Path remoteRootLogDir, String suffix) {
-    for (Iterator<ApplicationReport> reportIt = eligibleApplications.iterator();
-         reportIt.hasNext(); ) {
-      ApplicationReport report = reportIt.next();
-      long totalFileSize = 0L;
+  void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir,
+      String suffix) throws IOException {
+    for (RemoteIterator<FileStatus> userIt =
+         fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) {
+      Path userLogPath = userIt.next().getPath();
       try {
-        FileStatus[] files = fs.listStatus(
-            LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
-                report.getApplicationId(), report.getUser(), suffix));
-        if (files.length < minNumLogFiles) {
-          reportIt.remove();
-        } else {
-          for (FileStatus file : files) {
-            if (file.getPath().getName().equals(report.getApplicationId()
-                + ".har")) {
-              reportIt.remove();
-              break;
+        for (RemoteIterator<FileStatus> appIt =
+             fs.listStatusIterator(new Path(userLogPath, suffix));
+             appIt.hasNext();) {
+          Path appLogPath = appIt.next().getPath();
+          try {
+            FileStatus[] files = fs.listStatus(appLogPath);
+            if (files.length >= minNumLogFiles) {
+              boolean eligible = true;
+              long totalFileSize = 0L;
+              for (FileStatus file : files) {
+                if (file.getPath().getName().equals(appLogPath.getName()
+                    + ".har")) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName()
+                        + " due to existing .har file");
+                  }
+                  break;
+                }
+                totalFileSize += file.getLen();
+                if (totalFileSize > maxTotalLogsSize) {
+                  eligible = false;
+                  if (verbose) {
+                    LOG.info("Skipping " + appLogPath.getName() + " due to "
+                        + "total file size being too large (" + totalFileSize
+                        + " > " + maxTotalLogsSize + ")");
+                  }
+                  break;
+                }
+              }
+              if (eligible) {
+                if (verbose) {
+                  LOG.info("Adding " + appLogPath.getName() + " for user "
+                      + userLogPath.getName());
+                }
+                eligibleApplications.add(
+                    new AppInfo(appLogPath.getName(), userLogPath.getName()));
+              }
+            } else {
+              if (verbose) {
+                LOG.info("Skipping " + appLogPath.getName() + " due to not "
+                    + "having enough log files (" + files.length + " < "
+                    + minNumLogFiles + ")");
+              }
+            }
+          } catch (IOException ioe) {
+            // Ignore any apps we can't read
+            if (verbose) {
+              LOG.info("Skipping logs under " + appLogPath + " due to "
+                  + ioe.getMessage());
             }
-            totalFileSize += file.getLen();
-          }
-          if (totalFileSize > maxTotalLogsSize) {
-            reportIt.remove();
-          }
-        }
+          }
+        }
       } catch (IOException ioe) {
-        // If the user doesn't have permission or it doesn't exist, then skip it
-        reportIt.remove();
+        // Ignore any apps we can't read
+        if (verbose) {
+          LOG.info("Skipping all logs under " + userLogPath + " due to "
+              + ioe.getMessage());
+        }
       }
     }
   }
@@ -289,15 +366,26 @@
   void checkMaxEligible() {
     // If we have too many eligible apps, remove the newest ones first
     if (maxEligible > 0 && eligibleApplications.size() > maxEligible) {
-      List<ApplicationReport> sortedApplications =
-          new ArrayList<ApplicationReport>(eligibleApplications);
-      Collections.sort(sortedApplications, new Comparator<ApplicationReport>() {
+      if (verbose) {
+        LOG.info("Too many applications (" + eligibleApplications.size()
+            + " > " + maxEligible + ")");
+      }
+      List<AppInfo> sortedApplications =
+          new ArrayList<AppInfo>(eligibleApplications);
+      Collections.sort(sortedApplications, new Comparator<AppInfo>() {
         @Override
-        public int compare(ApplicationReport o1, ApplicationReport o2) {
-          return Long.compare(o1.getFinishTime(), o2.getFinishTime());
+        public int compare(AppInfo o1, AppInfo o2) {
+          int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime());
+          if (lCompare == 0) {
+            return o1.getAppId().compareTo(o2.getAppId());
+          }
+          return lCompare;
         }
       });
       for (int i = maxEligible; i < sortedApplications.size(); i++) {
+        if (verbose) {
+          LOG.info("Removing " + sortedApplications.get(i));
+        }
         eligibleApplications.remove(sortedApplications.get(i));
       }
     }
@@ -325,24 +413,26 @@ public int compare(ApplicationReport o1, ApplicationReport o2) {
   @VisibleForTesting
   void generateScript(File localScript, Path workingDir,
       Path remoteRootLogDir, String suffix) throws IOException {
-    LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    if (verbose) {
+      LOG.info("Generating script at: " + localScript.getAbsolutePath());
+    }
     String halrJarPath = HadoopArchiveLogsRunner.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String harJarPath = HadoopArchives.class.getProtectionDomain()
         .getCodeSource().getLocation().getPath();
     String classpath = halrJarPath + File.pathSeparator + harJarPath;
-    FileWriter fw = null;
+    FileWriterWithEncoding fw = null;
     try {
-      fw = new FileWriter(localScript);
+      fw = new FileWriterWithEncoding(localScript, "UTF-8");
       fw.write("#!/bin/bash\nset -e\nset -x\n");
       int containerCount = 1;
-      for (ApplicationReport report : eligibleApplications) {
+      for (AppInfo app : eligibleApplications) {
         fw.write("if [ \"$YARN_SHELL_ID\" == \"");
         fw.write(Integer.toString(containerCount));
         fw.write("\" ]; then\n\tappId=\"");
-        fw.write(report.getApplicationId().toString());
+        fw.write(app.getAppId());
         fw.write("\"\n\tuser=\"");
-        fw.write(report.getUser());
+        fw.write(app.getUser());
         fw.write("\"\nel");
         containerCount++;
       }
@@ -382,6 +472,10 @@ private boolean runDistributedShell(File localScript) throws Exception {
         "--shell_script", localScript.getAbsolutePath()
     };
+    if (verbose) {
+      LOG.info("Running Distributed Shell with arguments: "
+          + Arrays.toString(dsArgs));
+    }
     final Client dsClient = new Client(new Configuration(conf));
     dsClient.init(dsArgs);
     return dsClient.run();
@@ -400,4 +494,59 @@ public void setConf(Configuration conf) {
   public Configuration getConf() {
     return this.conf;
   }
+
+  @VisibleForTesting
+  static class AppInfo {
+    private String appId;
+    private String user;
+    private long finishTime;
+
+    AppInfo(String appId, String user) {
+      this.appId = appId;
+      this.user = user;
+      this.finishTime = 0L;
+    }
+
+    public String getAppId() {
+      return appId;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public long getFinishTime() {
+      return finishTime;
+    }
+
+    public void setFinishTime(long finishTime) {
+      this.finishTime = finishTime;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      AppInfo appInfo = (AppInfo) o;
+
+      if (appId != null
+          ? !appId.equals(appInfo.appId) : appInfo.appId != null) {
+        return false;
+      }
+      return !(user != null
+          ? !user.equals(appInfo.user) : appInfo.user != null);
+    }
+
+    @Override
+    public int hashCode() {
+      int result = appId != null ? appId.hashCode() : 0;
+      result = 31 * result + (user != null ? user.hashCode() : 0);
+      return result;
+    }
+  }
 }
diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
index c8ff201946..7423f7926b 100644
--- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
+++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java
@@ -23,15 +23,12 @@
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@@ -47,6 +44,7 @@
 public class TestHadoopArchiveLogs {
 
   private static final long CLUSTER_TIMESTAMP = System.currentTimeMillis();
+  private static final String USER = System.getProperty("user.name");
   private static final int FILE_SIZE_INCREMENT = 4096;
   private static final byte[] DUMMY_DATA = new byte[FILE_SIZE_INCREMENT];
   static {
@@ -54,96 +52,117 @@ public class TestHadoopArchiveLogs {
   }
 
   @Test(timeout = 10000)
-  public void testCheckFiles() throws Exception {
+  public void testCheckFilesAndSeedApps() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     FileSystem fs = FileSystem.getLocal(conf);
     Path rootLogDir = new Path("target", "logs");
     String suffix = "logs";
-    Path logDir = new Path(rootLogDir,
-        new Path(System.getProperty("user.name"), suffix));
+    Path logDir = new Path(rootLogDir, new Path(USER, suffix));
     fs.mkdirs(logDir);
-    Assert.assertEquals(0, hal.eligibleApplications.size());
-    ApplicationReport app1 = createAppReport(1);  // no files found
-    ApplicationReport app2 = createAppReport(2);  // too few files
-    Path app2Path = new Path(logDir, app2.getApplicationId().toString());
+    // no files found
+    ApplicationId appId1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    Path app1Path = new Path(logDir, appId1.toString());
+    fs.mkdirs(app1Path);
+    // too few files
+    ApplicationId appId2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    Path app2Path = new Path(logDir, appId2.toString());
     fs.mkdirs(app2Path);
     createFile(fs, new Path(app2Path, "file1"), 1);
     hal.minNumLogFiles = 2;
-    ApplicationReport app3 = createAppReport(3);  // too large
-    Path app3Path = new Path(logDir, app3.getApplicationId().toString());
+    // too large
+    ApplicationId appId3 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3);
+    Path app3Path = new Path(logDir, appId3.toString());
     fs.mkdirs(app3Path);
     createFile(fs, new Path(app3Path, "file1"), 2);
     createFile(fs, new Path(app3Path, "file2"), 5);
     hal.maxTotalLogsSize = FILE_SIZE_INCREMENT * 6;
-    ApplicationReport app4 = createAppReport(4);  // has har already
-    Path app4Path = new Path(logDir, app4.getApplicationId().toString());
+    // has har already
+    ApplicationId appId4 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4);
+    Path app4Path = new Path(logDir, appId4.toString());
     fs.mkdirs(app4Path);
-    createFile(fs, new Path(app4Path, app4.getApplicationId() + ".har"), 1);
-    ApplicationReport app5 = createAppReport(5);  // just right
-    Path app5Path = new Path(logDir, app5.getApplicationId().toString());
+    createFile(fs, new Path(app4Path, appId4 + ".har"), 1);
+    // just right
+    ApplicationId appId5 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5);
+    Path app5Path = new Path(logDir, appId5.toString());
     fs.mkdirs(app5Path);
     createFile(fs, new Path(app5Path, "file1"), 2);
     createFile(fs, new Path(app5Path, "file2"), 3);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
-    hal.eligibleApplications.add(app3);
-    hal.eligibleApplications.add(app4);
-    hal.eligibleApplications.add(app5);
-    hal.checkFiles(fs, rootLogDir, suffix);
+    Assert.assertEquals(0, hal.eligibleApplications.size());
+    hal.checkFilesAndSeedApps(fs, rootLogDir, suffix);
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertEquals(app5, hal.eligibleApplications.iterator().next());
+    Assert.assertEquals(appId5.toString(),
+        hal.eligibleApplications.iterator().next().getAppId());
   }
 
   @Test(timeout = 10000)
   public void testCheckMaxEligible() throws Exception {
     Configuration conf = new Configuration();
-    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
+    HadoopArchiveLogs.AppInfo app1 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1).toString(), USER);
     app1.setFinishTime(CLUSTER_TIMESTAMP - 5);
-    ApplicationReport app2 = createAppReport(2);
+    HadoopArchiveLogs.AppInfo app2 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2).toString(), USER);
     app2.setFinishTime(CLUSTER_TIMESTAMP - 10);
-    ApplicationReport app3 = createAppReport(3);
-    app3.setFinishTime(CLUSTER_TIMESTAMP + 5);
-    ApplicationReport app4 = createAppReport(4);
-    app4.setFinishTime(CLUSTER_TIMESTAMP + 10);
-    ApplicationReport app5 = createAppReport(5);
-    app5.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs.AppInfo app3 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 3).toString(), USER);
+    // app3 has no finish time set
+    HadoopArchiveLogs.AppInfo app4 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 4).toString(), USER);
+    app4.setFinishTime(CLUSTER_TIMESTAMP + 5);
+    HadoopArchiveLogs.AppInfo app5 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 5).toString(), USER);
+    app5.setFinishTime(CLUSTER_TIMESTAMP + 10);
+    HadoopArchiveLogs.AppInfo app6 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 6).toString(), USER);
+    // app6 has no finish time set
+    HadoopArchiveLogs.AppInfo app7 = new HadoopArchiveLogs.AppInfo(
+        ApplicationId.newInstance(CLUSTER_TIMESTAMP, 7).toString(), USER);
+    app7.setFinishTime(CLUSTER_TIMESTAMP);
+    HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
     Assert.assertEquals(0, hal.eligibleApplications.size());
     hal.eligibleApplications.add(app1);
     hal.eligibleApplications.add(app2);
     hal.eligibleApplications.add(app3);
     hal.eligibleApplications.add(app4);
     hal.eligibleApplications.add(app5);
+    hal.eligibleApplications.add(app6);
+    hal.eligibleApplications.add(app7);
+    Assert.assertEquals(7, hal.eligibleApplications.size());
     hal.maxEligible = -1;
     hal.checkMaxEligible();
+    Assert.assertEquals(7, hal.eligibleApplications.size());
+    hal.maxEligible = 6;
+    hal.checkMaxEligible();
+    Assert.assertEquals(6, hal.eligibleApplications.size());
+    Assert.assertFalse(hal.eligibleApplications.contains(app5));
+    hal.maxEligible = 5;
+    hal.checkMaxEligible();
     Assert.assertEquals(5, hal.eligibleApplications.size());
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app4));
     hal.maxEligible = 4;
     hal.checkMaxEligible();
     Assert.assertEquals(4, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app4));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app7));
     hal.maxEligible = 3;
     hal.checkMaxEligible();
     Assert.assertEquals(3, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app3));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app1));
     hal.maxEligible = 2;
     hal.checkMaxEligible();
     Assert.assertEquals(2, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app5));
-
+    Assert.assertFalse(hal.eligibleApplications.contains(app2));
     hal.maxEligible = 1;
     hal.checkMaxEligible();
     Assert.assertEquals(1, hal.eligibleApplications.size());
-    Assert.assertFalse(hal.eligibleApplications.contains(app1));
+    Assert.assertFalse(hal.eligibleApplications.contains(app6));
+    Assert.assertTrue(hal.eligibleApplications.contains(app3));
   }
 
   @Test(timeout = 10000)
-  public void testFindAggregatedApps() throws Exception {
+  public void testFilterAppsByAggregatedStatus() throws Exception {
     MiniYARNCluster yarnCluster = null;
     try {
       Configuration conf = new Configuration();
@@ -156,32 +175,66 @@
       conf = yarnCluster.getConfig();
 
       RMContext rmContext = yarnCluster.getResourceManager().getRMContext();
-      RMAppImpl app1 = (RMAppImpl)createRMApp(1, conf, rmContext,
+      RMAppImpl appImpl1 = (RMAppImpl)createRMApp(1, conf, rmContext,
           LogAggregationStatus.DISABLED);
-      RMAppImpl app2 = (RMAppImpl)createRMApp(2, conf, rmContext,
+      RMAppImpl appImpl2 = (RMAppImpl)createRMApp(2, conf, rmContext,
           LogAggregationStatus.FAILED);
-      RMAppImpl app3 = (RMAppImpl)createRMApp(3, conf, rmContext,
+      RMAppImpl appImpl3 = (RMAppImpl)createRMApp(3, conf, rmContext,
           LogAggregationStatus.NOT_START);
-      RMAppImpl app4 = (RMAppImpl)createRMApp(4, conf, rmContext,
+      RMAppImpl appImpl4 = (RMAppImpl)createRMApp(4, conf, rmContext,
          LogAggregationStatus.SUCCEEDED);
-      RMAppImpl app5 = (RMAppImpl)createRMApp(5, conf, rmContext,
+      RMAppImpl appImpl5 = (RMAppImpl)createRMApp(5, conf, rmContext,
          LogAggregationStatus.RUNNING);
-      RMAppImpl app6 = (RMAppImpl)createRMApp(6, conf, rmContext,
+      RMAppImpl appImpl6 = (RMAppImpl)createRMApp(6, conf, rmContext,
           LogAggregationStatus.RUNNING_WITH_FAILURE);
-      RMAppImpl app7 = (RMAppImpl)createRMApp(7, conf, rmContext,
+      RMAppImpl appImpl7 = (RMAppImpl)createRMApp(7, conf, rmContext,
           LogAggregationStatus.TIME_OUT);
-      rmContext.getRMApps().put(app1.getApplicationId(), app1);
-      rmContext.getRMApps().put(app2.getApplicationId(), app2);
-      rmContext.getRMApps().put(app3.getApplicationId(), app3);
-      rmContext.getRMApps().put(app4.getApplicationId(), app4);
-      rmContext.getRMApps().put(app5.getApplicationId(), app5);
-      rmContext.getRMApps().put(app6.getApplicationId(), app6);
-      rmContext.getRMApps().put(app7.getApplicationId(), app7);
+      RMAppImpl appImpl8 = (RMAppImpl)createRMApp(8, conf, rmContext,
+          LogAggregationStatus.SUCCEEDED);
+      rmContext.getRMApps().put(appImpl1.getApplicationId(), appImpl1);
+      rmContext.getRMApps().put(appImpl2.getApplicationId(), appImpl2);
+      rmContext.getRMApps().put(appImpl3.getApplicationId(), appImpl3);
+      rmContext.getRMApps().put(appImpl4.getApplicationId(), appImpl4);
+      rmContext.getRMApps().put(appImpl5.getApplicationId(), appImpl5);
+      rmContext.getRMApps().put(appImpl6.getApplicationId(), appImpl6);
+      rmContext.getRMApps().put(appImpl7.getApplicationId(), appImpl7);
+      // appImpl8 is not in the RM
 
       HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
       Assert.assertEquals(0, hal.eligibleApplications.size());
-      hal.findAggregatedApps();
-      Assert.assertEquals(2, hal.eligibleApplications.size());
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl1.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl2.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl3.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app4 =
+          new HadoopArchiveLogs.AppInfo(appImpl4.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app4);
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl5.getApplicationId().toString(),
+              USER));
+      hal.eligibleApplications.add(
+          new HadoopArchiveLogs.AppInfo(appImpl6.getApplicationId().toString(),
+              USER));
+      HadoopArchiveLogs.AppInfo app7 =
+          new HadoopArchiveLogs.AppInfo(appImpl7.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app7);
+      HadoopArchiveLogs.AppInfo app8 =
+          new HadoopArchiveLogs.AppInfo(appImpl8.getApplicationId().toString(),
+              USER);
+      hal.eligibleApplications.add(app8);
+      Assert.assertEquals(8, hal.eligibleApplications.size());
+      hal.filterAppsByAggregatedStatus();
+      Assert.assertEquals(3, hal.eligibleApplications.size());
+      Assert.assertTrue(hal.eligibleApplications.contains(app4));
+      Assert.assertTrue(hal.eligibleApplications.contains(app7));
+      Assert.assertTrue(hal.eligibleApplications.contains(app8));
     } finally {
       if (yarnCluster != null) {
         yarnCluster.stop();
@@ -193,10 +246,12 @@
   public void testGenerateScript() throws Exception {
     Configuration conf = new Configuration();
     HadoopArchiveLogs hal = new HadoopArchiveLogs(conf);
-    ApplicationReport app1 = createAppReport(1);
-    ApplicationReport app2 = createAppReport(2);
-    hal.eligibleApplications.add(app1);
-    hal.eligibleApplications.add(app2);
+    ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1);
+    ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2);
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(),
+        USER));
+    hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(),
+        USER));
 
     File localScript = new File("target", "script.sh");
     Path workingDir = new Path("/tmp", "working");
@@ -213,22 +268,16 @@
     Assert.assertEquals("set -e", lines[1]);
     Assert.assertEquals("set -x", lines[2]);
     Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]);
-    if (lines[4].contains(app1.getApplicationId().toString())) {
-      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
-          + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
-          + "\"", lines[7]);
+    if (lines[4].contains(app1.toString())) {
+      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]);
+      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]);
     } else {
-      Assert.assertEquals("\tappId=\"" + app2.getApplicationId().toString()
-          + "\"", lines[4]);
-      Assert.assertEquals("\tappId=\"" + app1.getApplicationId().toString()
-          + "\"", lines[7]);
+      Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]);
+      Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]);
     }
-    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
-        lines[5]);
+    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]);
     Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]);
-    Assert.assertEquals("\tuser=\"" + System.getProperty("user.name") + "\"",
-        lines[8]);
+    Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]);
     Assert.assertEquals("else", lines[9]);
     Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]);
     Assert.assertEquals("\texit 1", lines[11]);
@@ -241,15 +290,23 @@
         remoteRootLogDir.toString() + " -suffix " + suffix, lines[15]);
   }
 
-  private static ApplicationReport createAppReport(int id) {
-    ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
-    return ApplicationReport.newInstance(
-        appId,
-        ApplicationAttemptId.newInstance(appId, 1),
-        System.getProperty("user.name"),
-        null, null, null, 0, null, YarnApplicationState.FINISHED, null,
-        null, 0L, 0L, FinalApplicationStatus.SUCCEEDED, null, null, 100f,
-        null, null);
+  /**
+   * If this test fails, then a new Log Aggregation Status was added.  Make
+   * sure that {@link HadoopArchiveLogs#filterAppsByAggregatedStatus()} and
+   * this test are updated as well, if necessary.
+   * @throws Exception
+   */
+  @Test(timeout = 5000)
+  public void testStatuses() throws Exception {
+    LogAggregationStatus[] statuses = new LogAggregationStatus[7];
+    statuses[0] = LogAggregationStatus.DISABLED;
+    statuses[1] = LogAggregationStatus.NOT_START;
+    statuses[2] = LogAggregationStatus.RUNNING;
+    statuses[3] = LogAggregationStatus.RUNNING_WITH_FAILURE;
+    statuses[4] = LogAggregationStatus.SUCCEEDED;
+    statuses[5] = LogAggregationStatus.FAILED;
+    statuses[6] = LogAggregationStatus.TIME_OUT;
+    Assert.assertArrayEquals(statuses, LogAggregationStatus.values());
   }
 
   private static void createFile(FileSystem fs, Path p, long sizeMultiple)
@@ -265,6 +322,7 @@ private static void createFile(FileSystem fs, Path p, long sizeMultiple)
         out.close();
       }
     }
+    Assert.assertTrue(fs.exists(p));
   }
 
   private static RMApp createRMApp(int id, Configuration conf, RMContext rmContext,
@@ -272,11 +330,10 @@
     ApplicationId appId = ApplicationId.newInstance(CLUSTER_TIMESTAMP, id);
     ApplicationSubmissionContext submissionContext =
         ApplicationSubmissionContext.newInstance(appId, "test", "default",
-            Priority.newInstance(0), null, false, true,
+            Priority.newInstance(0), null, true, true,
             2, Resource.newInstance(10, 2), "test");
     return new RMAppImpl(appId, rmContext, conf, "test",
-        System.getProperty("user.name"), "default", submissionContext,
-        rmContext.getScheduler(),
+        USER, "default", submissionContext, rmContext.getScheduler(),
         rmContext.getApplicationMasterService(),
         System.currentTimeMillis(), "test", null, null) {