YARN-6929. Improved partition algorithm for yarn remote-app-log-dir.

Contributed by Prabhu Joseph
This commit is contained in:
Eric Yang 2019-04-30 17:04:59 -04:00
parent dead9b4049
commit accb811e57
10 changed files with 397 additions and 121 deletions

View File

@ -1434,13 +1434,20 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR = "/tmp/logs";
/**
* The remote log dir will be created at
* NM_REMOTE_APP_LOG_DIR/${user}/NM_REMOTE_APP_LOG_DIR_SUFFIX/${appId}
* The remote log dir will be created at below location.
* NM_REMOTE_APP_LOG_DIR/${user}/bucket_{NM_REMOTE_APP_LOG_DIR_SUFFIX}
* /${bucketDir}/${appId}
*/
public static final String NM_REMOTE_APP_LOG_DIR_SUFFIX =
NM_PREFIX + "remote-app-log-dir-suffix";
public static final String DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX="logs";
/** Specifies whether Older Application Log Directory is included. */
public static final String NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER =
NM_PREFIX + "remote-app-log-dir-include-older";
public static final boolean DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER =
true;
public static final String YARN_LOG_SERVER_URL =
YARN_PREFIX + "log.server.url";

View File

@ -67,7 +67,7 @@ static class LogDeletionTask extends TimerTask {
public LogDeletionTask(Configuration conf, long retentionSecs, ApplicationClientProtocol rmClient) {
this.conf = conf;
this.retentionMillis = retentionSecs * 1000;
this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
this.suffix = LogAggregationUtils.getBucketSuffix();
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
@ -82,8 +82,18 @@ public void run() {
FileSystem fs = remoteRootLogDir.getFileSystem(conf);
for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
if(userDir.isDirectory()) {
Path userDirPath = new Path(userDir.getPath(), suffix);
deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
for (FileStatus suffixDir : fs.listStatus(userDir.getPath())) {
Path suffixDirPath = suffixDir.getPath();
if (suffixDir.isDirectory() && suffixDirPath.getName().
startsWith(suffix)) {
for (FileStatus bucketDir : fs.listStatus(suffixDirPath)) {
if (bucketDir.isDirectory()) {
deleteOldLogDirsFrom(bucketDir.getPath(), cutoffMillis,
fs, rmClient);
}
}
}
}
}
}
} catch (Throwable t) {

View File

@ -38,6 +38,7 @@
public class LogAggregationUtils {
public static final String TMP_FILE_SUFFIX = ".tmp";
private static final String BUCKET_SUFFIX = "bucket_";
/**
* Constructs the full filename for an application's log file per node.
@ -64,8 +65,22 @@ public static Path getRemoteNodeLogFileForApp(Path remoteRootLogDir,
*/
public static Path getRemoteAppLogDir(Path remoteRootLogDir,
ApplicationId appId, String user, String suffix) {
return new Path(getRemoteLogSuffixedDir(remoteRootLogDir, user, suffix),
appId.toString());
return new Path(getRemoteBucketDir(remoteRootLogDir, user, suffix,
appId), appId.toString());
}
/**
* Gets the older remote app log dir.
* @param appId the application id
* @param user the application owner
* @param remoteRootLogDir the aggregated log remote root log dir
* @param suffix the log directory suffix
* @return the remote application specific log dir.
*/
public static Path getOlderRemoteAppLogDir(ApplicationId appId,
String user, Path remoteRootLogDir, String suffix) {
return new Path(getOlderRemoteLogSuffixedDir(remoteRootLogDir, user,
suffix), appId.toString());
}
/**
@ -77,6 +92,19 @@ public static Path getRemoteAppLogDir(Path remoteRootLogDir,
*/
public static Path getRemoteLogSuffixedDir(Path remoteRootLogDir,
String user, String suffix) {
suffix = getBucketSuffix() + suffix;
return new Path(getRemoteLogUserDir(remoteRootLogDir, user), suffix);
}
/**
* Gets the older remote suffixed log dir for the user.
* @param remoteRootLogDir the aggregated log remote root log dir
* @param user the application owner
* @param suffix the log dir suffix
* @return the older remote suffixed log dir.
*/
public static Path getOlderRemoteLogSuffixedDir(Path remoteRootLogDir,
String user, String suffix) {
if (suffix == null || suffix.isEmpty()) {
return getRemoteLogUserDir(remoteRootLogDir, user);
}
@ -94,6 +122,33 @@ public static Path getRemoteLogUserDir(Path remoteRootLogDir, String user) {
return new Path(remoteRootLogDir, user);
}
/**
* Gets the remote log user's bucket dir.
* @param remoteRootLogDir the aggregated log remote root log dir
* @param user the application owner
* @param suffix the log dir suffix
* @param appId the application id
* @return the remote log per user per cluster timestamp per bucket dir.
*/
public static Path getRemoteBucketDir(Path remoteRootLogDir, String user,
String suffix, ApplicationId appId) {
int bucket = appId.getId() % 10000;
String bucketDir = String.format("%04d", bucket);
return new Path(getRemoteLogSuffixedDir(remoteRootLogDir,
user, suffix), bucketDir);
}
/**
* Check if older Application Log Directory has to be included.
* @param conf the configuration
* @return Is Older App Log Dir enabled?
*/
public static boolean isOlderPathEnabled(Configuration conf) {
return conf.getBoolean(YarnConfiguration.
NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_INCLUDE_OLDER);
}
/**
* Returns the suffix component of the log dir.
* @param conf the configuration
@ -104,6 +159,14 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) {
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
}
/**
* Returns the bucket suffix component of the log dir.
* @return the bucket suffix which appended to user log dir
*/
public static String getBucketSuffix() {
return BUCKET_SUFFIX;
}
/**
* Converts a nodeId to a form used in the app log file name.
@ -174,6 +237,24 @@ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
return remoteAppDir;
}
/**
* Get all available log files under remote app log directory.
* @param conf the configuration
* @param remoteAppLogDir the application log directory
* @param appId the applicationId
* @param appOwner the application owner
* @return the iterator of available log files
* @throws IOException if there is no log file directory
*/
public static RemoteIterator<FileStatus> getNodeFiles(Configuration conf,
Path remoteAppLogDir, ApplicationId appId, String appOwner)
throws IOException {
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
return FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
}
/**
* Get all available log files under remote app log directory.
* @param conf the configuration
@ -188,14 +269,58 @@ public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
Configuration conf, ApplicationId appId, String appOwner,
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
RemoteIterator<FileStatus> nodeFilesCur= null;
RemoteIterator<FileStatus> nodeFilesPrev = null;
StringBuilder diagnosticsMsg = new StringBuilder();
// Get Node Files from new app log dir
Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
remoteRootLogDir, suffix);
RemoteIterator<FileStatus> nodeFiles = null;
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
conf).listStatus(remoteAppLogDir);
return nodeFiles;
try {
nodeFilesCur = getNodeFiles(conf, remoteAppLogDir, appId, appOwner);
} catch (IOException ex) {
diagnosticsMsg.append(ex.getMessage() + "\n");
}
// Get Node Files from old app log dir
if (isOlderPathEnabled(conf)) {
remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner,
remoteRootLogDir, suffix);
try {
nodeFilesPrev = getNodeFiles(conf,
remoteAppLogDir, appId, appOwner);
} catch (IOException ex) {
diagnosticsMsg.append(ex.getMessage() + "\n");
}
// Return older files if new app log dir does not exist
if (nodeFilesCur == null) {
return nodeFilesPrev;
} else if (nodeFilesPrev != null) {
// Return both new and old node files combined
RemoteIterator<FileStatus> curDir = nodeFilesCur;
RemoteIterator<FileStatus> prevDir = nodeFilesPrev;
RemoteIterator<FileStatus> nodeFilesCombined = new
RemoteIterator<FileStatus>() {
@Override
public boolean hasNext() throws IOException {
return prevDir.hasNext() || curDir.hasNext();
}
@Override
public FileStatus next() throws IOException {
return prevDir.hasNext() ? prevDir.next() : curDir.next();
}
};
return nodeFilesCombined;
}
}
// Error reading from or new app log dir does not exist
if (nodeFilesCur == null) {
throw new IOException(diagnosticsMsg.toString());
}
return nodeFilesCur;
}
/**
@ -212,13 +337,39 @@ public static List<FileStatus> getRemoteNodeFileList(
Configuration conf, ApplicationId appId, String appOwner,
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
StringBuilder diagnosticsMsg = new StringBuilder();
Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
remoteRootLogDir, suffix);
List<FileStatus> nodeFiles = new ArrayList<>();
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
// Get Node Files from new app log dir
try {
nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
} catch (IOException ex) {
diagnosticsMsg.append(ex.getMessage() + "\n");
}
// Get Node Files from old app log dir
if (isOlderPathEnabled(conf)) {
remoteAppLogDir = getOlderRemoteAppLogDir(appId, appOwner,
remoteRootLogDir, suffix);
qualifiedLogDir = FileContext.getFileContext(conf).
makeQualified(remoteAppLogDir);
try {
nodeFiles.addAll(Arrays.asList(FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).util().listStatus(remoteAppLogDir)));
} catch (IOException ex) {
diagnosticsMsg.append(ex.getMessage() + "\n");
}
}
// Error reading from or new app log dir does not exist
if (nodeFiles.isEmpty()) {
throw new IOException(diagnosticsMsg.toString());
}
return nodeFiles;
}
@ -233,12 +384,11 @@ public static List<FileStatus> getRemoteNodeFileList(
public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
Configuration conf, ApplicationId appId, String appOwner)
throws IOException {
Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner);
RemoteIterator<FileStatus> nodeFiles = null;
Path qualifiedLogDir =
FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
conf).listStatus(remoteAppLogDir);
return nodeFiles;
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
Path remoteRootLogDir = new Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
return getRemoteNodeFileDir(conf, appId, appOwner,
remoteRootLogDir, suffix);
}
}

View File

@ -31,9 +31,11 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -361,32 +363,25 @@ public Object run() throws Exception {
// unnecessary load on the filesystem from all of the nodes
Path appDir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
appDir = appDir.makeQualified(remoteFS.getUri(),
Path curDir = appDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
Path rootLogDir = remoteRootLogDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
remoteRootLogDir, user, remoteRootLogDirSuffix);
suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
LinkedList<Path> pathsToCreate = new LinkedList<>();
if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
Path userDir = LogAggregationUtils.getRemoteLogUserDir(
remoteRootLogDir, user);
userDir = userDir.makeQualified(remoteFS.getUri(),
remoteFS.getWorkingDirectory());
if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
}
createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
while (!curDir.equals(rootLogDir)) {
if (!checkExists(remoteFS, curDir, APP_DIR_PERMISSIONS)) {
pathsToCreate.addFirst(curDir);
curDir = curDir.getParent();
} else {
break;
}
createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
}
for (Path path : pathsToCreate) {
createDir(remoteFS, path, APP_DIR_PERMISSIONS);
}
} catch (IOException e) {
LOG.error("Failed to setup application log directory for "
+ appId, e);
@ -411,7 +406,6 @@ protected FileSystem getFileSystem(Configuration conf) throws IOException {
protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
throws IOException {
if (fsSupportsChmod) {
FsPermission dirPerm = new FsPermission(fsPerm);
fs.mkdirs(path, dirPerm);
@ -467,6 +461,19 @@ public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
/**
* Get the older remote application directory for log aggregation.
* @param appId the Application ID
* @param appOwner the Application Owner
* @return the older remote application directory
* @throws IOException if can not find the remote application directory
*/
public Path getOlderRemoteAppLogDir(ApplicationId appId, String appOwner)
throws IOException {
return LogAggregationUtils.getOlderRemoteAppLogDir(appId, appOwner,
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
protected void cleanOldLogs(Path remoteNodeLogFileForApp,
final NodeId nodeId, UserGroupInformation userUgi) {
try {

View File

@ -34,13 +34,11 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
/**
* Use {@code LogAggregationFileControllerFactory} to get the correct
@ -159,24 +157,39 @@ public LogAggregationFileController getFileControllerForWrite() {
*/
public LogAggregationFileController getFileControllerForRead(
ApplicationId appId, String appOwner) throws IOException {
StringBuilder diagnosis = new StringBuilder();
for(LogAggregationFileController fileController : controllers) {
StringBuilder diagnosticsMsg = new StringBuilder();
if (LogAggregationUtils.isOlderPathEnabled(conf)) {
for (LogAggregationFileController fileController : controllers) {
try {
Path remoteAppLogDir = fileController.getOlderRemoteAppLogDir(appId,
appOwner);
if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir, appId,
appOwner).hasNext()) {
return fileController;
}
} catch (Exception ex) {
diagnosticsMsg.append(ex.getMessage() + "\n");
continue;
}
}
}
for (LogAggregationFileController fileController : controllers) {
try {
Path remoteAppLogDir = fileController.getRemoteAppLogDir(
appId, appOwner);
Path qualifiedLogDir = FileContext.getFileContext(conf).makeQualified(
remoteAppLogDir);
RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
qualifiedLogDir.toUri(), conf).listStatus(remoteAppLogDir);
if (nodeFiles.hasNext()) {
if (LogAggregationUtils.getNodeFiles(conf, remoteAppLogDir,
appId, appOwner).hasNext()) {
return fileController;
}
} catch (Exception ex) {
diagnosis.append(ex.getMessage() + "\n");
diagnosticsMsg.append(ex.getMessage() + "\n");
continue;
}
}
throw new IOException(diagnosis.toString());
throw new IOException(diagnosticsMsg.toString());
}
private boolean validateAggregatedFileControllerName(String name) {

View File

@ -819,6 +819,13 @@ public Path getRemoteAppLogDir(ApplicationId appId, String user)
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
@Override
public Path getOlderRemoteAppLogDir(ApplicationId appId, String user)
throws IOException {
return LogAggregationUtils.getOlderRemoteAppLogDir(appId, user,
this.remoteRootLogDir, this.remoteRootLogDirSuffix);
}
@Private
public IndexedLogsMeta loadIndexedLogsMeta(Path remoteLogPath, long end,
ApplicationId appId) throws IOException {

View File

@ -1344,6 +1344,14 @@
<value>logs</value>
</property>
<property>
<description>If set to true, the older application log directory
will be considered while fetching application logs.
</description>
<name>yarn.nodemanager.remote-app-log-dir-include-older</name>
<value>true</value>
</property>
<property>
<description>Generate additional logs about container launches.
Currently, this creates a copy of the launch script and lists the

View File

@ -60,13 +60,14 @@ public void testDeletion() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteRootLogDir);
conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, suffix);
Path rootPath = new Path(root);
FileSystem rootFs = rootPath.getFileSystem(conf);
FileSystem mockFs = ((FilterFileSystem)rootFs).getRawFileSystem();
@ -80,39 +81,52 @@ public void testDeletion() throws Exception {
new FileStatus[]{userDirStatus});
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
Path app1Dir = new Path(userLogDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
ApplicationId.newInstance(now, 1);
Path suffixDir = new Path(userDir, newSuffix);
FileStatus suffixDirStatus = new FileStatus(0, true,
0, 0, toDeleteTime, suffixDir);
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
remoteRootLogPath, "me", suffix, appId1);
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
0, toDeleteTime, bucketDir);
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId1, "me", suffix);
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0,
toDeleteTime, app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
Path app2Dir = new Path(userLogDir, appId2.toString());
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app2Dir);
ApplicationId.newInstance(now, 2);
Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId2, "me", suffix);
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0,
toDeleteTime, app2Dir);
ApplicationId appId3 =
ApplicationId.newInstance(System.currentTimeMillis(), 3);
Path app3Dir = new Path(userLogDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app3Dir);
ApplicationId.newInstance(now, 3);
Path app3Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId3, "me", suffix);
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0,
toDeleteTime, app3Dir);
ApplicationId appId4 =
ApplicationId.newInstance(System.currentTimeMillis(), 4);
Path app4Dir = new Path(userLogDir, appId4.toString());
FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
ApplicationId appId5 =
ApplicationId.newInstance(System.currentTimeMillis(), 5);
Path app5Dir = new Path(userLogDir, appId5.toString());
FileStatus app5DirStatus =
new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
ApplicationId.newInstance(now, 4);
Path app4Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId4, "me", suffix);
FileStatus app4DirStatus =
new FileStatus(0, true, 0, 0, toDeleteTime, app4Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
app4DirStatus, app5DirStatus });
when(mockFs.listStatus(userDir)).thenReturn(
new FileStatus[] {suffixDirStatus});
when(mockFs.listStatus(suffixDir)).thenReturn(
new FileStatus[] {bucketDirStatus});
when(mockFs.listStatus(bucketDir)).thenReturn(
new FileStatus[] {app1DirStatus, app2DirStatus,
app3DirStatus, app4DirStatus});
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{});
Path app2Log1 = new Path(app2Dir, "host1");
FileStatus app2Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app2Log1);
@ -137,25 +151,16 @@ public void testDeletion() throws Exception {
FileStatus app4Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log1);
Path app4Log2 = new Path(app4Dir, "host2");
FileStatus app4Log2Status = new FileStatus(10, false, 1, 1, toDeleteTime, app4Log2);
FileStatus app4Log2Status = new FileStatus(10, false, 1, 1,
toKeepTime, app4Log2);
when(mockFs.listStatus(app4Dir)).thenReturn(
new FileStatus[]{app4Log1Status, app4Log2Status});
Path app5Log1 = new Path(app5Dir, "host1");
FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app5Log1);
Path app5Log2 = new Path(app5Dir, "host2");
FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, app5Log2);
when(mockFs.listStatus(app5Dir)).thenReturn(
new FileStatus[]{app5Log1Status, app5Log2Status});
final List<ApplicationId> finishedApplications =
Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
appId4));
Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3));
final List<ApplicationId> runningApplications =
Collections.unmodifiableList(Arrays.asList(appId5));
Collections.unmodifiableList(Arrays.asList(appId4));
AggregatedLogDeletionService deletionService =
new AggregatedLogDeletionService() {
@ -180,10 +185,9 @@ protected void stopRMClient() {
verify(mockFs, timeout(2000)).delete(app1Dir, true);
verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
verify(mockFs, timeout(2000)).delete(app3Dir, true);
verify(mockFs, timeout(2000)).delete(app4Dir, true);
verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
verify(mockFs, timeout(2000)).delete(app5Log1, true);
verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
verify(mockFs, timeout(2000).times(0)).delete(app4Dir, true);
verify(mockFs, timeout(2000)).delete(app4Log1, true);
verify(mockFs, timeout(2000).times(0)).delete(app4Log2, true);
deletionService.stop();
}
@ -198,6 +202,7 @@ public void testRefreshLogRetentionSettings() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root + "tmp/logs";
String suffix = "logs";
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
final Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
@ -220,24 +225,36 @@ public void testRefreshLogRetentionSettings() throws Exception {
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[] { userDirStatus });
Path userLogDir = new Path(userDir, suffix);
Path suffixDir = new Path(userDir, newSuffix);
FileStatus suffixStatus = new FileStatus(0, true, 0, 0, before50Secs,
suffixDir);
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
//Set time last modified of app1Dir directory and its files to before2000Secs
Path app1Dir = new Path(userLogDir, appId1.toString());
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId1, "me", suffix);
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
remoteRootLogPath, "me", suffix, appId1);
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
0, before50Secs, bucketDir);
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
//Set time last modified of app1Dir directory and its files to before50Secs
Path app2Dir = new Path(userLogDir, appId2.toString());
Path app2Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId2, "me", suffix);
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
app2Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[] { app1DirStatus, app2DirStatus });
when(mockFs.listStatus(userDir)).thenReturn(
new FileStatus[] {suffixStatus });
when(mockFs.listStatus(suffixDir)).thenReturn(
new FileStatus[] {bucketDirStatus });
when(mockFs.listStatus(bucketDir)).thenReturn(
new FileStatus[] {app1DirStatus, app2DirStatus });
Path app1Log1 = new Path(app1Dir, "host1");
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, before2000Secs,
@ -310,6 +327,7 @@ public void testCheckInterval() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
@ -334,12 +352,24 @@ public void testCheckInterval() throws Exception {
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path userLogDir = new Path(userDir, suffix);
Path app1Dir = new Path(userLogDir, appId1.toString());
Path suffixDir = new Path(userDir, newSuffix);
FileStatus suffixDirStatus = new FileStatus(0, true, 0, 0, now,
suffixDir);
Path bucketDir = LogAggregationUtils.getRemoteBucketDir(
remoteRootLogPath, "me", suffix, appId1);
Path app1Dir = LogAggregationUtils.getRemoteAppLogDir(
remoteRootLogPath, appId1, "me", suffix);
FileStatus bucketDirStatus = new FileStatus(0, true, 0,
0, now, bucketDir);
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[]{app1DirStatus});
when(mockFs.listStatus(userDir)).thenReturn(
new FileStatus[] {suffixDirStatus});
when(mockFs.listStatus(suffixDir)).thenReturn(
new FileStatus[] {bucketDirStatus});
when(mockFs.listStatus(bucketDir)).thenReturn(
new FileStatus[] {app1DirStatus});
Path app1Log1 = new Path(app1Dir, "host1");
FileStatus app1Log1Status = new FileStatus(10, false, 1, 1, now, app1Log1);
@ -373,10 +403,15 @@ protected void stopRMClient() {
verify(mockFs, never()).delete(app1Dir, true);
// modify the timestamp of the logs and verify it's picked up quickly
bucketDirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, bucketDir);
app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, app1Dir);
app1Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, app1Log1);
when(mockFs.listStatus(userLogDir)).thenReturn(
new FileStatus[]{app1DirStatus});
when(mockFs.listStatus(userDir)).thenReturn(
new FileStatus[] {suffixDirStatus});
when(mockFs.listStatus(suffixDir)).thenReturn(
new FileStatus[] {bucketDirStatus });
when(mockFs.listStatus(bucketDir)).thenReturn(
new FileStatus[] {app1DirStatus });
when(mockFs.listStatus(app1Dir)).thenReturn(
new FileStatus[]{app1Log1Status});
@ -392,6 +427,7 @@ public void testRobustLogDeletion() throws Exception {
String root = "mockfs://foo/";
String remoteRootLogDir = root+"tmp/logs";
String suffix = "logs";
String newSuffix = LogAggregationUtils.getBucketSuffix() + suffix;
Configuration conf = new Configuration();
conf.setClass("fs.mockfs.impl", MockFileSystem.class,
FileSystem.class);
@ -411,27 +447,36 @@ public void testRobustLogDeletion() throws Exception {
Path remoteRootLogPath = new Path(remoteRootLogDir);
Path userDir = new Path(remoteRootLogPath, "me");
Path suffixDir = new Path(userDir, newSuffix);
FileStatus userDirStatus = new FileStatus(0, true, 0, 0, 0, userDir);
FileStatus suffixStatus = new FileStatus(0, true, 0, 0, 0, suffixDir);
Path bucketDir = new Path(suffixDir, String.valueOf(0));
FileStatus bucketDirStatus = new FileStatus(0, true, 0, 0, 0, bucketDir);
when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
new FileStatus[]{userDirStatus});
when(mockFs.listStatus(userDir)).thenReturn(
new FileStatus[]{suffixStatus});
when(mockFs.listStatus(suffixDir)).thenReturn(
new FileStatus[]{bucketDirStatus});
Path userLogDir = new Path(userDir, suffix);
ApplicationId appId1 =
ApplicationId.newInstance(System.currentTimeMillis(), 1);
Path app1Dir = new Path(userLogDir, appId1.toString());
Path app1Dir = new Path(bucketDir, appId1.toString());
FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, 0, app1Dir);
ApplicationId appId2 =
ApplicationId.newInstance(System.currentTimeMillis(), 2);
Path app2Dir = new Path(userLogDir, "application_a");
Path app2Dir = new Path(bucketDir, "application_a");
FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, 0, app2Dir);
ApplicationId appId3 =
ApplicationId.newInstance(System.currentTimeMillis(), 3);
Path app3Dir = new Path(userLogDir, appId3.toString());
Path app3Dir = new Path(bucketDir, appId3.toString());
FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, 0, app3Dir);
when(mockFs.listStatus(userLogDir)).thenReturn(
when(mockFs.listStatus(bucketDir)).thenReturn(
new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus});
when(mockFs.listStatus(app2Dir)).thenReturn(
new FileStatus[]{});
when(mockFs.listStatus(app1Dir)).thenThrow(
new RuntimeException("Should Be Caught and Logged"));

View File

@ -87,8 +87,9 @@ public static void createContainerLogFileInRemoteFS(Configuration conf,
createContainerLogInLocalDir(appLogsDir, containerId, fs, fileName,
content);
// upload container logs to remote log dir
Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR),
user + "/logs/" + appId.toString());
Path path = LogAggregationUtils.getRemoteAppLogDir(
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR)),
appId, user, "logs");
if (fs.exists(path) && deleteRemoteLogDir) {
fs.delete(path, true);
}

View File

@ -692,14 +692,16 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
logAggregationService.stop();
}
@Test
public void testAppLogDirCreation() throws Exception {
final String logSuffix = "logs";
final String logSuffix = "bucket_logs";
final String inputSuffix = "logs";
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, logSuffix);
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, inputSuffix);
InlineDispatcher dispatcher = new InlineDispatcher();
dispatcher.init(this.conf);
@ -733,7 +735,12 @@ public LogAggregationFileController getLogAggregationFileController(
Path userDir = fs.makeQualified(new Path(
remoteRootLogDir.getAbsolutePath(), this.user));
Path suffixDir = new Path(userDir, logSuffix);
Path appDir = new Path(suffixDir, appId.toString());
Path bucketDir = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
new Path(remoteRootLogDir.getAbsolutePath()),
this.user, inputSuffix, appId));
Path appDir = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()), appId,
this.user, inputSuffix));
LogAggregationContext contextWithAllContainers =
Records.newRecord(LogAggregationContext.class);
contextWithAllContainers.setLogAggregationPolicyClassName(
@ -742,23 +749,44 @@ public LogAggregationFileController getLogAggregationFileController(
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(userDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(suffixDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(bucketDir), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(appDir), isA(FsPermission.class));
// start another application and verify only app dir created
ApplicationId appId2 = BuilderUtils.newApplicationId(1, 2);
Path appDir2 = new Path(suffixDir, appId2.toString());
Path appDir2 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()),
appId2, this.user, inputSuffix));
aggSvc.handle(new LogHandlerAppStartedEvent(appId2, this.user, null,
this.acls, contextWithAllContainers));
verify(spyFs).mkdirs(eq(appDir2), isA(FsPermission.class));
// start another application with the app dir already created and verify
// we do not try to create it again
ApplicationId appId3 = BuilderUtils.newApplicationId(1, 3);
Path appDir3 = new Path(suffixDir, appId3.toString());
ApplicationId appId3 = BuilderUtils.newApplicationId(2, 2);
Path appDir3 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()),
appId3, this.user, inputSuffix));
new File(appDir3.toUri().getPath()).mkdir();
aggSvc.handle(new LogHandlerAppStartedEvent(appId3, this.user, null,
this.acls, contextWithAllContainers));
verify(spyFs, never()).mkdirs(eq(appDir3), isA(FsPermission.class));
// Verify we do not create bucket dir again
ApplicationId appId4 = BuilderUtils.newApplicationId(2, 10003);
Path bucketDir4 = fs.makeQualified(LogAggregationUtils.getRemoteBucketDir(
new Path(remoteRootLogDir.getAbsolutePath()),
this.user, logSuffix, appId4));
new File(bucketDir4.toUri().getPath()).mkdir();
Path appDir4 = fs.makeQualified(LogAggregationUtils.getRemoteAppLogDir(
new Path(remoteRootLogDir.getAbsolutePath()),
appId4, this.user, inputSuffix));
aggSvc.handle(new LogHandlerAppStartedEvent(appId4, this.user, null,
this.acls, contextWithAllContainers));
verify(spyFs, never()).mkdirs(eq(bucketDir4), isA(FsPermission.class));
verify(spyFs).mkdirs(eq(appDir4), isA(FsPermission.class));
aggSvc.stop();
aggSvc.close();
dispatcher.stop();