diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java index 801cf40d93..7a229dc992 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/cli/TestLogsCLI.java @@ -18,6 +18,10 @@ package org.apache.hadoop.yarn.client.cli; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -82,6 +86,7 @@ import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController; import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONObject; import org.junit.Assert; @@ -407,7 +412,7 @@ public void testFetchFinishedApplictionLogs() throws Exception { Path path = new Path(remoteLogRootDir + ugi.getShortUserName() - + "/bucket_logs/0001/application_0_0001"); + + "/bucket-logs-tfile/0001/application_0_0001"); if (fs.exists(path)) { fs.delete(path, true); } @@ -966,8 +971,8 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception { createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes); // create the remote app dir for app but for a different user testUser - Path path = new Path(remoteLogRootDir + testUser + "/bucket_logs/0001/" - + appId); + Path path = new Path(remoteLogRootDir + testUser + + "/bucket-logs-tfile/0001/" + appId); if (fs.exists(path)) { fs.delete(path, true); } @@ -1049,7 +1054,7 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception { System.currentTimeMillis(), 1000); String priorityUser = "priority"; Path pathWithoutPerm = new Path(remoteLogRootDir + priorityUser - + "/bucket_logs/1000/" + appTest); + + "/bucket-logs-tfile/1000/" + appTest); if (fs.exists(pathWithoutPerm)) { fs.delete(pathWithoutPerm, true); } @@ -1139,6 +1144,84 @@ public void testLogsCLIWithInvalidArgs() throws Exception { } } + @Test (timeout = 5000) + public void testGuessAppOwnerWithCustomSuffix() throws Exception { + String remoteLogRootDir = "target/logs/"; + String jobUser = "user1"; + String loggedUser = "user2"; + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin"); + String controllerName = "indexed"; + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, controllerName); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, + controllerName), LogAggregationIndexedFileController.class.getName()); + conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT, + controllerName), controllerName); + + FileSystem fs = FileSystem.get(conf); + try { + // Test New App Log Dir Struture (after YARN-6929) with Custom Suffix + ApplicationId appId1 = ApplicationId.newInstance(0, 1); + Path path = new Path(remoteLogRootDir + jobUser + "/bucket-indexed/0001/" + + appId1); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + String appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId1, + loggedUser, conf); + assertThat(appOwner).isEqualTo(jobUser); + + // Test Old App Log Dir Struture (before YARN-6929) with Custom Suffix + ApplicationId appId2 = ApplicationId.newInstance(0, 2); + path = new Path(remoteLogRootDir + jobUser + "/indexed/" + appId2); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId2, loggedUser, conf); + assertThat(appOwner).isEqualTo(jobUser); + } finally { + fs.delete(new Path(remoteLogRootDir), true); + } + } + + @Test (timeout = 5000) + public void testGuessAppOwnerWithCustomAppLogDir() throws Exception { + String remoteLogRootDir = "target/logs/"; + String remoteLogRootDir1 = "target/logs1/"; + String jobUser = "user1"; + String loggedUser = "user2"; + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true); + conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir); + String controllerName = "indexed"; + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, controllerName); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, + controllerName), LogAggregationIndexedFileController.class.getName()); + conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT, + controllerName), remoteLogRootDir1); + + FileSystem fs = FileSystem.get(conf); + try { + // Test Custom App Log Dir Structure + ApplicationId appId1 = ApplicationId.newInstance(0, 3); + Path path = new Path(remoteLogRootDir1 + jobUser + + "/bucket-logs-indexed/0003/" + appId1); + if (fs.exists(path)) { + fs.delete(path, true); + } + assertTrue(fs.mkdirs(path)); + String appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId1, + loggedUser, conf); + assertThat(appOwner).isEqualTo(jobUser); + } finally { + fs.delete(new Path(remoteLogRootDir1), true); + } + } @Test (timeout = 15000) public void testSaveContainerLogsLocally() throws Exception { @@ -1407,7 +1490,7 @@ public void testFetchApplictionLogsHar() throws Exception { assertNotNull(harUrl); Path path = new Path(remoteLogRootDir + ugi.getShortUserName() - + "/logs/application_1440536969523_0001"); + + "/bucket-logs-tfile/0001/application_1440536969523_0001"); if (fs.exists(path)) { fs.delete(path, true); } @@ -1468,7 +1551,7 @@ private void createContainerLogs(Configuration configuration, } Path path = new Path(remoteLogRootDir + ugi.getShortUserName() - + "/logs/application_0_0001"); + + "/bucket-logs-tfile/0001/application_0_0001"); if (fs.exists(path)) { fs.delete(path, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java index 033339acd6..e0233b3164 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java @@ -22,6 +22,8 @@ import java.util.Timer; import java.util.TimerTask; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; +import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -68,9 +70,11 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient this.conf = conf; this.retentionMillis = retentionSecs * 1000; this.suffix = LogAggregationUtils.getBucketSuffix(); - this.remoteRootLogDir = - new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + LogAggregationFileController fileController = + factory.getFileControllerForWrite(); + this.remoteRootLogDir = fileController.getRemoteRootLogDir(); this.rmClient = rmClient; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java index deff2aaa75..b51be9af14 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogAggregationUtils.java @@ -38,7 +38,7 @@ public class LogAggregationUtils { public static final String TMP_FILE_SUFFIX = ".tmp"; - private static final String BUCKET_SUFFIX = "bucket_"; + private static final String BUCKET_SUFFIX = "bucket-"; /** * Constructs the full filename for an application's log file per node. @@ -173,16 +173,6 @@ public static boolean isOlderPathEnabled(Configuration conf) { YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER); } - /** - * Returns the suffix component of the log dir. - * @param conf the configuration - * @return the suffix which will be appended to the user log dir. - */ - public static String getRemoteNodeLogDirSuffix(Configuration conf) { - return conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); - } - /** * Returns the bucket suffix component of the log dir. * @return the bucket suffix which appended to user log dir @@ -207,25 +197,6 @@ public static String getNodeString(String nodeId) { return nodeId.toString().replace(":", "_"); } - /** - * Return the remote application log directory. - * @param conf the configuration - * @param appId the application - * @param appOwner the application owner - * @return the remote application log directory path - * @throws IOException if we can not find remote application log directory - */ - public static org.apache.hadoop.fs.Path getRemoteAppLogDir( - Configuration conf, ApplicationId appId, String appOwner) - throws IOException { - String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); - org.apache.hadoop.fs.Path remoteRootLogDir = - new org.apache.hadoop.fs.Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix); - } - /** * Return the remote application log directory. * @param conf the configuration @@ -397,22 +368,4 @@ public static List getRemoteNodeFileList( return nodeFiles; } - /** - * Get all available log files under remote app log directory. - * @param conf the configuration - * @param appId the applicationId - * @param appOwner the application owner - * @return the iterator of available log files - * @throws IOException if there is no log file available - */ - public static RemoteIterator getRemoteNodeFileDir( - Configuration conf, ApplicationId appId, String appOwner) - throws IOException { - String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - return getRemoteNodeFileDir(conf, appId, appOwner, - remoteRootLogDir, suffix); - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java index 8a72d80722..385ad094aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/LogCLIHelpers.java @@ -68,6 +68,75 @@ public int dumpAContainersLogs(String appId, String containerId, return dumpAContainerLogsForLogType(options, false); } + public static String guessOwnerWithFileFormat( + LogAggregationFileController fileFormat, ApplicationId appId, + String bestGuess, Configuration conf) throws IOException { + + boolean scanOldPath = LogAggregationUtils.isOlderPathEnabled(conf); + Path remoteRootLogDir = fileFormat.getRemoteRootLogDir(); + String suffix = fileFormat.getRemoteRootLogDirSuffix(); + Path fullPath = fileFormat.getRemoteAppLogDir(appId, bestGuess); + FileContext fc = + FileContext.getFileContext(remoteRootLogDir.toUri(), conf); + String pathAccess = fullPath.toString(); + + try { + if (fc.util().exists(fullPath)) { + return bestGuess; + } + + if (scanOldPath) { + Path olderAppPath = fileFormat.getOlderRemoteAppLogDir(appId, + bestGuess); + pathAccess = olderAppPath.toString(); + if (fc.util().exists(olderAppPath)) { + return bestGuess; + } + } + } catch (AccessControlException | AccessDeniedException ex) { + logDirNoAccessPermission(pathAccess, bestGuess, ex.getMessage()); + throw ex; + } + + try { + Path toMatch = fileFormat.getRemoteAppLogDir(appId, null); + FileStatus[] matching = fc.util().globStatus(toMatch); + if (matching != null && matching.length == 1) { + //fetch user from new path /app-logs/user[/suffix]/bucket/app_id + Path parent = matching[0].getPath().getParent(); + //skip the suffix too + if (suffix != null && !StringUtils.isEmpty(suffix)) { + parent = parent.getParent(); + } + //skip the bucket + parent = parent.getParent(); + return parent.getName(); + } + } catch (IOException e) { + // Ignore IOException thrown from wrong file format + } + + if (scanOldPath) { + try { + Path toMatch = fileFormat.getOlderRemoteAppLogDir(appId, null); + FileStatus[] matching = fc.util().globStatus(toMatch); + if (matching != null && matching.length == 1) { + //fetch user from old path /app-logs/user[/suffix]/app_id + Path parent = matching[0].getPath().getParent(); + //skip the suffix too + if (suffix != null && !StringUtils.isEmpty(suffix)) { + parent = parent.getParent(); + } + return parent.getName(); + } + } catch (IOException e) { + // Ignore IOException thrown from wrong file format + } + } + + return null; + } + @Private @VisibleForTesting /** @@ -81,70 +150,31 @@ public int dumpAContainersLogs(String appId, String containerId, */ public static String getOwnerForAppIdOrNull( ApplicationId appId, String bestGuess, - Configuration conf) throws IOException { - Path remoteRootLogDir = new Path(conf.get( - YarnConfiguration.NM_REMOTE_APP_LOG_DIR, - YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); - String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); - Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir, - appId, bestGuess, suffix); + Configuration conf) { + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + List fileControllers = factory + .getConfiguredLogAggregationFileControllerList(); - FileContext fc = - FileContext.getFileContext(remoteRootLogDir.toUri(), conf); - String pathAccess = fullPath.toString(); - try { - if (fc.util().exists(fullPath)) { - return bestGuess; - } - - boolean scanOldPath = LogAggregationUtils.isOlderPathEnabled(conf); - if (scanOldPath) { - Path olderAppPath = LogAggregationUtils.getOlderRemoteAppLogDir(appId, - bestGuess, remoteRootLogDir, suffix); - if (fc.util().exists(olderAppPath)) { - return bestGuess; - } - } - - Path toMatch = LogAggregationUtils. - getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix); - - pathAccess = toMatch.toString(); - FileStatus[] matching = fc.util().globStatus(toMatch); - if (matching == null || matching.length != 1) { - if (scanOldPath) { - toMatch = LogAggregationUtils.getOlderRemoteAppLogDir(appId, "*", - remoteRootLogDir, suffix); - try { - matching = fc.util().globStatus(toMatch); - if (matching != null && matching.length == 1) { - //fetch the user from the old path /app-logs/user[/suffix]/app_id - Path parent = matching[0].getPath().getParent(); - //skip the suffix too - if (suffix != null && !StringUtils.isEmpty(suffix)) { - parent = parent.getParent(); - } - return parent.getName(); - } - } catch (IOException e) { - // Ignore IOException from accessing older app log dir + if (fileControllers != null && !fileControllers.isEmpty()) { + String owner = null; + for (LogAggregationFileController fileFormat : fileControllers) { + try { + owner = guessOwnerWithFileFormat(fileFormat, appId, bestGuess, conf); + if (owner != null) { + return owner; } + } catch (IOException e) { + return null; } - return null; } - //fetch the user from the full path /app-logs/user[/suffix]/bucket/app_id - Path parent = matching[0].getPath().getParent(); - //skip the suffix too - if (suffix != null && !StringUtils.isEmpty(suffix)) { - parent = parent.getParent(); - } - //skip the bucket - parent = parent.getParent(); - return parent.getName(); - } catch (AccessControlException | AccessDeniedException ex) { - logDirNoAccessPermission(pathAccess, bestGuess, ex.getMessage()); - return null; + } else { + System.err.println("Can not find any valid fileControllers. " + + " The configurated fileControllers: " + + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS); } + + return null; } @Private @@ -215,7 +245,8 @@ public int dumpAllContainersLogs(ContainerLogsRequest options) } if (!foundAnyLogs) { emptyLogDir(LogAggregationUtils.getRemoteAppLogDir( - conf, options.getAppId(), options.getAppOwner()) + conf, options.getAppId(), options.getAppOwner(), + fc.getRemoteRootLogDir(), fc.getRemoteRootLogDirSuffix()) .toString()); return -1; } @@ -286,12 +317,11 @@ public void printNodesList(ContainerLogsRequest options, appOwner, fileFormat.getRemoteRootLogDir(), fileFormat.getRemoteRootLogDirSuffix()); } catch (FileNotFoundException fnf) { - logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString()); + logDirNotExist(fileFormat.getRemoteAppLogDir(appId, + appOwner).toString()); } catch (AccessControlException | AccessDeniedException ace) { - logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( - conf, appId, appOwner).toString(), appOwner, - ace.getMessage()); + logDirNoAccessPermission(fileFormat.getRemoteAppLogDir(appId, + appOwner).toString(), appOwner, ace.getMessage()); } if (nodeFiles == null) { return; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java index ea539e2dc6..3fb432d773 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/LogAggregationTFileController.java @@ -173,7 +173,8 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest, || containerIdStr.isEmpty()); long size = logRequest.getBytes(); RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner()); + .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(), + remoteRootLogDir, remoteRootLogDirSuffix); byte[] buf = new byte[65535]; while (nodeFiles != null && nodeFiles.hasNext()) { final FileStatus thisNodeFile = nodeFiles.next(); @@ -267,7 +268,8 @@ public List readAggregatedLogsMeta( String nodeIdStr = (nodeId == null) ? null : LogAggregationUtils.getNodeString(nodeId); RemoteIterator nodeFiles = LogAggregationUtils - .getRemoteNodeFileDir(conf, appId, appOwner); + .getRemoteNodeFileDir(conf, appId, appOwner, + remoteRootLogDir, remoteRootLogDirSuffix); if (nodeFiles == null) { throw new IOException("There is no available log file for " + "application:" + appId); @@ -331,7 +333,7 @@ public List readAggregatedLogsMeta( @Override public void renderAggregatedLogsBlock(Block html, ViewContext context) { TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock( - context, conf); + context, conf, remoteRootLogDir, remoteRootLogDirSuffix); block.render(html); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java index 6fb5b90bd8..2c2604b139 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/tfile/TFileAggregatedLogsBlock.java @@ -48,11 +48,16 @@ public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock { private final Configuration conf; + private Path remoteRootLogDir; + private String remoteRootLogDirSuffix; @Inject - public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) { + public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf, + Path remoteRootLogDir, String remoteRootLogDirSuffix) { super(ctx); this.conf = conf; + this.remoteRootLogDir = remoteRootLogDir; + this.remoteRootLogDirSuffix = remoteRootLogDirSuffix; } @Override @@ -67,7 +72,7 @@ protected void render(Block html) { try { nodeFiles = LogAggregationUtils .getRemoteNodeFileDir(conf, params.getAppId(), - params.getAppOwner()); + params.getAppOwner(), remoteRootLogDir, remoteRootLogDirSuffix); } catch (RuntimeException e) { throw e; } catch (Exception ex) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java index f6855d6977..daa2fc6b01 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java @@ -18,6 +18,8 @@ package org.apache.hadoop.yarn.logaggregation; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT; + import java.io.IOException; import java.net.URI; import java.util.Arrays; @@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController; import org.junit.Before; import org.junit.Test; import org.junit.Assert; @@ -56,7 +59,6 @@ public void testDeletion() throws Exception { long now = System.currentTimeMillis(); long toDeleteTime = now - (2000*1000); long toKeepTime = now - (1500*1000); - String root = "mockfs://foo/"; String remoteRootLogDir = root+"tmp/logs"; String suffix = "logs"; @@ -67,6 +69,10 @@ public void testDeletion() throws Exception { conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), + LogAggregationTFileController.class.getName()); + Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); @@ -211,6 +217,10 @@ public void testRefreshLogRetentionSettings() throws Exception { "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), + LogAggregationTFileController.class.getName()); + Path rootPath = new Path(root); FileSystem rootFs = rootPath.getFileSystem(conf); @@ -335,6 +345,10 @@ public void testCheckInterval() throws Exception { conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), + LogAggregationTFileController.class.getName()); + // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); @@ -437,6 +451,9 @@ public void testRobustLogDeletion() throws Exception { "1"); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); + conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile"); + conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"), + LogAggregationTFileController.class.getName()); // prevent us from picking up the same mockfs instance from another test FileSystem.closeAll(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java index bee34e0735..1135f9e7e8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogsBlock.java @@ -378,7 +378,7 @@ private static class TFileAggregatedLogsBlockForTest @Inject TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) { - super(ctx, conf); + super(ctx, conf, new Path("target/logs"), "logs"); } public void render(Block html) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java index 231e0e21ab..e1f1914d3a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestContainerLogsUtils.java @@ -35,7 +35,6 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; -import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; @@ -87,9 +86,14 @@ public static void createContainerLogFileInRemoteFS(Configuration conf, createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, content); // upload container logs to remote log dir - Path path = LogAggregationUtils.getRemoteAppLogDir( - new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), - appId, user, "logs"); + + LogAggregationFileControllerFactory factory = + new LogAggregationFileControllerFactory(conf); + LogAggregationFileController fileController = + factory.getFileControllerForWrite(); + + Path path = fileController.getRemoteAppLogDir(appId, user); + if (fs.exists(path) && deleteRemoteLogDir) { fs.delete(path, true); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index adcec8db36..484cad13f8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -695,13 +695,12 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner() @Test public void testAppLogDirCreation() throws Exception { - final String logSuffix = "bucket_logs"; - final String inputSuffix = "logs"; + final String inputSuffix = "logs-tfile"; this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath()); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.remoteRootLogDir.getAbsolutePath()); - this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix); + this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "logs"); InlineDispatcher dispatcher = new InlineDispatcher(); dispatcher.init(this.conf); @@ -734,10 +733,10 @@ public LogAggregationFileController getLogAggregationFileController( ApplicationId appId = BuilderUtils.newApplicationId(1, 1); Path userDir = fs.makeQualified(new Path( remoteRootLogDir.getAbsolutePath(), this.user)); - Path suffixDir = new Path(userDir, logSuffix); Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( new Path(remoteRootLogDir.getAbsolutePath()), this.user, inputSuffix, appId)); + Path suffixDir = bucketDir.getParent(); Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( new Path(remoteRootLogDir.getAbsolutePath()), appId, this.user, inputSuffix)); @@ -775,13 +774,12 @@ public LogAggregationFileController getLogAggregationFileController( // Verify we do not create bucket dir again ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003); - Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( - new Path(remoteRootLogDir.getAbsolutePath()), - this.user, logSuffix, appId4)); - new File(bucketDir4.toUri().getPath()).mkdir(); Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( new Path(remoteRootLogDir.getAbsolutePath()), appId4, this.user, inputSuffix)); + Path bucketDir4 = appDir4.getParent(); + new File(bucketDir4.toUri().getPath()).mkdir(); + aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, this.acls, contextWithAllContainers)); verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class));