YARN-9558. Fixed LogAggregation test cases.

Contributed by Prabhu Joseph
This commit is contained in:
Eric Yang 2019-05-23 18:38:47 -04:00
parent 4b099b8b89
commit 460ba7fb14
10 changed files with 237 additions and 141 deletions

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.yarn.client.cli; package org.apache.hadoop.yarn.client.cli;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -82,6 +86,7 @@
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.ifile.LogAggregationIndexedFileController;
import org.codehaus.jettison.json.JSONArray; import org.codehaus.jettison.json.JSONArray;
import org.codehaus.jettison.json.JSONObject; import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert; import org.junit.Assert;
@ -407,7 +412,7 @@ public void testFetchFinishedApplictionLogs() throws Exception {
Path path = Path path =
new Path(remoteLogRootDir + ugi.getShortUserName() new Path(remoteLogRootDir + ugi.getShortUserName()
+ "/bucket_logs/0001/application_0_0001"); + "/bucket-logs-tfile/0001/application_0_0001");
if (fs.exists(path)) { if (fs.exists(path)) {
fs.delete(path, true); fs.delete(path, true);
} }
@ -966,8 +971,8 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception {
createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes); createContainerLogInLocalDir(appLogsDir, containerId, fs, logTypes);
// create the remote app dir for app but for a different user testUser // create the remote app dir for app but for a different user testUser
Path path = new Path(remoteLogRootDir + testUser + "/bucket_logs/0001/" Path path = new Path(remoteLogRootDir + testUser +
+ appId); "/bucket-logs-tfile/0001/" + appId);
if (fs.exists(path)) { if (fs.exists(path)) {
fs.delete(path, true); fs.delete(path, true);
} }
@ -1049,7 +1054,7 @@ public void testFetchApplictionLogsAsAnotherUser() throws Exception {
System.currentTimeMillis(), 1000); System.currentTimeMillis(), 1000);
String priorityUser = "priority"; String priorityUser = "priority";
Path pathWithoutPerm = new Path(remoteLogRootDir + priorityUser Path pathWithoutPerm = new Path(remoteLogRootDir + priorityUser
+ "/bucket_logs/1000/" + appTest); + "/bucket-logs-tfile/1000/" + appTest);
if (fs.exists(pathWithoutPerm)) { if (fs.exists(pathWithoutPerm)) {
fs.delete(pathWithoutPerm, true); fs.delete(pathWithoutPerm, true);
} }
@ -1139,6 +1144,84 @@ public void testLogsCLIWithInvalidArgs() throws Exception {
} }
} }
@Test (timeout = 5000)
public void testGuessAppOwnerWithCustomSuffix() throws Exception {
String remoteLogRootDir = "target/logs/";
String jobUser = "user1";
String loggedUser = "user2";
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
String controllerName = "indexed";
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, controllerName);
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
controllerName), LogAggregationIndexedFileController.class.getName());
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
controllerName), controllerName);
FileSystem fs = FileSystem.get(conf);
try {
// Test New App Log Dir Struture (after YARN-6929) with Custom Suffix
ApplicationId appId1 = ApplicationId.newInstance(0, 1);
Path path = new Path(remoteLogRootDir + jobUser + "/bucket-indexed/0001/"
+ appId1);
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
String appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId1,
loggedUser, conf);
assertThat(appOwner).isEqualTo(jobUser);
// Test Old App Log Dir Struture (before YARN-6929) with Custom Suffix
ApplicationId appId2 = ApplicationId.newInstance(0, 2);
path = new Path(remoteLogRootDir + jobUser + "/indexed/" + appId2);
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId2, loggedUser, conf);
assertThat(appOwner).isEqualTo(jobUser);
} finally {
fs.delete(new Path(remoteLogRootDir), true);
}
}
@Test (timeout = 5000)
public void testGuessAppOwnerWithCustomAppLogDir() throws Exception {
String remoteLogRootDir = "target/logs/";
String remoteLogRootDir1 = "target/logs1/";
String jobUser = "user1";
String loggedUser = "user2";
Configuration conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
String controllerName = "indexed";
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, controllerName);
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT,
controllerName), LogAggregationIndexedFileController.class.getName());
conf.set(String.format(LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
controllerName), remoteLogRootDir1);
FileSystem fs = FileSystem.get(conf);
try {
// Test Custom App Log Dir Structure
ApplicationId appId1 = ApplicationId.newInstance(0, 3);
Path path = new Path(remoteLogRootDir1 + jobUser +
"/bucket-logs-indexed/0003/" + appId1);
if (fs.exists(path)) {
fs.delete(path, true);
}
assertTrue(fs.mkdirs(path));
String appOwner = LogCLIHelpers.getOwnerForAppIdOrNull(appId1,
loggedUser, conf);
assertThat(appOwner).isEqualTo(jobUser);
} finally {
fs.delete(new Path(remoteLogRootDir1), true);
}
}
@Test (timeout = 15000) @Test (timeout = 15000)
public void testSaveContainerLogsLocally() throws Exception { public void testSaveContainerLogsLocally() throws Exception {
@ -1407,7 +1490,7 @@ public void testFetchApplictionLogsHar() throws Exception {
assertNotNull(harUrl); assertNotNull(harUrl);
Path path = Path path =
new Path(remoteLogRootDir + ugi.getShortUserName() new Path(remoteLogRootDir + ugi.getShortUserName()
+ "/logs/application_1440536969523_0001"); + "/bucket-logs-tfile/0001/application_1440536969523_0001");
if (fs.exists(path)) { if (fs.exists(path)) {
fs.delete(path, true); fs.delete(path, true);
} }
@ -1468,7 +1551,7 @@ private void createContainerLogs(Configuration configuration,
} }
Path path = Path path =
new Path(remoteLogRootDir + ugi.getShortUserName() new Path(remoteLogRootDir + ugi.getShortUserName()
+ "/logs/application_0_0001"); + "/bucket-logs-tfile/0001/application_0_0001");
if (fs.exists(path)) { if (fs.exists(path)) {
fs.delete(path, true); fs.delete(path, true);

View File

@ -22,6 +22,8 @@
import java.util.Timer; import java.util.Timer;
import java.util.TimerTask; import java.util.TimerTask;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -68,9 +70,11 @@ public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClient
this.conf = conf; this.conf = conf;
this.retentionMillis = retentionSecs * 1000; this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getBucketSuffix(); this.suffix = LogAggregationUtils.getBucketSuffix();
this.remoteRootLogDir = LogAggregationFileControllerFactory factory =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new LogAggregationFileControllerFactory(conf);
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); LogAggregationFileController fileController =
factory.getFileControllerForWrite();
this.remoteRootLogDir = fileController.getRemoteRootLogDir();
this.rmClient = rmClient; this.rmClient = rmClient;
} }

View File

@ -38,7 +38,7 @@
public class LogAggregationUtils { public class LogAggregationUtils {
public static final String TMP_FILE_SUFFIX = ".tmp"; public static final String TMP_FILE_SUFFIX = ".tmp";
private static final String BUCKET_SUFFIX = "bucket_"; private static final String BUCKET_SUFFIX = "bucket-";
/** /**
* Constructs the full filename for an application's log file per node. * Constructs the full filename for an application's log file per node.
@ -173,16 +173,6 @@ public static boolean isOlderPathEnabled(Configuration conf) {
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER); YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER);
} }
/**
* Returns the suffix component of the log dir.
* @param conf the configuration
* @return the suffix which will be appended to the user log dir.
*/
public static String getRemoteNodeLogDirSuffix(Configuration conf) {
return conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
}
/** /**
* Returns the bucket suffix component of the log dir. * Returns the bucket suffix component of the log dir.
* @return the bucket suffix which appended to user log dir * @return the bucket suffix which appended to user log dir
@ -207,25 +197,6 @@ public static String getNodeString(String nodeId) {
return nodeId.toString().replace(":", "_"); return nodeId.toString().replace(":", "_");
} }
/**
* Return the remote application log directory.
* @param conf the configuration
* @param appId the application
* @param appOwner the application owner
* @return the remote application log directory path
* @throws IOException if we can not find remote application log directory
*/
public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
Configuration conf, ApplicationId appId, String appOwner)
throws IOException {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
org.apache.hadoop.fs.Path remoteRootLogDir =
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
}
/** /**
* Return the remote application log directory. * Return the remote application log directory.
* @param conf the configuration * @param conf the configuration
@ -397,22 +368,4 @@ public static List<FileStatus> getRemoteNodeFileList(
return nodeFiles; return nodeFiles;
} }
/**
* Get all available log files under remote app log directory.
* @param conf the configuration
* @param appId the applicationId
* @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file available
*/
public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
Configuration conf, ApplicationId appId, String appOwner)
throws IOException {
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
return getRemoteNodeFileDir(conf, appId, appOwner,
remoteRootLogDir, suffix);
}
} }

View File

@ -68,6 +68,75 @@ public int dumpAContainersLogs(String appId, String containerId,
return dumpAContainerLogsForLogType(options, false); return dumpAContainerLogsForLogType(options, false);
} }
public static String guessOwnerWithFileFormat(
LogAggregationFileController fileFormat, ApplicationId appId,
String bestGuess, Configuration conf) throws IOException {
boolean scanOldPath = LogAggregationUtils.isOlderPathEnabled(conf);
Path remoteRootLogDir = fileFormat.getRemoteRootLogDir();
String suffix = fileFormat.getRemoteRootLogDirSuffix();
Path fullPath = fileFormat.getRemoteAppLogDir(appId, bestGuess);
FileContext fc =
FileContext.getFileContext(remoteRootLogDir.toUri(), conf);
String pathAccess = fullPath.toString();
try {
if (fc.util().exists(fullPath)) {
return bestGuess;
}
if (scanOldPath) {
Path olderAppPath = fileFormat.getOlderRemoteAppLogDir(appId,
bestGuess);
pathAccess = olderAppPath.toString();
if (fc.util().exists(olderAppPath)) {
return bestGuess;
}
}
} catch (AccessControlException | AccessDeniedException ex) {
logDirNoAccessPermission(pathAccess, bestGuess, ex.getMessage());
throw ex;
}
try {
Path toMatch = fileFormat.getRemoteAppLogDir(appId, null);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching != null && matching.length == 1) {
//fetch user from new path /app-logs/user[/suffix]/bucket/app_id
Path parent = matching[0].getPath().getParent();
//skip the suffix too
if (suffix != null && !StringUtils.isEmpty(suffix)) {
parent = parent.getParent();
}
//skip the bucket
parent = parent.getParent();
return parent.getName();
}
} catch (IOException e) {
// Ignore IOException thrown from wrong file format
}
if (scanOldPath) {
try {
Path toMatch = fileFormat.getOlderRemoteAppLogDir(appId, null);
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching != null && matching.length == 1) {
//fetch user from old path /app-logs/user[/suffix]/app_id
Path parent = matching[0].getPath().getParent();
//skip the suffix too
if (suffix != null && !StringUtils.isEmpty(suffix)) {
parent = parent.getParent();
}
return parent.getName();
}
} catch (IOException e) {
// Ignore IOException thrown from wrong file format
}
}
return null;
}
@Private @Private
@VisibleForTesting @VisibleForTesting
/** /**
@ -81,70 +150,31 @@ public int dumpAContainersLogs(String appId, String containerId,
*/ */
public static String getOwnerForAppIdOrNull( public static String getOwnerForAppIdOrNull(
ApplicationId appId, String bestGuess, ApplicationId appId, String bestGuess,
Configuration conf) throws IOException { Configuration conf) {
Path remoteRootLogDir = new Path(conf.get( LogAggregationFileControllerFactory factory =
YarnConfiguration.NM_REMOTE_APP_LOG_DIR, new LogAggregationFileControllerFactory(conf);
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR)); List<LogAggregationFileController> fileControllers = factory
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf); .getConfiguredLogAggregationFileControllerList();
Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
appId, bestGuess, suffix);
FileContext fc = if (fileControllers != null && !fileControllers.isEmpty()) {
FileContext.getFileContext(remoteRootLogDir.toUri(), conf); String owner = null;
String pathAccess = fullPath.toString(); for (LogAggregationFileController fileFormat : fileControllers) {
try { try {
if (fc.util().exists(fullPath)) { owner = guessOwnerWithFileFormat(fileFormat, appId, bestGuess, conf);
return bestGuess; if (owner != null) {
} return owner;
boolean scanOldPath = LogAggregationUtils.isOlderPathEnabled(conf);
if (scanOldPath) {
Path olderAppPath = LogAggregationUtils.getOlderRemoteAppLogDir(appId,
bestGuess, remoteRootLogDir, suffix);
if (fc.util().exists(olderAppPath)) {
return bestGuess;
}
}
Path toMatch = LogAggregationUtils.
getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
pathAccess = toMatch.toString();
FileStatus[] matching = fc.util().globStatus(toMatch);
if (matching == null || matching.length != 1) {
if (scanOldPath) {
toMatch = LogAggregationUtils.getOlderRemoteAppLogDir(appId, "*",
remoteRootLogDir, suffix);
try {
matching = fc.util().globStatus(toMatch);
if (matching != null && matching.length == 1) {
//fetch the user from the old path /app-logs/user[/suffix]/app_id
Path parent = matching[0].getPath().getParent();
//skip the suffix too
if (suffix != null && !StringUtils.isEmpty(suffix)) {
parent = parent.getParent();
}
return parent.getName();
}
} catch (IOException e) {
// Ignore IOException from accessing older app log dir
} }
} catch (IOException e) {
return null;
} }
return null;
} }
//fetch the user from the full path /app-logs/user[/suffix]/bucket/app_id } else {
Path parent = matching[0].getPath().getParent(); System.err.println("Can not find any valid fileControllers. " +
//skip the suffix too " The configurated fileControllers: " +
if (suffix != null && !StringUtils.isEmpty(suffix)) { YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
parent = parent.getParent();
}
//skip the bucket
parent = parent.getParent();
return parent.getName();
} catch (AccessControlException | AccessDeniedException ex) {
logDirNoAccessPermission(pathAccess, bestGuess, ex.getMessage());
return null;
} }
return null;
} }
@Private @Private
@ -215,7 +245,8 @@ public int dumpAllContainersLogs(ContainerLogsRequest options)
} }
if (!foundAnyLogs) { if (!foundAnyLogs) {
emptyLogDir(LogAggregationUtils.getRemoteAppLogDir( emptyLogDir(LogAggregationUtils.getRemoteAppLogDir(
conf, options.getAppId(), options.getAppOwner()) conf, options.getAppId(), options.getAppOwner(),
fc.getRemoteRootLogDir(), fc.getRemoteRootLogDirSuffix())
.toString()); .toString());
return -1; return -1;
} }
@ -286,12 +317,11 @@ public void printNodesList(ContainerLogsRequest options,
appOwner, fileFormat.getRemoteRootLogDir(), appOwner, fileFormat.getRemoteRootLogDir(),
fileFormat.getRemoteRootLogDirSuffix()); fileFormat.getRemoteRootLogDirSuffix());
} catch (FileNotFoundException fnf) { } catch (FileNotFoundException fnf) {
logDirNotExist(LogAggregationUtils.getRemoteAppLogDir( logDirNotExist(fileFormat.getRemoteAppLogDir(appId,
conf, appId, appOwner).toString()); appOwner).toString());
} catch (AccessControlException | AccessDeniedException ace) { } catch (AccessControlException | AccessDeniedException ace) {
logDirNoAccessPermission(LogAggregationUtils.getRemoteAppLogDir( logDirNoAccessPermission(fileFormat.getRemoteAppLogDir(appId,
conf, appId, appOwner).toString(), appOwner, appOwner).toString(), appOwner, ace.getMessage());
ace.getMessage());
} }
if (nodeFiles == null) { if (nodeFiles == null) {
return; return;

View File

@ -173,7 +173,8 @@ public boolean readAggregatedLogs(ContainerLogsRequest logRequest,
|| containerIdStr.isEmpty()); || containerIdStr.isEmpty());
long size = logRequest.getBytes(); long size = logRequest.getBytes();
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner()); .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
remoteRootLogDir, remoteRootLogDirSuffix);
byte[] buf = new byte[65535]; byte[] buf = new byte[65535];
while (nodeFiles != null && nodeFiles.hasNext()) { while (nodeFiles != null && nodeFiles.hasNext()) {
final FileStatus thisNodeFile = nodeFiles.next(); final FileStatus thisNodeFile = nodeFiles.next();
@ -267,7 +268,8 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
String nodeIdStr = (nodeId == null) ? null String nodeIdStr = (nodeId == null) ? null
: LogAggregationUtils.getNodeString(nodeId); : LogAggregationUtils.getNodeString(nodeId);
RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, appId, appOwner); .getRemoteNodeFileDir(conf, appId, appOwner,
remoteRootLogDir, remoteRootLogDirSuffix);
if (nodeFiles == null) { if (nodeFiles == null) {
throw new IOException("There is no available log file for " throw new IOException("There is no available log file for "
+ "application:" + appId); + "application:" + appId);
@ -331,7 +333,7 @@ public List<ContainerLogMeta> readAggregatedLogsMeta(
@Override @Override
public void renderAggregatedLogsBlock(Block html, ViewContext context) { public void renderAggregatedLogsBlock(Block html, ViewContext context) {
TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock( TFileAggregatedLogsBlock block = new TFileAggregatedLogsBlock(
context, conf); context, conf, remoteRootLogDir, remoteRootLogDirSuffix);
block.render(html); block.render(html);
} }

View File

@ -48,11 +48,16 @@
public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock { public class TFileAggregatedLogsBlock extends LogAggregationHtmlBlock {
private final Configuration conf; private final Configuration conf;
private Path remoteRootLogDir;
private String remoteRootLogDirSuffix;
@Inject @Inject
public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf) { public TFileAggregatedLogsBlock(ViewContext ctx, Configuration conf,
Path remoteRootLogDir, String remoteRootLogDirSuffix) {
super(ctx); super(ctx);
this.conf = conf; this.conf = conf;
this.remoteRootLogDir = remoteRootLogDir;
this.remoteRootLogDirSuffix = remoteRootLogDirSuffix;
} }
@Override @Override
@ -67,7 +72,7 @@ protected void render(Block html) {
try { try {
nodeFiles = LogAggregationUtils nodeFiles = LogAggregationUtils
.getRemoteNodeFileDir(conf, params.getAppId(), .getRemoteNodeFileDir(conf, params.getAppId(),
params.getAppOwner()); params.getAppOwner(), remoteRootLogDir, remoteRootLogDirSuffix);
} catch (RuntimeException e) { } catch (RuntimeException e) {
throw e; throw e;
} catch (Exception ex) { } catch (Exception ex) {

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.logaggregation; package org.apache.hadoop.yarn.logaggregation;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Arrays; import java.util.Arrays;
@ -37,6 +39,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState; import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.tfile.LogAggregationTFileController;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.Assert; import org.junit.Assert;
@ -56,7 +59,6 @@ public void testDeletion() throws Exception {
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();
long toDeleteTime = now - (2000*1000); long toDeleteTime = now - (2000*1000);
long toKeepTime = now - (1500*1000); long toKeepTime = now - (1500*1000);
String root = "mockfs://foo/"; String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs"; String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs"; String suffix = "logs";
@ -67,6 +69,10 @@ public void testDeletion() throws Exception {
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
LogAggregationTFileController.class.getName());
Path rootPath = new Path(root); Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem rootFs = rootPath.getFileSystem(conf);
@ -211,6 +217,10 @@ public void testRefreshLogRetentionSettings() throws Exception {
"1"); "1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
LogAggregationTFileController.class.getName());
Path rootPath = new Path(root); Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf); FileSystem rootFs = rootPath.getFileSystem(conf);
@ -335,6 +345,10 @@ public void testCheckInterval() throws Exception {
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1"); conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_CHECK_INTERVAL_SECONDS, "1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
LogAggregationTFileController.class.getName());
// prevent us from picking up the same mockfs instance from another test // prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll(); FileSystem.closeAll();
@ -437,6 +451,9 @@ public void testRobustLogDeletion() throws Exception {
"1"); "1");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix); conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS, "TFile");
conf.set(String.format(LOG_AGGREGATION_FILE_CONTROLLER_FMT, "TFile"),
LogAggregationTFileController.class.getName());
// prevent us from picking up the same mockfs instance from another test // prevent us from picking up the same mockfs instance from another test
FileSystem.closeAll(); FileSystem.closeAll();

View File

@ -378,7 +378,7 @@ private static class TFileAggregatedLogsBlockForTest
@Inject @Inject
TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) { TFileAggregatedLogsBlockForTest(ViewContext ctx, Configuration conf) {
super(ctx, conf); super(ctx, conf, new Path("target/logs"), "logs");
} }
public void render(Block html) { public void render(Block html) {

View File

@ -35,7 +35,6 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory; import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
@ -87,9 +86,14 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName, createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
content); content);
// upload container logs to remote log dir // upload container logs to remote log dir
Path path = LogAggregationUtils.getRemoteAppLogDir(
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)), LogAggregationFileControllerFactory factory =
appId, user, "logs"); new LogAggregationFileControllerFactory(conf);
LogAggregationFileController fileController =
factory.getFileControllerForWrite();
Path path = fileController.getRemoteAppLogDir(appId, user);
if (fs.exists(path) && deleteRemoteLogDir) { if (fs.exists(path) && deleteRemoteLogDir) {
fs.delete(path, true); fs.delete(path, true);
} }

View File

@ -695,13 +695,12 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
@Test @Test
public void testAppLogDirCreation() throws Exception { public void testAppLogDirCreation() throws Exception {
final String logSuffix = "bucket_logs"; final String inputSuffix = "logs-tfile";
final String inputSuffix = "logs";
this.conf.set(YarnConfiguration.NM_LOG_DIRS, this.conf.set(YarnConfiguration.NM_LOG_DIRS,
localLogDir.getAbsolutePath()); localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath()); this.remoteRootLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix); this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "logs");
InlineDispatcher dispatcher = new InlineDispatcher(); InlineDispatcher dispatcher = new InlineDispatcher();
dispatcher.init(this.conf); dispatcher.init(this.conf);
@ -734,10 +733,10 @@ public LogAggregationFileController getLogAggregationFileController(
ApplicationId appId = BuilderUtils.newApplicationId(1, 1); ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
Path userDir = fs.makeQualified(new Path( Path userDir = fs.makeQualified(new Path(
remoteRootLogDir.getAbsolutePath(), this.user)); remoteRootLogDir.getAbsolutePath(), this.user));
Path suffixDir = new Path(userDir, logSuffix);
Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir( Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
new Path(remoteRootLogDir.getAbsolutePath()), new Path(remoteRootLogDir.getAbsolutePath()),
this.user, inputSuffix, appId)); this.user, inputSuffix, appId));
Path suffixDir = bucketDir.getParent();
Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()), appId, new Path(remoteRootLogDir.getAbsolutePath()), appId,
this.user, inputSuffix)); this.user, inputSuffix));
@ -775,13 +774,12 @@ public LogAggregationFileController getLogAggregationFileController(
// Verify we do not create bucket dir again // Verify we do not create bucket dir again
ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003); ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003);
Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
new Path(remoteRootLogDir.getAbsolutePath()),
this.user, logSuffix, appId4));
new File(bucketDir4.toUri().getPath()).mkdir();
Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir( Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()), new Path(remoteRootLogDir.getAbsolutePath()),
appId4, this.user, inputSuffix)); appId4, this.user, inputSuffix));
Path bucketDir4 = appDir4.getParent();
new File(bucketDir4.toUri().getPath()).mkdir();
aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null, aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null,
this.acls, contextWithAllContainers)); this.acls, contextWithAllContainers));
verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class)); verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class));