diff --git a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java index 4a7aad99f8..83d0d18794 100644 --- a/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java +++ b/hadoop-tools/hadoop-archive-logs/src/main/java/org/apache/hadoop/tools/HadoopArchiveLogs.java @@ -48,7 +48,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; import org.apache.hadoop.yarn.exceptions.YarnException; -import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.util.ConverterUtils; import java.io.File; @@ -102,11 +103,14 @@ public class HadoopArchiveLogs implements Tool { @VisibleForTesting Set eligibleApplications; + private Set workingDirs; + private JobConf conf; public HadoopArchiveLogs(Configuration conf) { setConf(conf); eligibleApplications = new HashSet<>(); + workingDirs = new HashSet<>(); } public static void main(String[] args) { @@ -138,47 +142,71 @@ public int run(String[] args) throws Exception { handleOpts(args); FileSystem fs = null; - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); - Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); - if (verbose) { - LOG.info("Remote Log Dir Root: " + remoteRootLogDir); - LOG.info("Log Suffix: " + suffix); - LOG.info("Working Dir: " + workingDir); + + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + List fileControllers = factory + .getConfiguredLogAggregationFileControllerList(); + if (fileControllers == null || fileControllers.isEmpty()) { + LOG.info("Can not find any valid fileControllers."); + if (verbose) { + LOG.info("The configurated fileControllers:" + + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS); + } + return 0; } try { fs = FileSystem.get(conf); - if (prepareWorkingDir(fs, workingDir)) { - - checkFilesAndSeedApps(fs, remoteRootLogDir, suffix); + // find eligibleApplications for all the fileControllers + int previousTotal = 0; + for (LogAggregationFileController fileController : fileControllers) { + Path remoteRootLogDir = fileController.getRemoteRootLogDir(); + String suffix = fileController.getRemoteRootLogDirSuffix(); + Path workingDir = new Path(remoteRootLogDir, "archive-logs-work"); + if (verbose) { + LOG.info("LogAggregationFileController:" + fileController + .getClass().getName()); + LOG.info("Remote Log Dir Root: " + remoteRootLogDir); + LOG.info("Log Suffix: " + suffix); + LOG.info("Working Dir: " + workingDir); + } + checkFilesAndSeedApps(fs, remoteRootLogDir, suffix, workingDir); filterAppsByAggregatedStatus(); - checkMaxEligible(); - - if (eligibleApplications.isEmpty()) { - LOG.info("No eligible applications to process"); - exitCode = 0; - } else { - StringBuilder sb = - new StringBuilder("Will process the following applications:"); - for (AppInfo app : eligibleApplications) { - sb.append("\n\t").append(app.getAppId()); - } - LOG.info(sb.toString()); - - File localScript = File.createTempFile("hadoop-archive-logs-", ".sh"); - generateScript(localScript, workingDir, remoteRootLogDir, suffix); - - exitCode = runDistributedShell(localScript) ? 0 : 1; + if (eligibleApplications.size() > previousTotal) { + workingDirs.add(workingDir); + previousTotal = eligibleApplications.size(); } } + checkMaxEligible(); + if (workingDirs.isEmpty() || eligibleApplications.isEmpty()) { + LOG.info("No eligible applications to process"); + return 0; + } + for (Path workingDir : workingDirs) { + if (!prepareWorkingDir(fs, workingDir)) { + LOG.error("Failed to create the workingDir:" + + workingDir.toString()); + return 1; + } + } + StringBuilder sb = + new StringBuilder("Will process the following applications:"); + for (AppInfo app : eligibleApplications) { + sb.append("\n\t").append(app.getAppId()); + } + LOG.info(sb.toString()); + File localScript = File.createTempFile("hadoop-archive-logs-", ".sh"); + generateScript(localScript); + + exitCode = runDistributedShell(localScript) ? 0 : 1; } finally { if (fs != null) { // Cleanup working directory - fs.delete(workingDir, true); + for (Path workingDir : workingDirs) { + fs.delete(workingDir, true); + } fs.close(); } } @@ -296,8 +324,8 @@ void filterAppsByAggregatedStatus() throws IOException, YarnException { try { client.init(getConf()); client.start(); - for (Iterator it = eligibleApplications.iterator(); - it.hasNext();) { + for (Iterator it = eligibleApplications + .iterator(); it.hasNext();) { AppInfo app = it.next(); try { ApplicationReport report = client.getApplicationReport( @@ -335,7 +363,7 @@ void filterAppsByAggregatedStatus() throws IOException, YarnException { @VisibleForTesting void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir, - String suffix) throws IOException { + String suffix, Path workingDir) throws IOException { for (RemoteIterator userIt = fs.listStatusIterator(remoteRootLogDir); userIt.hasNext();) { Path userLogPath = userIt.next().getPath(); @@ -375,8 +403,13 @@ void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir, LOG.info("Adding " + appLogPath.getName() + " for user " + userLogPath.getName()); } - eligibleApplications.add( - new AppInfo(appLogPath.getName(), userLogPath.getName())); + AppInfo context = new AppInfo(); + context.setAppId(appLogPath.getName()); + context.setUser(userLogPath.getName()); + context.setSuffix(suffix); + context.setRemoteRootLogDir(remoteRootLogDir); + context.setWorkingDir(workingDir); + eligibleApplications.add(context); } } else { if (verbose) { @@ -406,14 +439,17 @@ void checkFilesAndSeedApps(FileSystem fs, Path remoteRootLogDir, @VisibleForTesting void checkMaxEligible() { // If we have too many eligible apps, remove the newest ones first - if (maxEligible > 0 && eligibleApplications.size() > maxEligible) { + if (maxEligible > 0 && eligibleApplications.size() + > maxEligible) { if (verbose) { - LOG.info("Too many applications (" + eligibleApplications.size() + + LOG.info("Too many applications (" + eligibleApplications + .size() + " > " + maxEligible + ")"); } List sortedApplications = new ArrayList(eligibleApplications); - Collections.sort(sortedApplications, new Comparator() { + Collections.sort(sortedApplications, new Comparator< + AppInfo>() { @Override public int compare(AppInfo o1, AppInfo o2) { int lCompare = Long.compare(o1.getFinishTime(), o2.getFinishTime()); @@ -440,20 +476,25 @@ public int compare(AppInfo o1, AppInfo o2) { if [ "$YARN_SHELL_ID" == "1" ]; then appId="application_1440448768987_0001" user="rkanter" + workingDir="/tmp/logs/archive-logs-work" + remoteRootLogDir="/tmp/logs" + suffix="logs" elif [ "$YARN_SHELL_ID" == "2" ]; then appId="application_1440448768987_0002" user="rkanter" + workingDir="/tmp/logs/archive-logs-work" + remoteRootLogDir="/tmp/logs" + suffix="logs" else echo "Unknown Mapping!" exit 1 fi export HADOOP_CLIENT_OPTS="-Xmx1024m" export HADOOP_CLASSPATH=/dist/share/hadoop/tools/lib/hadoop-archive-logs-2.8.0-SNAPSHOT.jar:/dist/share/hadoop/tools/lib/hadoop-archives-2.8.0-SNAPSHOT.jar - "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir /tmp/logs/archive-logs-work -remoteRootLogDir /tmp/logs -suffix logs + "$HADOOP_HOME"/bin/hadoop org.apache.hadoop.tools.HadoopArchiveLogsRunner -appId "$appId" -user "$user" -workingDir "$workingDir" -remoteRootLogDir "$remoteRootLogDir" -suffix "$suffix" */ @VisibleForTesting - void generateScript(File localScript, Path workingDir, - Path remoteRootLogDir, String suffix) throws IOException { + void generateScript(File localScript) throws IOException { if (verbose) { LOG.info("Generating script at: " + localScript.getAbsolutePath()); } @@ -467,13 +508,19 @@ void generateScript(File localScript, Path workingDir, fw = new FileWriterWithEncoding(localScript, "UTF-8"); fw.write("#!/bin/bash\nset -e\nset -x\n"); int containerCount = 1; - for (AppInfo app : eligibleApplications) { + for (AppInfo context : eligibleApplications) { fw.write("if [ \"$YARN_SHELL_ID\" == \""); fw.write(Integer.toString(containerCount)); fw.write("\" ]; then\n\tappId=\""); - fw.write(app.getAppId()); + fw.write(context.getAppId()); fw.write("\"\n\tuser=\""); - fw.write(app.getUser()); + fw.write(context.getUser()); + fw.write("\"\n\tworkingDir=\""); + fw.write(context.getWorkingDir().toString()); + fw.write("\"\n\tremoteRootLogDir=\""); + fw.write(context.getRemoteRootLogDir().toString()); + fw.write("\"\n\tsuffix=\""); + fw.write(context.getSuffix()); fw.write("\"\nel"); containerCount++; } @@ -486,11 +533,11 @@ void generateScript(File localScript, Path workingDir, fw.write("\n\"$HADOOP_HOME\"/bin/hadoop "); fw.write(HadoopArchiveLogsRunner.class.getName()); fw.write(" -appId \"$appId\" -user \"$user\" -workingDir "); - fw.write(workingDir.toString()); + fw.write("\"$workingDir\""); fw.write(" -remoteRootLogDir "); - fw.write(remoteRootLogDir.toString()); + fw.write("\"$remoteRootLogDir\""); fw.write(" -suffix "); - fw.write(suffix); + fw.write("\"$suffix\""); if (!proxy) { fw.write(" -noProxy\n"); } @@ -542,23 +589,59 @@ public Configuration getConf() { @VisibleForTesting static class AppInfo { private String appId; + private Path remoteRootLogDir; + private String suffix; + private Path workingDir; private String user; private long finishTime; + AppInfo() {} + AppInfo(String appId, String user) { - this.appId = appId; - this.user = user; - this.finishTime = 0L; + this.setAppId(appId); + this.setUser(user); } public String getAppId() { return appId; } + public void setAppId(String appId) { + this.appId = appId; + } + + public Path getRemoteRootLogDir() { + return remoteRootLogDir; + } + + public void setRemoteRootLogDir(Path remoteRootLogDir) { + this.remoteRootLogDir = remoteRootLogDir; + } + + public String getSuffix() { + return suffix; + } + + public void setSuffix(String suffix) { + this.suffix = suffix; + } + + public Path getWorkingDir() { + return workingDir; + } + + public void setWorkingDir(Path workingDir) { + this.workingDir = workingDir; + } + public String getUser() { return user; } + public void setUser(String user) { + this.user = user; + } + public long getFinishTime() { return finishTime; } @@ -582,14 +665,39 @@ public boolean equals(Object o) { ? !appId.equals(appInfo.appId) : appInfo.appId != null) { return false; } - return !(user != null - ? !user.equals(appInfo.user) : appInfo.user != null); + + if (user != null + ? !user.equals(appInfo.user) : appInfo.user != null) { + return false; + } + + if (suffix != null + ? !suffix.equals(appInfo.suffix) : appInfo.suffix != null) { + return false; + } + + if (workingDir != null ? !workingDir.equals( + appInfo.workingDir) : appInfo.workingDir != null) { + return false; + } + + if (remoteRootLogDir != null ? !remoteRootLogDir.equals( + appInfo.remoteRootLogDir) : appInfo.remoteRootLogDir != null) { + return false; + } + + return Long.compare(finishTime, appInfo.finishTime) == 0; } @Override public int hashCode() { int result = appId != null ? appId.hashCode() : 0; result = 31 * result + (user != null ? user.hashCode() : 0); + result = 31 * result + (suffix != null ? suffix.hashCode() : 0); + result = 31 * result + (workingDir != null ? workingDir.hashCode() : 0); + result = 31 * result + (remoteRootLogDir != null ? + remoteRootLogDir.hashCode() : 0); + result = 31 * result + Long.valueOf(finishTime).hashCode(); return result; } } diff --git a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java index 423f3f1d5b..2ddd4c5dae 100644 --- a/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java +++ b/hadoop-tools/hadoop-archive-logs/src/test/java/org/apache/hadoop/tools/TestHadoopArchiveLogs.java @@ -95,7 +95,8 @@ public void testCheckFilesAndSeedApps() throws Exception { createFile(fs, new Path(app5Path, "file2"), 3); Assert.assertEquals(0, hal.eligibleApplications.size()); - hal.checkFilesAndSeedApps(fs, rootLogDir, suffix); + hal.checkFilesAndSeedApps(fs, rootLogDir, suffix, new Path(rootLogDir, + "archive-logs-work")); Assert.assertEquals(1, hal.eligibleApplications.size()); Assert.assertEquals(appId5.toString(), hal.eligibleApplications.iterator().next().getAppId()); @@ -249,59 +250,88 @@ public void testGenerateScript() throws Exception { private void _testGenerateScript(boolean proxy) throws Exception { Configuration conf = new Configuration(); HadoopArchiveLogs hal = new HadoopArchiveLogs(conf); - ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1); - ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2); - hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app1.toString(), - USER)); - hal.eligibleApplications.add(new HadoopArchiveLogs.AppInfo(app2.toString(), - USER)); - hal.proxy = proxy; - - File localScript = new File("target", "script.sh"); Path workingDir = new Path("/tmp", "working"); Path remoteRootLogDir = new Path("/tmp", "logs"); String suffix = "logs"; + ApplicationId app1 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 1); + HadoopArchiveLogs.AppInfo appInfo1 = new HadoopArchiveLogs.AppInfo( + app1.toString(), USER); + appInfo1.setSuffix(suffix); + appInfo1.setRemoteRootLogDir(remoteRootLogDir); + appInfo1.setWorkingDir(workingDir); + hal.eligibleApplications.add(appInfo1); + ApplicationId app2 = ApplicationId.newInstance(CLUSTER_TIMESTAMP, 2); + Path workingDir2 = new Path("/tmp", "working2"); + Path remoteRootLogDir2 = new Path("/tmp", "logs2"); + String suffix2 = "logs2"; + HadoopArchiveLogs.AppInfo appInfo2 = new HadoopArchiveLogs.AppInfo( + app2.toString(), USER); + appInfo2.setSuffix(suffix2); + appInfo2.setRemoteRootLogDir(remoteRootLogDir2); + appInfo2.setWorkingDir(workingDir2); + hal.eligibleApplications.add(appInfo2); + hal.proxy = proxy; + + File localScript = new File("target", "script.sh"); localScript.delete(); Assert.assertFalse(localScript.exists()); - hal.generateScript(localScript, workingDir, remoteRootLogDir, suffix); + hal.generateScript(localScript); Assert.assertTrue(localScript.exists()); String script = IOUtils.toString(localScript.toURI()); String[] lines = script.split(System.lineSeparator()); - Assert.assertEquals(16, lines.length); + Assert.assertEquals(22, lines.length); Assert.assertEquals("#!/bin/bash", lines[0]); Assert.assertEquals("set -e", lines[1]); Assert.assertEquals("set -x", lines[2]); Assert.assertEquals("if [ \"$YARN_SHELL_ID\" == \"1\" ]; then", lines[3]); + boolean oneBefore = true; if (lines[4].contains(app1.toString())) { Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[4]); - Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[7]); + Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[10]); } else { + oneBefore = false; Assert.assertEquals("\tappId=\"" + app2.toString() + "\"", lines[4]); - Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[7]); + Assert.assertEquals("\tappId=\"" + app1.toString() + "\"", lines[10]); } Assert.assertEquals("\tuser=\"" + USER + "\"", lines[5]); - Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", lines[6]); - Assert.assertEquals("\tuser=\"" + USER + "\"", lines[8]); - Assert.assertEquals("else", lines[9]); - Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[10]); - Assert.assertEquals("\texit 1", lines[11]); - Assert.assertEquals("fi", lines[12]); - Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[13]); - Assert.assertTrue(lines[14].startsWith("export HADOOP_CLASSPATH=")); + Assert.assertEquals("\tworkingDir=\"" + (oneBefore ? workingDir.toString() + : workingDir2.toString()) + "\"", lines[6]); + Assert.assertEquals("\tremoteRootLogDir=\"" + (oneBefore + ? remoteRootLogDir.toString() : remoteRootLogDir2.toString()) + + "\"", lines[7]); + Assert.assertEquals("\tsuffix=\"" + (oneBefore ? suffix : suffix2) + + "\"", lines[8]); + Assert.assertEquals("elif [ \"$YARN_SHELL_ID\" == \"2\" ]; then", + lines[9]); + Assert.assertEquals("\tuser=\"" + USER + "\"", lines[11]); + Assert.assertEquals("\tworkingDir=\"" + (oneBefore + ? workingDir2.toString() : workingDir.toString()) + "\"", + lines[12]); + Assert.assertEquals("\tremoteRootLogDir=\"" + (oneBefore + ? remoteRootLogDir2.toString() : remoteRootLogDir.toString()) + + "\"", lines[13]); + Assert.assertEquals("\tsuffix=\"" + (oneBefore ? suffix2 : suffix) + + "\"", lines[14]); + Assert.assertEquals("else", lines[15]); + Assert.assertEquals("\techo \"Unknown Mapping!\"", lines[16]); + Assert.assertEquals("\texit 1", lines[17]); + Assert.assertEquals("fi", lines[18]); + Assert.assertEquals("export HADOOP_CLIENT_OPTS=\"-Xmx1024m\"", lines[19]); + Assert.assertTrue(lines[20].startsWith("export HADOOP_CLASSPATH=")); if (proxy) { Assert.assertEquals( "\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." + "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " + - "-workingDir " + workingDir.toString() + " -remoteRootLogDir " + - remoteRootLogDir.toString() + " -suffix " + suffix, - lines[15]); + "-workingDir \"$workingDir\" -remoteRootLogDir " + + "\"$remoteRootLogDir\" -suffix \"$suffix\"", + lines[21]); } else { Assert.assertEquals( "\"$HADOOP_HOME\"/bin/hadoop org.apache.hadoop.tools." + "HadoopArchiveLogsRunner -appId \"$appId\" -user \"$user\" " + - "-workingDir " + workingDir.toString() + " -remoteRootLogDir " + - remoteRootLogDir.toString() + " -suffix " + suffix + " -noProxy", - lines[15]); + "-workingDir \"$workingDir\" -remoteRootLogDir " + + "\"$remoteRootLogDir\" -suffix \"$suffix\" -noProxy", + lines[21]); } }