YARN-6876. Create an abstract log writer for extensibility. Contributed by Xuan Gong.

This commit is contained in:
Junping Du 2017-08-24 13:36:49 -07:00
parent 8196a07c32
commit c2cb7ea1ef
18 changed files with 1421 additions and 451 deletions

View File

@ -1064,7 +1064,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
public static final String LOG_AGGREGATION_FILE_FORMATS = YARN_PREFIX
+ "log-aggregation.file-formats";
public static final String LOG_AGGREGATION_FILE_CONTROLLER_FMT =
YARN_PREFIX + "log-aggregation.file-controller.%s.class";
public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT
= YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir";
public static final String LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT
= YARN_PREFIX + "log-aggregation.%s.remote-app-log-dir-suffix";
/**
* How long to wait before deleting aggregated logs, -1 disables.
* Be careful set this too small and you will spam the name node.

View File

@ -184,6 +184,8 @@ public void initializeMemberVariables() {
// Currently defined in RegistryConstants/core-site.xml
xmlPrefixToSkipCompare.add("hadoop.registry");
xmlPrefixToSkipCompare.add(
"yarn.log-aggregation.file-controller.TFile.class");
// Add the filters used for checking for collision of default values.
initDefaultValueCollisionCheck();
}

View File

@ -36,7 +36,6 @@
import com.sun.jersey.api.client.ClientResponse.Status;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileWriter;
@ -78,6 +77,9 @@
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRequest;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.codehaus.jettison.json.JSONObject;
import org.junit.Assert;
import org.junit.Before;
@ -1345,42 +1347,55 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(configuration);
LogAggregationFileController fileFormat = factory
.getFileControllerForWrite();
try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
writer.append(new AggregatedLogFormat.LogKey(containerId),
LogAggregationFileControllerContext context
= new LogAggregationFileControllerContext(
path, path, true, 1000,
containerId.getApplicationAttemptId().getApplicationId(),
appAcls, nodeId, ugi);
fileFormat.initializeWriter(context);
fileFormat.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
} finally {
fileFormat.closeWriter();
}
}
@SuppressWarnings("static-access")
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(configuration);
LogAggregationFileController fileFormat = factory
.getFileControllerForWrite();
try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
new AggregatedLogFormat.LogKey(containerId).write(out);
out.close();
out = writer.getWriter().prepareAppendValue(-1);
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
new HashSet<>());
out.close();
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
Path path = fileFormat.getRemoteNodeLogFileForApp(
appId, ugi.getCurrentUser().getShortUserName(), nodeId);
LogAggregationFileControllerContext context
= new LogAggregationFileControllerContext(
path, path, true, 1000,
appId, appAcls, nodeId, ugi);
fileFormat.initializeWriter(context);
AggregatedLogFormat.LogKey key = new AggregatedLogFormat.LogKey(
containerId);
AggregatedLogFormat.LogValue value = new AggregatedLogFormat.LogValue(
rootLogDirs, containerId, UserGroupInformation.getCurrentUser()
.getShortUserName());
fileFormat.write(key, value);
} finally {
fileFormat.closeWriter();
}
}

View File

@ -44,7 +44,7 @@
import java.util.Map.Entry;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.commons.io.output.WriterOutputStream;
import org.apache.commons.logging.Log;
@ -61,7 +61,6 @@
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.file.tfile.TFile;
@ -71,7 +70,6 @@
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Times;
import com.google.common.annotations.VisibleForTesting;
@ -249,7 +247,7 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
in = secureOpenFile(logFile);
} catch (IOException e) {
logErrorMessage(logFile, e);
IOUtils.cleanup(LOG, in);
IOUtils.closeQuietly(in);
continue;
}
@ -287,7 +285,7 @@ public void write(DataOutputStream out, Set<File> pendingUploadFiles)
String message = logErrorMessage(logFile, e);
out.write(message.getBytes(Charset.forName("UTF-8")));
} finally {
IOUtils.cleanup(LOG, in);
IOUtils.closeQuietly(in);
}
}
}
@ -557,7 +555,7 @@ public void close() {
} catch (Exception e) {
LOG.warn("Exception closing writer", e);
} finally {
IOUtils.closeStream(this.fsDataOStream);
IOUtils.closeQuietly(this.fsDataOStream);
}
}
}
@ -605,7 +603,7 @@ public String getApplicationOwner() throws IOException {
}
return null;
} finally {
IOUtils.cleanup(LOG, ownerScanner);
IOUtils.closeQuietly(ownerScanner);
}
}
@ -651,7 +649,7 @@ public Map<ApplicationAccessType, String> getApplicationAcls()
}
return acls;
} finally {
IOUtils.cleanup(LOG, aclScanner);
IOUtils.closeQuietly(aclScanner);
}
}
@ -775,8 +773,8 @@ public static void readAcontainerLogs(DataInputStream valueStream,
}
}
} finally {
IOUtils.cleanup(LOG, ps);
IOUtils.cleanup(LOG, os);
IOUtils.closeQuietly(ps);
IOUtils.closeQuietly(os);
}
}
@ -1001,7 +999,9 @@ public static Pair<String, String> readContainerMetaDataAndSkipData(
}
public void close() {
IOUtils.cleanup(LOG, scanner, reader, fsDataIStream);
IOUtils.closeQuietly(scanner);
IOUtils.closeQuietly(reader);
IOUtils.closeQuietly(fsDataIStream);
}
}

View File

@ -133,6 +133,23 @@ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
new org.apache.hadoop.fs.Path(conf.get(
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
return getRemoteAppLogDir(conf, appId, appOwner, remoteRootLogDir, suffix);
}
/**
* Return the remote application log directory.
* @param conf the configuration
* @param appId the application
* @param appOwner the application owner
* @param remoteRootLogDir the remote root log directory
* @param suffix the log directory suffix
* @return the remote application log directory path
* @throws IOException if we can not find remote application log directory
*/
public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
Configuration conf, ApplicationId appId, String appOwner,
org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
throws IOException {
org.apache.hadoop.fs.Path remoteAppDir = null;
if (appOwner == null) {
org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
@ -154,6 +171,30 @@ public static org.apache.hadoop.fs.Path getRemoteAppLogDir(
return remoteAppDir;
}
/**
 * Get all available log files under the remote app log directory.
 * @param conf the configuration
 * @param appId the applicationId
 * @param appOwner the application owner
 * @param remoteRootLogDir the remote root log directory
 * @param suffix the log directory suffix
 * @return an iterator over the aggregated log files of the application
 * @throws IOException if the remote application log directory can not be
 *         resolved or listed
 */
public static RemoteIterator<FileStatus> getRemoteNodeFileDir(
    Configuration conf, ApplicationId appId, String appOwner,
    org.apache.hadoop.fs.Path remoteRootLogDir, String suffix)
    throws IOException {
  Path remoteAppLogDir = getRemoteAppLogDir(conf, appId, appOwner,
      remoteRootLogDir, suffix);
  // Qualify the directory first so the listing is performed against the
  // FileSystem the path actually lives on, not the default FS.
  Path qualifiedLogDir =
      FileContext.getFileContext(conf).makeQualified(remoteAppLogDir);
  return FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
      .listStatus(remoteAppLogDir);
}
/**
* Get all available log files under remote app log directory.
* @param conf the configuration

View File

@ -0,0 +1,404 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
/**
 * Base class to implement Log Aggregation File Controller.
 *
 * <p>A file controller encapsulates one on-disk format for aggregated
 * container logs. Subclasses supply the writer lifecycle
 * ({@link #initializeWriter}, {@link #write}, {@link #postWrite},
 * {@link #closeWriter}); this base class provides the shared remote-directory
 * handling: top-level dir verification/creation, per-application dir
 * creation, and old-log retention cleanup.</p>
 */
@Private
@Unstable
public abstract class LogAggregationFileController {

  private static final Log LOG = LogFactory.getLog(
      LogAggregationFileController.class);

  /*
   * Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
   * Group to which NMOwner belongs> App dirs will be created as 770,
   * owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
   * access / modify the files.
   * <NMGroup> should obviously be a limited access group.
   */

  /**
   * Permissions for the top level directory under which app directories will be
   * created.
   */
  protected static final FsPermission TLDIR_PERMISSIONS = FsPermission
      .createImmutable((short) 01777);

  /**
   * Permissions for the Application directory.
   */
  protected static final FsPermission APP_DIR_PERMISSIONS = FsPermission
      .createImmutable((short) 0770);

  // This is temporary solution. The configuration will be deleted once
  // we find a more scalable method to only write a single log file per LRS.
  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
  private static final int
      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;

  protected Configuration conf;
  // Top-level remote directory under which aggregated logs are written;
  // subclasses are expected to set this in initInternal().
  protected Path remoteRootLogDir;
  // Per-controller directory suffix appended under the user dir.
  protected String remoteRootLogDirSuffix;
  // Maximum number of aggregated log files kept per application before
  // cleanOldLogs() starts deleting the oldest ones.
  protected int retentionSize;
  // The name this controller was registered under in the configuration.
  protected String fileControllerName;

  public LogAggregationFileController() {}

  /**
   * Initialize the log file controller.
   * @param conf the Configuration
   * @param controllerName the log controller class name
   */
  public void initialize(Configuration conf, String controllerName) {
    this.conf = conf;
    // Note: variable name has a typo ("Rention") carried from the original.
    int configuredRentionSize =
        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
    // Non-positive values are treated as misconfiguration; fall back to the
    // default instead of disabling retention.
    if (configuredRentionSize <= 0) {
      this.retentionSize =
          DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
    } else {
      this.retentionSize = configuredRentionSize;
    }
    this.fileControllerName = controllerName;
    initInternal(conf);
  }

  /**
   * Derived classes initialize themselves using this method.
   * @param conf the Configuration
   */
  protected abstract void initInternal(Configuration conf);

  /**
   * Get the remote root log directory.
   * @return the remote root log directory path
   */
  public Path getRemoteRootLogDir() {
    return this.remoteRootLogDir;
  }

  /**
   * Get the log aggregation directory suffix.
   * @return the log aggregation directory suffix
   */
  public String getRemoteRootLogDirSuffix() {
    return this.remoteRootLogDirSuffix;
  }

  /**
   * Initialize the writer.
   * @param context the {@link LogAggregationFileControllerContext}
   * @throws IOException if fails to initialize the writer
   */
  public abstract void initializeWriter(
      LogAggregationFileControllerContext context) throws IOException;

  /**
   * Close the writer.
   */
  public abstract void closeWriter();

  /**
   * Write the log content.
   * @param logKey the log key
   * @param logValue the log content
   * @throws IOException if fails to write the logs
   */
  public abstract void write(LogKey logKey, LogValue logValue)
      throws IOException;

  /**
   * Operations needed after write the log content.
   * @param record the {@link LogAggregationFileControllerContext}
   * @throws Exception if anything fails
   */
  public abstract void postWrite(LogAggregationFileControllerContext record)
      throws Exception;

  /**
   * Verify and create the remote log directory.
   * Checks that the top-level remote log dir exists with the expected
   * permissions; creates it (and sets owner/group) when missing.
   * Wraps unrecoverable filesystem errors in {@link YarnRuntimeException}.
   */
  public void verifyAndCreateRemoteLogDir() {
    // NOTE(review): logPermError is a local variable, so it is reset to true
    // on every invocation; the toggling below never suppresses the warning
    // across calls. This looks copied from a version where it was a field —
    // TODO confirm intent.
    boolean logPermError = true;
    // Checking the existence of the TLD
    FileSystem remoteFS = null;
    try {
      remoteFS = getFileSystem(conf);
    } catch (IOException e) {
      throw new YarnRuntimeException(
          "Unable to get Remote FileSystem instance", e);
    }
    boolean remoteExists = true;
    Path remoteRootLogDir = getRemoteRootLogDir();
    try {
      FsPermission perms =
          remoteFS.getFileStatus(remoteRootLogDir).getPermission();
      if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
        LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
            + "] already exist, but with incorrect permissions. "
            + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
            + "]." + " The cluster may have problems with multiple users.");
        logPermError = false;
      } else {
        logPermError = true;
      }
    } catch (FileNotFoundException e) {
      // Missing TLD is expected on first use; created below.
      remoteExists = false;
    } catch (IOException e) {
      throw new YarnRuntimeException(
          "Failed to check permissions for dir ["
              + remoteRootLogDir + "]", e);
    }
    if (!remoteExists) {
      LOG.warn("Remote Root Log Dir [" + remoteRootLogDir
          + "] does not exist. Attempting to create it.");
      try {
        Path qualified =
            remoteRootLogDir.makeQualified(remoteFS.getUri(),
                remoteFS.getWorkingDirectory());
        remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
        // setPermission is called explicitly because mkdirs applies the
        // filesystem umask to the requested permissions.
        remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));

        UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
        String primaryGroupName = null;
        try {
          primaryGroupName = loginUser.getPrimaryGroupName();
        } catch (IOException e) {
          LOG.warn("No primary group found. The remote root log directory" +
              " will be created with the HDFS superuser being its group " +
              "owner. JobHistoryServer may be unable to read the directory.");
        }
        // set owner on the remote directory only if the primary group exists
        if (primaryGroupName != null) {
          remoteFS.setOwner(qualified,
              loginUser.getShortUserName(), primaryGroupName);
        }
      } catch (IOException e) {
        throw new YarnRuntimeException("Failed to create remoteLogDir ["
            + remoteRootLogDir + "]", e);
      }
    }
  }

  /**
   * Create remote Application directory for log aggregation.
   * Runs as the application user (doAs) and only creates the
   * user/suffix/app directory levels that are missing.
   * @param user the user
   * @param appId the application ID
   * @param userUgi the UGI
   */
  public void createAppDir(final String user, final ApplicationId appId,
      UserGroupInformation userUgi) {
    final Path remoteRootLogDir = getRemoteRootLogDir();
    final String remoteRootLogDirSuffix = getRemoteRootLogDirSuffix();
    try {
      userUgi.doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          try {
            // TODO: Reuse FS for user?
            FileSystem remoteFS = getFileSystem(conf);

            // Only creating directories if they are missing to avoid
            // unnecessary load on the filesystem from all of the nodes
            Path appDir = LogAggregationUtils.getRemoteAppLogDir(
                remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
            appDir = appDir.makeQualified(remoteFS.getUri(),
                remoteFS.getWorkingDirectory());
            if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
              Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
                  remoteRootLogDir, user, remoteRootLogDirSuffix);
              suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
                  remoteFS.getWorkingDirectory());
              if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
                Path userDir = LogAggregationUtils.getRemoteLogUserDir(
                    remoteRootLogDir, user);
                userDir = userDir.makeQualified(remoteFS.getUri(),
                    remoteFS.getWorkingDirectory());
                if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
                  createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
                }
                createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
              }
              createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
            }
          } catch (IOException e) {
            LOG.error("Failed to setup application log directory for "
                + appId, e);
            throw e;
          }
          return null;
        }
      });
    } catch (Exception e) {
      throw new YarnRuntimeException(e);
    }
  }

  // Overridable in tests to substitute a mock FileSystem.
  @VisibleForTesting
  protected FileSystem getFileSystem(Configuration conf) throws IOException {
    return getRemoteRootLogDir().getFileSystem(conf);
  }

  // Creates a directory and re-applies the requested permissions when the
  // filesystem umask stripped bits during mkdirs.
  protected void createDir(FileSystem fs, Path path, FsPermission fsPerm)
      throws IOException {
    FsPermission dirPerm = new FsPermission(fsPerm);
    fs.mkdirs(path, dirPerm);
    FsPermission umask = FsPermission.getUMask(fs.getConf());
    if (!dirPerm.equals(dirPerm.applyUMask(umask))) {
      fs.setPermission(path, new FsPermission(fsPerm));
    }
  }

  // Returns whether path exists; as a side effect, repairs its permissions
  // to APP_DIR_PERMISSIONS when they have drifted.
  protected boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
      throws IOException {
    boolean exists = true;
    try {
      FileStatus appDirStatus = fs.getFileStatus(path);
      if (!APP_DIR_PERMISSIONS.equals(appDirStatus.getPermission())) {
        fs.setPermission(path, APP_DIR_PERMISSIONS);
      }
    } catch (FileNotFoundException fnfe) {
      exists = false;
    }
    return exists;
  }

  /**
   * Get the remote aggregated log path.
   * @param appId the ApplicationId
   * @param user the Application Owner
   * @param nodeId the NodeManager Id
   * @return the remote aggregated log path
   */
  public Path getRemoteNodeLogFileForApp(ApplicationId appId, String user,
      NodeId nodeId) {
    return LogAggregationUtils.getRemoteNodeLogFileForApp(
        getRemoteRootLogDir(), appId, user, nodeId,
        getRemoteRootLogDirSuffix());
  }

  /**
   * Get the remote application directory for log aggregation.
   * @param appId the Application ID
   * @param appOwner the Application Owner
   * @return the remote application directory
   * @throws IOException if can not find the remote application directory
   */
  public Path getRemoteAppLogDir(ApplicationId appId, String appOwner)
      throws IOException {
    return LogAggregationUtils.getRemoteAppLogDir(conf, appId, appOwner,
        this.remoteRootLogDir, this.remoteRootLogDirSuffix);
  }

  // Deletes the oldest finished aggregated log files of this node once more
  // than retentionSize of them exist under the app dir. Deletion failures
  // are logged and retried implicitly on the next cycle; nothing is thrown.
  protected void cleanOldLogs(Path remoteNodeLogFileForApp,
      final NodeId nodeId, UserGroupInformation userUgi) {
    try {
      final FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
      Path appDir = remoteNodeLogFileForApp.getParent().makeQualified(
          remoteFS.getUri(), remoteFS.getWorkingDirectory());
      Set<FileStatus> status =
          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));

      // Keep only this node's finished (non-.tmp) aggregated log files.
      Iterable<FileStatus> mask =
          Iterables.filter(status, new Predicate<FileStatus>() {
            @Override
            public boolean apply(FileStatus next) {
              return next.getPath().getName()
                  .contains(LogAggregationUtils.getNodeString(nodeId))
                  && !next.getPath().getName().endsWith(
                      LogAggregationUtils.TMP_FILE_SUFFIX);
            }
          });
      status = Sets.newHashSet(mask);
      // Normally, we just need to delete one oldest log
      // before we upload a new log.
      // If we can not delete the older logs in this cycle,
      // we will delete them in next cycle.
      if (status.size() >= this.retentionSize) {
        // sort by the lastModificationTime ascending
        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
        Collections.sort(statusList, new Comparator<FileStatus>() {
          public int compare(FileStatus s1, FileStatus s2) {
            return s1.getModificationTime() < s2.getModificationTime() ? -1
                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
          }
        });
        for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
          final FileStatus remove = statusList.get(i);
          try {
            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
              @Override
              public Object run() throws Exception {
                remoteFS.delete(remove.getPath(), false);
                return null;
              }
            });
          } catch (Exception e) {
            LOG.error("Failed to delete " + remove.getPath(), e);
          }
        }
      }
    } catch (Exception e) {
      LOG.error("Failed to clean old logs", e);
    }
  }
}

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import java.util.Map;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
/**
 * {@code LogAggregationFileControllerContext} is a record used in
 * the log aggregation process.
 *
 * <p>It carries the immutable inputs of one aggregation cycle (target paths,
 * application/node identity, ACLs, rolling settings) plus a few mutable
 * bookkeeping counters updated while the cycle runs.</p>
 */
@Private
@Unstable
public class LogAggregationFileControllerContext {
  // --- immutable inputs, set once in the constructor ---
  private final boolean logAggregationInRolling;
  private final long rollingMonitorInterval;
  private final Path remoteNodeLogFileForApp;
  private final NodeId nodeId;
  private final UserGroupInformation userUgi;
  private final ApplicationId appId;
  private final Path remoteNodeTmpLogFileForApp;
  private final Map<ApplicationAccessType, String> appAcls;
  // --- mutable bookkeeping, updated during aggregation ---
  private int logAggregationTimes = 0;
  private int cleanOldLogsTimes = 0;

  private boolean uploadedLogsInThisCycle;
  private long logUploadedTimeStamp;

  public LogAggregationFileControllerContext(Path remoteNodeLogFileForApp,
      Path remoteNodeTmpLogFileForApp,
      boolean logAggregationInRolling,
      long rollingMonitorInterval,
      ApplicationId appId,
      Map<ApplicationAccessType, String> appAcls,
      NodeId nodeId, UserGroupInformation userUgi) {
    this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
    this.remoteNodeTmpLogFileForApp = remoteNodeTmpLogFileForApp;
    this.logAggregationInRolling = logAggregationInRolling;
    this.rollingMonitorInterval = rollingMonitorInterval;
    this.nodeId = nodeId;
    this.appId = appId;
    this.appAcls = appAcls;
    this.userUgi = userUgi;
  }

  /** @return whether any logs were uploaded in the current cycle. */
  public boolean isUploadedLogsInThisCycle() {
    return uploadedLogsInThisCycle;
  }

  public void setUploadedLogsInThisCycle(boolean uploadedLogsInThisCycle) {
    this.uploadedLogsInThisCycle = uploadedLogsInThisCycle;
  }

  /** @return the final remote path of the aggregated log file. */
  public Path getRemoteNodeLogFileForApp() {
    return remoteNodeLogFileForApp;
  }

  /** @return the rolling-aggregation monitor interval. */
  public long getRollingMonitorInterval() {
    return rollingMonitorInterval;
  }

  /** @return whether rolling (periodic) log aggregation is enabled. */
  public boolean isLogAggregationInRolling() {
    return logAggregationInRolling;
  }

  /** @return the timestamp recorded at the last upload. */
  public long getLogUploadTimeStamp() {
    return logUploadedTimeStamp;
  }

  public void setLogUploadTimeStamp(long uploadTimeStamp) {
    this.logUploadedTimeStamp = uploadTimeStamp;
  }

  /** @return the NodeManager id the logs belong to. */
  public NodeId getNodeId() {
    return nodeId;
  }

  /** @return the UGI the upload is performed as. */
  public UserGroupInformation getUserUgi() {
    return userUgi;
  }

  /** @return the application these logs belong to. */
  public ApplicationId getAppId() {
    return appId;
  }

  /** @return the temporary (in-progress) remote log file path. */
  public Path getRemoteNodeTmpLogFileForApp() {
    return remoteNodeTmpLogFileForApp;
  }

  // Increment the number of aggregation rounds performed.
  // (Method name abbreviation "incre" is part of the public API.)
  public void increLogAggregationTimes() {
    this.logAggregationTimes++;
  }

  // Increment the number of old-log cleanup rounds performed.
  public void increcleanupOldLogTimes() {
    this.cleanOldLogsTimes++;
  }

  /** @return how many aggregation rounds have run. */
  public int getLogAggregationTimes() {
    return logAggregationTimes;
  }

  /** @return how many old-log cleanup rounds have run. */
  public int getCleanOldLogsTimes() {
    return cleanOldLogsTimes;
  }

  /** @return the application ACLs written alongside the logs. */
  public Map<ApplicationAccessType, String> getAppAcls() {
    return appAcls;
  }
}

View File

@ -0,0 +1,195 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* Use {@code LogAggregationFileControllerFactory} to get the correct
* {@link LogAggregationFileController} for write and read.
*
*/
@Private
@Unstable
public class LogAggregationFileControllerFactory {
private static final Log LOG = LogFactory.getLog(
LogAggregationFileControllerFactory.class);
private final Pattern p = Pattern.compile(
"^[A-Za-z_]+[A-Za-z0-9_]*$");
private LinkedList<LogAggregationFileController> controllers
= new LinkedList<>();
private Configuration conf;
/**
* Construct the LogAggregationFileControllerFactory object.
* @param conf the Configuration
*/
public LogAggregationFileControllerFactory(Configuration conf) {
this.conf = conf;
Collection<String> fileControllers = conf.getStringCollection(
YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS);
List<String> controllerClassName = new ArrayList<>();
Map<String, String> controllerChecker = new HashMap<>();
for (String fileController : fileControllers) {
Preconditions.checkArgument(validateAggregatedFileControllerName(
fileController), "The FileControllerName: " + fileController
+ " set in " + YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS
+" is invalid." + "The valid File Controller name should only "
+ "contain a-zA-Z0-9_ and can not start with numbers");
String remoteDirStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
fileController);
String remoteDir = conf.get(remoteDirStr);
boolean defaultRemoteDir = false;
if (remoteDir == null || remoteDir.isEmpty()) {
remoteDir = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR);
defaultRemoteDir = true;
}
String suffixStr = String.format(
YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
fileController);
String suffix = conf.get(suffixStr);
boolean defaultSuffix = false;
if (suffix == null || suffix.isEmpty()) {
suffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
defaultSuffix = true;
}
String dirSuffix = remoteDir + "-" + suffix;
if (controllerChecker.containsKey(dirSuffix)) {
if (defaultRemoteDir && defaultSuffix) {
String fileControllerStr = controllerChecker.get(dirSuffix);
List<String> controllersList = new ArrayList<>();
controllersList.add(fileControllerStr);
controllersList.add(fileController);
fileControllerStr = StringUtils.join(controllersList, ",");
controllerChecker.put(dirSuffix, fileControllerStr);
} else {
String conflictController = controllerChecker.get(dirSuffix);
throw new RuntimeException("The combined value of " + remoteDirStr
+ " and " + suffixStr + " should not be the same as the value"
+ " set for " + conflictController);
}
} else {
controllerChecker.put(dirSuffix, fileController);
}
String classKey = String.format(
YarnConfiguration.LOG_AGGREGATION_FILE_CONTROLLER_FMT,
fileController);
String className = conf.get(classKey);
if (className == null || className.isEmpty()) {
throw new RuntimeException("No class configured for "
+ fileController);
}
controllerClassName.add(className);
Class<? extends LogAggregationFileController> sClass = conf.getClass(
classKey, null, LogAggregationFileController.class);
if (sClass == null) {
throw new RuntimeException("No class defined for " + fileController);
}
LogAggregationFileController s = ReflectionUtils.newInstance(
sClass, conf);
if (s == null) {
throw new RuntimeException("No object created for "
+ controllerClassName);
}
s.initialize(conf, fileController);
controllers.add(s);
}
}
/**
 * Get the {@link LogAggregationFileController} used for writing aggregated
 * logs. The first controller configured in the list is always the writer.
 * @return the LogAggregationFileController instance
 */
public LogAggregationFileController getFileControllerForWrite() {
  return this.controllers.getFirst();
}
/**
 * Get the {@link LogAggregationFileController} that is able to read the
 * aggregated logs for this application.
 * @param appId the ApplicationId
 * @param appOwner the Application Owner
 * @return the LogAggregationFileController instance
 * @throws IOException if can not find any log aggregation file controller
 */
public LogAggregationFileController getFileControllerForRead(
    ApplicationId appId, String appOwner) throws IOException {
  StringBuilder diagnosis = new StringBuilder();
  for (LogAggregationFileController controller : controllers) {
    try {
      Path appLogDir = controller.getRemoteAppLogDir(appId, appOwner);
      Path qualifiedLogDir = FileContext.getFileContext(conf)
          .makeQualified(appLogDir);
      RemoteIterator<FileStatus> nodeFiles = FileContext.getFileContext(
          qualifiedLogDir.toUri(), conf).listStatus(appLogDir);
      // The first controller whose remote dir contains any entry wins.
      if (nodeFiles.hasNext()) {
        return controller;
      }
    } catch (Exception ex) {
      // Record why this controller could not serve the read and move on
      // to the next configured controller.
      diagnosis.append(ex.getMessage()).append("\n");
    }
  }
  throw new IOException(diagnosis.toString());
}
/**
 * Check whether a configured file controller name is usable: non-null,
 * non-blank, and matching the allowed-name pattern {@code p}.
 * @param name the candidate file controller name
 * @return true if the name is valid
 */
private boolean validateAggregatedFileControllerName(String name) {
  return name != null && !name.trim().isEmpty()
      && p.matcher(name).matches();
}
@Private
@VisibleForTesting
public LinkedList<LogAggregationFileController>
    getConfiguredLogAggregationFileControllerList() {
  // Exposed for testing only; returns the live list of controllers in
  // configured order (first entry is the write controller).
  return controllers;
}
}

View File

@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.util.Times;
/**
 * The TFile log aggregation file Controller implementation.
 */
@Private
@Unstable
public class LogAggregationTFileController
    extends LogAggregationFileController {

  private static final Log LOG = LogFactory.getLog(
      LogAggregationTFileController.class);

  // The TFile writer for the current aggregation cycle; created in
  // initializeWriter and released in closeWriter.
  private LogWriter writer;

  public LogAggregationTFileController(){}

  @Override
  public void initInternal(Configuration conf) {
    // TFile keeps using the pre-existing NM remote-app-log-dir settings so
    // that existing deployments stay backward compatible.
    this.remoteRootLogDir = new Path(
        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
    this.remoteRootLogDirSuffix =
        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
  }

  @Override
  public void initializeWriter(LogAggregationFileControllerContext context)
      throws IOException {
    this.writer = new LogWriter();
    writer.initialize(this.conf, context.getRemoteNodeTmpLogFileForApp(),
        context.getUserUgi());
    // Write ACLs once when the writer is created.
    writer.writeApplicationACLs(context.getAppAcls());
    writer.writeApplicationOwner(context.getUserUgi().getShortUserName());
  }

  @Override
  public void closeWriter() {
    // Guard against a close before any writer was successfully created
    // (callers invoke closeWriter from a finally block even when
    // initializeWriter was never reached in this cycle).
    if (this.writer != null) {
      this.writer.close();
      this.writer = null;
    }
  }

  @Override
  public void write(LogKey logKey, LogValue logValue) throws IOException {
    this.writer.append(logKey, logValue);
  }

  @Override
  public void postWrite(final LogAggregationFileControllerContext record)
      throws Exception {
    // Before upload logs, make sure the number of existing logs
    // is smaller than the configured NM log aggregation retention size.
    if (record.isUploadedLogsInThisCycle() &&
        record.isLogAggregationInRolling()) {
      cleanOldLogs(record.getRemoteNodeLogFileForApp(), record.getNodeId(),
          record.getUserUgi());
      record.increcleanupOldLogTimes();
    }
    // When rolling aggregation is on, the final file name carries the
    // upload timestamp so successive cycles do not collide.
    final Path renamedPath = record.getRollingMonitorInterval() <= 0
        ? record.getRemoteNodeLogFileForApp() : new Path(
            record.getRemoteNodeLogFileForApp().getParent(),
            record.getRemoteNodeLogFileForApp().getName() + "_"
                + record.getLogUploadTimeStamp());
    final boolean rename = record.isUploadedLogsInThisCycle();
    try {
      record.getUserUgi().doAs(new PrivilegedExceptionAction<Object>() {
        @Override
        public Object run() throws Exception {
          FileSystem remoteFS = record.getRemoteNodeLogFileForApp()
              .getFileSystem(conf);
          if (rename) {
            // Promote the tmp file to its final name.
            remoteFS.rename(record.getRemoteNodeTmpLogFileForApp(),
                renamedPath);
          } else {
            // Nothing was uploaded this cycle; discard the tmp file.
            remoteFS.delete(record.getRemoteNodeTmpLogFileForApp(), false);
          }
          return null;
        }
      });
    } catch (Exception e) {
      LOG.error(
          "Failed to move temporary log file to final location: ["
          + record.getRemoteNodeTmpLogFileForApp() + "] to ["
          + renamedPath + "]", e);
      // Chain the original exception so callers keep the root cause.
      throw new Exception("Log uploaded failed for Application: "
          + record.getAppId() + " in NodeManager: "
          + LogAggregationUtils.getNodeString(record.getNodeId()) + " at "
          + Times.format(record.getLogUploadTimeStamp()) + "\n", e);
    }
  }
}

View File

@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@InterfaceAudience.Public
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import org.apache.hadoop.classification.InterfaceAudience;

View File

@ -1166,6 +1166,25 @@
<value>-1</value>
</property>
<property>
<description>Specify which log file controllers we will support. The first
file controller we add will be used to write the aggregated logs.
This comma separated configuration will work with the configuration:
yarn.log-aggregation.file-controller.%s.class which defines the supported
file controller's class. By default, the TFile controller would be used.
The user could override this configuration by adding more file controllers.
To support backward compatibility, make sure that the TFile file
controller is always included.</description>
<name>yarn.log-aggregation.file-formats</name>
<value>TFile</value>
</property>
<property>
<description>Class that supports TFile read and write operations.</description>
<name>yarn.log-aggregation.file-controller.TFile.class</name>
<value>org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController</value>
</property>
<property>
<description>
How long for ResourceManager to wait for NodeManager to report its

View File

@ -40,10 +40,14 @@
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import org.apache.hadoop.yarn.webapp.log.AggregatedLogsBlockForTest;
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
@ -249,7 +253,7 @@ public void testNoLogs() throws Exception {
private Configuration getConfiguration() {
Configuration configuration = new Configuration();
Configuration configuration = new YarnConfiguration();
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
configuration.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, "target/logs");
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
@ -295,19 +299,25 @@ private void writeLog(Configuration configuration, String user)
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, new Path(path), ugi);
writer.writeApplicationOwner(ugi.getUserName());
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(configuration);
LogAggregationFileController fileController = factory
.getFileControllerForWrite();
try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
writer.append(
NodeId nodeId = NodeId.newInstance("localhost", 1234);
LogAggregationFileControllerContext context
= new LogAggregationFileControllerContext(
new Path(path), new Path(path), false, 3600,
appId, appAcls, nodeId, ugi);
fileController.initializeWriter(context);
fileController.write(
new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
} finally {
fileController.closeWriter();
}
}

View File

@ -24,15 +24,21 @@
import java.io.IOException;
import java.io.Writer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
/**
* This class contains several utility functions for log aggregation tests.
@ -110,14 +116,25 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
writer.append(new AggregatedLogFormat.LogKey(containerId),
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(configuration);
LogAggregationFileController fileController = factory
.getFileControllerForWrite();
try {
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
ApplicationId appId = containerId.getApplicationAttemptId()
.getApplicationId();
LogAggregationFileControllerContext context
= new LogAggregationFileControllerContext(
path, path, true, 1000,
appId, appAcls, nodeId, ugi);
fileController.initializeWriter(context);
fileController.write(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
} finally {
fileController.closeWriter();
}
}
}

View File

@ -0,0 +1,171 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.logaggregation.filecontroller;
import static org.junit.Assert.*;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.junit.Test;
/**
* Test LogAggregationFileControllerFactory.
*
*/
/**
 * Test LogAggregationFileControllerFactory.
 *
 */
public class TestLogAggregationFileControllerFactory {

  @Test(timeout = 10000)
  public void testLogAggregationFileControllerFactory() throws Exception {
    ApplicationId appId = ApplicationId.newInstance(
        System.currentTimeMillis(), 1);
    String appOwner = "test";
    String remoteLogRootDir = "target/app-logs/";
    Configuration conf = new YarnConfiguration();
    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, "log");
    FileSystem fs = FileSystem.get(conf);

    // With the default configuration only the TFile controller is
    // configured; it serves both write and read.
    LogAggregationFileControllerFactory factory =
        new LogAggregationFileControllerFactory(conf);
    LinkedList<LogAggregationFileController> list = factory
        .getConfiguredLogAggregationFileControllerList();
    assertEquals(1, list.size());
    assertTrue(list.getFirst() instanceof LogAggregationTFileController);
    assertTrue(factory.getFileControllerForWrite()
        instanceof LogAggregationTFileController);
    Path logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
    try {
      if (fs.exists(logPath)) {
        fs.delete(logPath, true);
      }
      assertTrue(fs.mkdirs(logPath));
      // try-with-resources guarantees the handle is released even when
      // the write fails.
      try (Writer writer =
          new FileWriter(new File(logPath.toString(), "testLog"))) {
        writer.write("test");
      }
      assertTrue(factory.getFileControllerForRead(appId, appOwner)
          instanceof LogAggregationTFileController);
    } finally {
      fs.delete(logPath, true);
    }

    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
        "TestLogAggregationFileController");
    // Did not set class for TestLogAggregationFileController,
    // so constructing the factory must fail.
    try {
      factory =
          new LogAggregationFileControllerFactory(conf);
      fail();
    } catch (Exception ex) {
      // expected: no class configured for the file controller
    }

    // Configure a second controller. The first controller in the list is
    // used for writes; reads probe the controllers in order.
    conf.set(YarnConfiguration.LOG_AGGREGATION_FILE_FORMATS,
        "TestLogAggregationFileController,TFile");
    conf.setClass(
        "yarn.log-aggregation.file-controller.TestLogAggregationFileController"
        + ".class", TestLogAggregationFileController.class,
        LogAggregationFileController.class);
    conf.set(
        "yarn.log-aggregation.TestLogAggregationFileController"
        + ".remote-app-log-dir", remoteLogRootDir);
    conf.set(
        "yarn.log-aggregation.TestLogAggregationFileController"
        + ".remote-app-log-dir-suffix", "testLog");
    factory = new LogAggregationFileControllerFactory(conf);
    list = factory.getConfiguredLogAggregationFileControllerList();
    assertEquals(2, list.size());
    assertTrue(list.getFirst() instanceof TestLogAggregationFileController);
    assertTrue(list.getLast() instanceof LogAggregationTFileController);
    assertTrue(factory.getFileControllerForWrite()
        instanceof TestLogAggregationFileController);

    logPath = list.getFirst().getRemoteAppLogDir(appId, appOwner);
    try {
      if (fs.exists(logPath)) {
        fs.delete(logPath, true);
      }
      assertTrue(fs.mkdirs(logPath));
      try (Writer writer =
          new FileWriter(new File(logPath.toString(), "testLog"))) {
        writer.write("test");
      }
      assertTrue(factory.getFileControllerForRead(appId, appOwner)
          instanceof TestLogAggregationFileController);
    } finally {
      fs.delete(logPath, true);
    }
  }

  /**
   * Minimal file controller used to exercise the factory's per-controller
   * remote-dir/suffix configuration. All writer operations are no-ops.
   */
  private static class TestLogAggregationFileController
      extends LogAggregationFileController {

    @Override
    public void initInternal(Configuration conf) {
      // Read the controller-specific remote dir and suffix that the test
      // sets via the LOG_AGGREGATION_REMOTE_APP_LOG_DIR(_SUFFIX)_FMT keys.
      String remoteDirStr = String.format(
          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_FMT,
          this.fileControllerName);
      this.remoteRootLogDir = new Path(conf.get(remoteDirStr));
      String suffix = String.format(
          YarnConfiguration.LOG_AGGREGATION_REMOTE_APP_LOG_DIR_SUFFIX_FMT,
          this.fileControllerName);
      this.remoteRootLogDirSuffix = conf.get(suffix);
    }

    @Override
    public void closeWriter() {
      // Do Nothing
    }

    @Override
    public void write(LogKey logKey, LogValue logValue) throws IOException {
      // Do Nothing
    }

    @Override
    public void postWrite(LogAggregationFileControllerContext record)
        throws Exception {
      // Do Nothing
    }

    @Override
    public void initializeWriter(LogAggregationFileControllerContext context)
        throws IOException {
      // Do Nothing
    }
  }
}

View File

@ -19,11 +19,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@ -38,8 +34,6 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
@ -57,7 +51,9 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerContext;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.api.ContainerLogAggregationPolicy;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
@ -71,7 +67,6 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.DeletionTask;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.deletion.task.FileDeletionTask;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.Times;
@ -86,18 +81,6 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private static final Logger LOG =
LoggerFactory.getLogger(AppLogAggregatorImpl.class);
private static final int THREAD_SLEEP_TIME = 1000;
// This is temporary solution. The configuration will be deleted once
// we find a more scalable method to only write a single log file per LRS.
private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
= YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
private static final int
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
// This configuration is for debug and test purpose. By setting
// this configuration as true. We can break the lower bound of
// NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private final LocalDirsHandlerService dirsHandler;
private final Dispatcher dispatcher;
@ -118,10 +101,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
private final FileContext lfs;
private final LogAggregationContext logAggregationContext;
private final Context context;
private final int retentionSize;
private final long rollingMonitorInterval;
private final boolean logAggregationInRolling;
private final NodeId nodeId;
private final LogAggregationFileControllerContext logControllerContext;
// These variables are only for testing
private final AtomicBoolean waiting = new AtomicBoolean(false);
@ -134,6 +115,8 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
new HashMap<ContainerId, ContainerLogAggregator>();
private final ContainerLogAggregationPolicy logAggPolicy;
private final LogAggregationFileController logAggregationFileController;
/**
* The value recovered from state store to determine the age of application
@ -151,7 +134,7 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
FileContext lfs, long rollingMonitorInterval) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval, -1);
logAggregationContext, context, lfs, rollingMonitorInterval, -1, null);
}
public AppLogAggregatorImpl(Dispatcher dispatcher,
@ -162,6 +145,21 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime) {
this(dispatcher, deletionService, conf, appId, userUgi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, rollingMonitorInterval,
recoveredLogInitedTime, null);
}
public AppLogAggregatorImpl(Dispatcher dispatcher,
DeletionService deletionService, Configuration conf,
ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long rollingMonitorInterval,
long recoveredLogInitedTime,
LogAggregationFileController logAggregationFileController) {
this.dispatcher = dispatcher;
this.conf = conf;
this.delService = deletionService;
@ -169,31 +167,41 @@ public AppLogAggregatorImpl(Dispatcher dispatcher,
this.applicationId = appId.toString();
this.userUgi = userUgi;
this.dirsHandler = dirsHandler;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
this.appAcls = appAcls;
this.lfs = lfs;
this.logAggregationContext = logAggregationContext;
this.context = context;
this.nodeId = nodeId;
int configuredRentionSize =
conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
if (configuredRentionSize <= 0) {
this.retentionSize =
DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
} else {
this.retentionSize = configuredRentionSize;
}
this.rollingMonitorInterval = rollingMonitorInterval;
this.logAggregationInRolling =
this.rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
this.logAggPolicy = getLogAggPolicy(conf);
this.recoveredLogInitedTime = recoveredLogInitedTime;
if (logAggregationFileController == null) {
// by default, use T-File Controller
this.logAggregationFileController = new LogAggregationTFileController();
this.logAggregationFileController.initialize(conf, "TFile");
this.logAggregationFileController.verifyAndCreateRemoteLogDir();
this.logAggregationFileController.createAppDir(
this.userUgi.getShortUserName(), appId, userUgi);
this.remoteNodeLogFileForApp = this.logAggregationFileController
.getRemoteNodeLogFileForApp(appId,
this.userUgi.getShortUserName(), nodeId);
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
} else {
this.logAggregationFileController = logAggregationFileController;
this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
this.remoteNodeTmpLogFileForApp = getRemoteNodeTmpLogFileForApp();
}
boolean logAggregationInRolling =
rollingMonitorInterval <= 0 || this.logAggregationContext == null
|| this.logAggregationContext.getRolledLogsIncludePattern() == null
|| this.logAggregationContext.getRolledLogsIncludePattern()
.isEmpty() ? false : true;
logControllerContext = new LogAggregationFileControllerContext(
this.remoteNodeLogFileForApp,
this.remoteNodeTmpLogFileForApp,
logAggregationInRolling,
rollingMonitorInterval,
this.appId, this.appAcls, this.nodeId, this.userUgi);
}
private ContainerLogAggregationPolicy getLogAggPolicy(Configuration conf) {
@ -293,14 +301,9 @@ private void uploadLogsForContainers(boolean appFinished) {
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
try (LogWriter writer = createLogWriter()) {
try {
try {
writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(appAcls);
writer.writeApplicationOwner(this.userUgi.getShortUserName());
logAggregationFileController.initializeWriter(logControllerContext);
} catch (IOException e1) {
logAggregationSucceedInThisCycle = false;
LOG.error("Cannot create writer for app " + this.applicationId
@ -318,8 +321,8 @@ private void uploadLogsForContainers(boolean appFinished) {
containerLogAggregators.put(container, aggregator);
}
Set<Path> uploadedFilePathsInThisCycle =
aggregator.doContainerLogAggregation(writer, appFinished,
finishedContainers.contains(container));
aggregator.doContainerLogAggregation(logAggregationFileController,
appFinished, finishedContainers.contains(container));
if (uploadedFilePathsInThisCycle.size() > 0) {
uploadedLogsInThisCycle = true;
List<Path> uploadedFilePathsInThisCycleList = new ArrayList<>();
@ -337,60 +340,28 @@ private void uploadLogsForContainers(boolean appFinished) {
}
}
// Before upload logs, make sure the number of existing logs
// is smaller than the configured NM log aggregation retention size.
if (uploadedLogsInThisCycle && logAggregationInRolling) {
cleanOldLogs();
cleanupOldLogTimes++;
}
long currentTime = System.currentTimeMillis();
final Path renamedPath = getRenamedPath(currentTime);
final boolean rename = uploadedLogsInThisCycle;
logControllerContext.setUploadedLogsInThisCycle(uploadedLogsInThisCycle);
logControllerContext.setLogUploadTimeStamp(System.currentTimeMillis());
logControllerContext.increLogAggregationTimes();
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws Exception {
FileSystem remoteFS = remoteNodeLogFileForApp.getFileSystem(conf);
if (rename) {
remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
} else {
remoteFS.delete(remoteNodeTmpLogFileForApp, false);
}
return null;
}
});
diagnosticMessage =
"Log uploaded successfully for Application: " + appId
+ " in NodeManager: "
+ LogAggregationUtils.getNodeString(nodeId) + " at "
+ Times.format(currentTime) + "\n";
this.logAggregationFileController.postWrite(logControllerContext);
diagnosticMessage = "Log uploaded successfully for Application: "
+ appId + " in NodeManager: "
+ LogAggregationUtils.getNodeString(nodeId) + " at "
+ Times.format(logControllerContext.getLogUploadTimeStamp())
+ "\n";
} catch (Exception e) {
LOG.error(
"Failed to move temporary log file to final location: ["
+ remoteNodeTmpLogFileForApp + "] to ["
+ renamedPath + "]", e);
diagnosticMessage =
"Log uploaded failed for Application: " + appId
+ " in NodeManager: "
+ LogAggregationUtils.getNodeString(nodeId) + " at "
+ Times.format(currentTime) + "\n";
diagnosticMessage = e.getMessage();
renameTemporaryLogFileFailed = true;
logAggregationSucceedInThisCycle = false;
}
} finally {
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
logAggregationFileController.closeWriter();
}
}
private Path getRenamedPath(long currentTime) {
return this.rollingMonitorInterval <= 0 ? remoteNodeLogFileForApp
: new Path(remoteNodeLogFileForApp.getParent(),
remoteNodeLogFileForApp.getName() + "_" + currentTime);
}
private void addCredentials() {
if (UserGroupInformation.isSecurityEnabled()) {
Credentials systemCredentials =
@ -407,11 +378,6 @@ private void addCredentials() {
}
}
@VisibleForTesting
protected LogWriter createLogWriter() {
return new LogWriter();
}
private void sendLogAggregationReport(
boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
boolean appFinished) {
@ -442,60 +408,6 @@ private void sendLogAggregationReportInternal(
this.context.getLogAggregationStatusForApps().add(report);
}
/**
 * Enforces the retention limit on previously aggregated log files for this
 * node. Before a new log file is uploaded, the oldest aggregated files are
 * deleted so that at most {@code retentionSize - 1} remain. All failures are
 * logged only; files that cannot be deleted now are retried on the next
 * aggregation cycle.
 */
private void cleanOldLogs() {
  try {
    final FileSystem remoteFS =
        this.remoteNodeLogFileForApp.getFileSystem(conf);
    Path appDir =
        this.remoteNodeLogFileForApp.getParent().makeQualified(
            remoteFS.getUri(), remoteFS.getWorkingDirectory());
    Set<FileStatus> status =
        new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));

    // Keep only finished (non-.tmp) aggregated files written by this node.
    Iterable<FileStatus> mask =
        Iterables.filter(status, new Predicate<FileStatus>() {
          @Override
          public boolean apply(FileStatus next) {
            return next.getPath().getName()
              .contains(LogAggregationUtils.getNodeString(nodeId))
              && !next.getPath().getName().endsWith(
                  LogAggregationUtils.TMP_FILE_SUFFIX);
          }
        });
    status = Sets.newHashSet(mask);
    // Normally, we just need to delete one oldest log
    // before we upload a new log.
    // If we can not delete the older logs in this cycle,
    // we will delete them in next cycle.
    if (status.size() >= this.retentionSize) {
      // sort by the lastModificationTime ascending
      List<FileStatus> statusList = new ArrayList<FileStatus>(status);
      Collections.sort(statusList, new Comparator<FileStatus>() {
        @Override
        public int compare(FileStatus s1, FileStatus s2) {
          // Long.compare replaces the hand-rolled three-way comparison.
          return Long.compare(s1.getModificationTime(),
              s2.getModificationTime());
        }
      });
      for (int i = 0; i <= statusList.size() - this.retentionSize; i++) {
        final FileStatus remove = statusList.get(i);
        try {
          // Perform the delete as the aggregating user's UGI.
          userUgi.doAs(new PrivilegedExceptionAction<Object>() {
            @Override
            public Object run() throws Exception {
              remoteFS.delete(remove.getPath(), false);
              return null;
            }
          });
        } catch (Exception e) {
          LOG.error("Failed to delete " + remove.getPath(), e);
        }
      }
    }
  } catch (Exception e) {
    // Best effort: cleanup failures must not abort log aggregation.
    LOG.error("Failed to clean old logs", e);
  }
}
@SuppressWarnings("unchecked")
@Override
public void run() {
@ -523,8 +435,8 @@ private void doAppLogAggregation() {
synchronized(this) {
try {
waiting.set(true);
if (logAggregationInRolling) {
wait(this.rollingMonitorInterval * 1000);
if (logControllerContext.isLogAggregationInRolling()) {
wait(logControllerContext.getRollingMonitorInterval() * 1000);
if (this.appFinishing.get() || this.aborted.get()) {
break;
}
@ -653,7 +565,8 @@ private AggregatedLogFormat.LogRetentionContext getRetentionContext() {
recoveredLogInitedTime, logRetentionSecs * 1000);
}
public Set<Path> doContainerLogAggregation(LogWriter writer,
public Set<Path> doContainerLogAggregation(
LogAggregationFileController logAggregationFileController,
boolean appFinished, boolean containerFinished) {
LOG.info("Uploading logs for container " + containerId
+ ". Current good log dirs are "
@ -665,7 +578,7 @@ public Set<Path> doContainerLogAggregation(LogWriter writer,
this.uploadedFileMeta, retentionContext, appFinished,
containerFinished);
try {
writer.append(logKey, logValue);
logAggregationFileController.write(logKey, logValue);
} catch (Exception e) {
LOG.error("Couldn't upload logs for " + containerId
+ ". Skipping this container.", e);
@ -708,4 +621,15 @@ public int getLogAggregationTimes() {
// Test/metrics hook: number of times old aggregated logs were cleaned up.
int getCleanupOldLogTimes() {
  return this.cleanupOldLogTimes;
}
@VisibleForTesting
// Test hook: exposes the file controller used to write aggregated logs.
public LogAggregationFileController getLogAggregationFileController() {
  return this.logAggregationFileController;
}
@VisibleForTesting
// Test hook: exposes the per-app file-controller context (e.g. its rolling
// monitor interval and aggregation counters used by the tests).
public LogAggregationFileControllerContext
    getLogAggregationFileControllerContext() {
  return this.logControllerContext;
}
}

View File

@ -18,9 +18,7 @@
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -32,10 +30,7 @@
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
@ -48,7 +43,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -79,36 +75,14 @@ public class LogAggregationService extends AbstractService implements
= YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
private long rollingMonitorInterval;
/*
* Expected deployment TLD will be 1777, owner=<NMOwner>, group=<NMGroup -
* Group to which NMOwner belongs> App dirs will be created as 770,
* owner=<AppOwner>, group=<NMGroup>: so that the owner and <NMOwner> can
* access / modify the files.
* <NMGroup> should obviously be a limited access group.
*/
/**
* Permissions for the top level directory under which app directories will be
* created.
*/
private static final FsPermission TLDIR_PERMISSIONS = FsPermission
.createImmutable((short) 01777);
/**
* Permissions for the Application directory.
*/
private static final FsPermission APP_DIR_PERMISSIONS = FsPermission
.createImmutable((short) 0770);
private final Context context;
private final DeletionService deletionService;
private final Dispatcher dispatcher;
private LocalDirsHandlerService dirsHandler;
Path remoteRootLogDir;
String remoteRootLogDirSuffix;
private NodeId nodeId;
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
private boolean logPermError = true;
@VisibleForTesting
ExecutorService threadPool;
@ -125,12 +99,6 @@ public LogAggregationService(Dispatcher dispatcher, Context context,
}
protected void serviceInit(Configuration conf) throws Exception {
this.remoteRootLogDir =
new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
int threadPoolSize = getAggregatorThreadPoolSize(conf);
this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
new ThreadFactoryBuilder()
@ -218,158 +186,6 @@ private void stopAggregators() {
}
}
// Overridable hook (tests substitute a spy FS) returning the FileSystem
// that backs the remote root log directory.
protected FileSystem getFileSystem(Configuration conf) throws IOException {
  return remoteRootLogDir.getFileSystem(conf);
}
/**
 * Verifies that the remote root log directory (the top-level dir under which
 * per-app log dirs live) exists with the expected permissions, creating it
 * if missing.
 *
 * If the dir exists but its permissions differ from TLDIR_PERMISSIONS, a
 * warning is logged; the logPermError flag throttles that warning so it is
 * not repeated on every invocation while the mismatch persists.
 *
 * @param conf configuration used to obtain the remote FileSystem
 * @throws YarnRuntimeException if the FS cannot be obtained, the permission
 *         check fails with an I/O error, or the dir cannot be created
 */
void verifyAndCreateRemoteLogDir(Configuration conf) {
  // Checking the existence of the TLD
  FileSystem remoteFS = null;
  try {
    remoteFS = getFileSystem(conf);
  } catch (IOException e) {
    throw new YarnRuntimeException("Unable to get Remote FileSystem instance", e);
  }
  boolean remoteExists = true;
  try {
    FsPermission perms =
        remoteFS.getFileStatus(this.remoteRootLogDir).getPermission();
    if (!perms.equals(TLDIR_PERMISSIONS) && logPermError) {
      LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
          + "] already exist, but with incorrect permissions. "
          + "Expected: [" + TLDIR_PERMISSIONS + "], Found: [" + perms
          + "]." + " The cluster may have problems with multiple users.");
      // Suppress further warnings until permissions are observed correct.
      logPermError = false;
    } else {
      // Permissions are correct again (or were never wrong); re-arm warning.
      logPermError = true;
    }
  } catch (FileNotFoundException e) {
    remoteExists = false;
  } catch (IOException e) {
    throw new YarnRuntimeException(
        "Failed to check permissions for dir ["
            + this.remoteRootLogDir + "]", e);
  }
  if (!remoteExists) {
    LOG.warn("Remote Root Log Dir [" + this.remoteRootLogDir
        + "] does not exist. Attempting to create it.");
    try {
      Path qualified =
          this.remoteRootLogDir.makeQualified(remoteFS.getUri(),
              remoteFS.getWorkingDirectory());
      // mkdirs applies the FS umask, so explicitly setPermission afterwards
      // to guarantee the exact TLDIR_PERMISSIONS bits.
      remoteFS.mkdirs(qualified, new FsPermission(TLDIR_PERMISSIONS));
      remoteFS.setPermission(qualified, new FsPermission(TLDIR_PERMISSIONS));
      UserGroupInformation loginUser = UserGroupInformation.getLoginUser();
      String primaryGroupName = null;
      try {
        primaryGroupName = loginUser.getPrimaryGroupName();
      } catch (IOException e) {
        LOG.warn("No primary group found. The remote root log directory" +
            " will be created with the HDFS superuser being its group " +
            "owner. JobHistoryServer may be unable to read the directory.");
      }
      // set owner on the remote directory only if the primary group exists
      if (primaryGroupName != null) {
        remoteFS.setOwner(qualified,
            loginUser.getShortUserName(), primaryGroupName);
      }
    } catch (IOException e) {
      throw new YarnRuntimeException("Failed to create remoteLogDir ["
          + this.remoteRootLogDir + "]", e);
    }
  }
}
// Resolves the remote per-node aggregated log file path for the given
// application and user, delegating to the shared utility so the naming
// scheme stays consistent across log-aggregation components.
Path getRemoteNodeLogFileForApp(ApplicationId appId, String user) {
  Path nodeLogFile = LogAggregationUtils.getRemoteNodeLogFileForApp(
      remoteRootLogDir, appId, user, nodeId, remoteRootLogDirSuffix);
  return nodeLogFile;
}
// Resolves the remote aggregated-log directory for the given application
// and user via the shared path utility.
Path getRemoteAppLogDir(ApplicationId appId, String user) {
  Path appLogDir = LogAggregationUtils.getRemoteAppLogDir(
      remoteRootLogDir, appId, user, remoteRootLogDirSuffix);
  return appLogDir;
}
// Creates a directory with the requested permissions, then repairs the
// permissions if the filesystem's umask stripped any requested bits
// during mkdirs.
private void createDir(FileSystem fs, Path path, FsPermission fsPerm)
    throws IOException {
  FsPermission requested = new FsPermission(fsPerm);
  fs.mkdirs(path, requested);
  FsPermission umask = FsPermission.getUMask(fs.getConf());
  boolean strippedByUmask = !requested.equals(requested.applyUMask(umask));
  if (strippedByUmask) {
    fs.setPermission(path, new FsPermission(fsPerm));
  }
}
/**
 * Returns whether {@code path} exists; as a side effect, resets its
 * permissions to {@code fsPerm} when they have drifted.
 *
 * Fix: the original ignored the {@code fsPerm} parameter and hard-coded
 * APP_DIR_PERMISSIONS in both the comparison and the setPermission call.
 * All visible callers pass APP_DIR_PERMISSIONS, so honoring the parameter
 * is behavior-identical for them while making the method reusable.
 *
 * @param fs     filesystem to inspect
 * @param path   directory to check
 * @param fsPerm permissions the directory should have
 * @return true if the path exists, false otherwise
 * @throws IOException on any FS error other than the path being absent
 */
private boolean checkExists(FileSystem fs, Path path, FsPermission fsPerm)
    throws IOException {
  boolean exists = true;
  try {
    FileStatus appDirStatus = fs.getFileStatus(path);
    if (!fsPerm.equals(appDirStatus.getPermission())) {
      fs.setPermission(path, fsPerm);
    }
  } catch (FileNotFoundException fnfe) {
    // Absent is a normal outcome, not an error.
    exists = false;
  }
  return exists;
}
/**
 * Creates the remote directory hierarchy for an application's aggregated
 * logs (user dir, suffix dir, then app dir), running as the application
 * user's UGI. Each level is created only if missing, checked from the
 * most specific (app dir) outward, to minimize NameNode load from all
 * nodes doing this concurrently.
 *
 * @param user    application owner whose UGI performs the FS operations
 * @param appId   application whose log dir is being created
 * @param userUgi UGI used for the doAs block
 * @throws YarnRuntimeException wrapping any failure during creation
 */
protected void createAppDir(final String user, final ApplicationId appId,
    UserGroupInformation userUgi) {
  try {
    userUgi.doAs(new PrivilegedExceptionAction<Object>() {
      @Override
      public Object run() throws Exception {
        try {
          // TODO: Reuse FS for user?
          FileSystem remoteFS = getFileSystem(getConfig());
          // Only creating directories if they are missing to avoid
          // unnecessary load on the filesystem from all of the nodes
          Path appDir = LogAggregationUtils.getRemoteAppLogDir(
              LogAggregationService.this.remoteRootLogDir, appId, user,
              LogAggregationService.this.remoteRootLogDirSuffix);
          appDir = appDir.makeQualified(remoteFS.getUri(),
              remoteFS.getWorkingDirectory());
          if (!checkExists(remoteFS, appDir, APP_DIR_PERMISSIONS)) {
            // App dir missing: check the parent suffix dir next.
            Path suffixDir = LogAggregationUtils.getRemoteLogSuffixedDir(
                LogAggregationService.this.remoteRootLogDir, user,
                LogAggregationService.this.remoteRootLogDirSuffix);
            suffixDir = suffixDir.makeQualified(remoteFS.getUri(),
                remoteFS.getWorkingDirectory());
            if (!checkExists(remoteFS, suffixDir, APP_DIR_PERMISSIONS)) {
              // Suffix dir missing too: check the per-user dir.
              Path userDir = LogAggregationUtils.getRemoteLogUserDir(
                  LogAggregationService.this.remoteRootLogDir, user);
              userDir = userDir.makeQualified(remoteFS.getUri(),
                  remoteFS.getWorkingDirectory());
              if (!checkExists(remoteFS, userDir, APP_DIR_PERMISSIONS)) {
                createDir(remoteFS, userDir, APP_DIR_PERMISSIONS);
              }
              createDir(remoteFS, suffixDir, APP_DIR_PERMISSIONS);
            }
            // Create missing levels from the outermost inward.
            createDir(remoteFS, appDir, APP_DIR_PERMISSIONS);
          }
        } catch (IOException e) {
          LOG.error("Failed to setup application log directory for "
              + appId, e);
          throw e;
        }
        return null;
      }
    });
  } catch (Exception e) {
    throw new YarnRuntimeException(e);
  }
}
@SuppressWarnings("unchecked")
private void initApp(final ApplicationId appId, String user,
Credentials credentials, Map<ApplicationAccessType, String> appAcls,
@ -377,7 +193,6 @@ private void initApp(final ApplicationId appId, String user,
long recoveredLogInitedTime) {
ApplicationEvent eventResponse;
try {
verifyAndCreateRemoteLogDir(getConfig());
initAppAggregator(appId, user, credentials, appAcls,
logAggregationContext, recoveredLogInitedTime);
eventResponse = new ApplicationEvent(appId,
@ -410,14 +225,17 @@ protected void initAppAggregator(final ApplicationId appId, String user,
userUgi.addCredentials(credentials);
}
LogAggregationFileController logAggregationFileController
= getLogAggregationFileController(getConfig());
logAggregationFileController.verifyAndCreateRemoteLogDir();
// New application
final AppLogAggregator appLogAggregator =
new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
getConfig(), appId, userUgi, this.nodeId, dirsHandler,
getRemoteNodeLogFileForApp(appId, user),
appAcls, logAggregationContext, this.context,
logAggregationFileController.getRemoteNodeLogFileForApp(appId,
user, nodeId), appAcls, logAggregationContext, this.context,
getLocalFileContext(getConfig()), this.rollingMonitorInterval,
recoveredLogInitedTime);
recoveredLogInitedTime, logAggregationFileController);
if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {
throw new YarnRuntimeException("Duplicate initApp for " + appId);
}
@ -425,7 +243,7 @@ protected void initAppAggregator(final ApplicationId appId, String user,
YarnRuntimeException appDirException = null;
try {
// Create the app dir
createAppDir(user, appId, userUgi);
logAggregationFileController.createAppDir(user, appId, userUgi);
} catch (Exception e) {
appLogAggregator.disableLogAggregation();
if (!(e instanceof YarnRuntimeException)) {
@ -570,4 +388,14 @@ private int getAggregatorThreadPoolSize(Configuration conf) {
}
return threadPoolSize;
}
@VisibleForTesting
// Returns the write-side log-aggregation file controller selected by the
// factory for the given configuration. Overridden by tests to inject spies.
public LogAggregationFileController getLogAggregationFileController(
    Configuration conf) {
  return new LogAggregationFileControllerFactory(conf)
      .getFileControllerForWrite();
}
}

View File

@ -35,7 +35,7 @@
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogValue;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogWriter;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.server.api.ContainerLogContext;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.Context;
@ -241,8 +241,8 @@ public void verifyLogAggregationWithExpectedFiles2DeleteAndUpload(
// verify uploaded files
ArgumentCaptor<LogValue> logValCaptor =
ArgumentCaptor.forClass(LogValue.class);
verify(appLogAggregator.logWriter).append(any(LogKey.class),
logValCaptor.capture());
verify(appLogAggregator.getLogAggregationFileController()).write(
any(LogKey.class), logValCaptor.capture());
Set<String> filesUploaded = new HashSet<>();
LogValue logValue = logValCaptor.getValue();
for(File file: logValue.getPendingLogFilesToUploadForThisContainer()) {
@ -287,11 +287,13 @@ private static AppLogAggregatorInTest createAppLogAggregator(
final Context context = createContext(config);
final FileContext fakeLfs = mock(FileContext.class);
final Path remoteLogDirForApp = new Path(REMOTE_LOG_FILE.getAbsolutePath());
LogAggregationTFileController format = spy(
new LogAggregationTFileController());
format.initialize(config, "TFile");
return new AppLogAggregatorInTest(dispatcher, deletionService,
config, applicationId, ugi, nodeId, dirsService,
remoteLogDirForApp, appAcls, logAggregationContext,
context, fakeLfs, recoveredLogInitedTimeMillis);
context, fakeLfs, recoveredLogInitedTimeMillis, format);
}
/**
@ -402,7 +404,6 @@ private static final class AppLogAggregatorInTest extends
final DeletionService deletionService;
final ApplicationId applicationId;
final LogWriter logWriter;
final ArgumentCaptor<LogValue> logValue;
public AppLogAggregatorInTest(Dispatcher dispatcher,
@ -411,19 +412,15 @@ public AppLogAggregatorInTest(Dispatcher dispatcher,
LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
Map<ApplicationAccessType, String> appAcls,
LogAggregationContext logAggregationContext, Context context,
FileContext lfs, long recoveredLogInitedTime) throws IOException {
FileContext lfs, long recoveredLogInitedTime,
LogAggregationTFileController format) throws IOException {
super(dispatcher, deletionService, conf, appId, ugi, nodeId,
dirsHandler, remoteNodeLogFileForApp, appAcls,
logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
logAggregationContext, context, lfs, -1, recoveredLogInitedTime,
format);
this.applicationId = appId;
this.deletionService = deletionService;
this.logWriter = spy(new LogWriter());
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
@Override
protected LogWriter createLogWriter() {
return this.logWriter;
}
}
}

View File

@ -103,6 +103,9 @@
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileController;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationFileControllerFactory;
import org.apache.hadoop.yarn.logaggregation.filecontroller.LogAggregationTFileController;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
@ -161,11 +164,12 @@ public TestLogAggregationService() throws UnsupportedFileSystemException {
DrainDispatcher dispatcher;
EventHandler<Event> appEventHandler;
private NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
@Override
@SuppressWarnings("unchecked")
public void setup() throws IOException {
super.setup();
NodeId nodeId = NodeId.newInstance("0.0.0.0", 5555);
((NMContext)context).setNodeId(nodeId);
dispatcher = createDispatcher();
appEventHandler = mock(EventHandler.class);
@ -246,9 +250,9 @@ private void verifyLocalFileDeletion(
Assert.assertFalse("Directory [" + app1LogDir + "] was not deleted",
app1LogDir.exists());
Path logFilePath =
logAggregationService.getRemoteNodeLogFileForApp(application1,
this.user);
Path logFilePath = logAggregationService
.getLogAggregationFileController(conf)
.getRemoteNodeLogFileForApp(application1, this.user, nodeId);
Assert.assertTrue("Log file [" + logFilePath + "] not found", new File(
logFilePath.toUri().getPath()).exists());
@ -369,9 +373,10 @@ public void testNoContainerOnNode() throws Exception {
logAggregationService.stop();
assertEquals(0, logAggregationService.getNumAggregators());
Assert.assertFalse(new File(logAggregationService
.getRemoteNodeLogFileForApp(application1, this.user).toUri().getPath())
LogAggregationFileController format1 =
logAggregationService.getLogAggregationFileController(conf);
Assert.assertFalse(new File(format1.getRemoteNodeLogFileForApp(
application1, this.user, this.nodeId).toUri().getPath())
.exists());
dispatcher.await();
@ -541,26 +546,33 @@ public void testMultipleAppsLogAggregation() throws Exception {
};
checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
}
@Test
public void testVerifyAndCreateRemoteDirsFailure()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(conf);
LogAggregationFileController logAggregationFileFormat = factory
.getFileControllerForWrite();
LogAggregationFileController spyLogAggregationFileFormat =
spy(logAggregationFileFormat);
YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
doThrow(e).doNothing().when(spyLogAggregationFileFormat)
.verifyAndCreateRemoteLogDir();
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
super.dirsHandler) {
@Override
public LogAggregationFileController getLogAggregationFileController(
Configuration conf) {
return spyLogAggregationFileFormat;
}
});
logAggregationService.init(this.conf);
YarnRuntimeException e = new YarnRuntimeException("KABOOM!");
doThrow(e)
.when(logAggregationService).verifyAndCreateRemoteLogDir(
any(Configuration.class));
logAggregationService.start();
// Now try to start an application
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
@ -607,8 +619,7 @@ public void testVerifyAndCreateRemoteDirsFailure()
logAggregationService.stop();
}
@Test
public void testVerifyAndCreateRemoteDirNonExistence()
throws Exception {
@ -621,14 +632,24 @@ public void testVerifyAndCreateRemoteDirNonExistence()
new LogAggregationService(dispatcher, this.context, this.delSrvc,
super.dirsHandler));
logAggregationService.init(this.conf);
logAggregationService.start();
boolean existsBefore = aNewFile.exists();
assertTrue("The new file already exists!", !existsBefore);
logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
boolean existsAfter = aNewFile.exists();
assertTrue("The new aggregate file is not successfully created", existsAfter);
aNewFile.delete(); //housekeeping
logAggregationService.stop();
}
@Test
@ -641,7 +662,17 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
LogAggregationService logAggregationService = new LogAggregationService(
dispatcher, this.context, this.delSrvc, super.dirsHandler);
logAggregationService.init(this.conf);
logAggregationService.verifyAndCreateRemoteLogDir(this.conf);
logAggregationService.start();
ApplicationId appId = ApplicationId.newInstance(
System.currentTimeMillis(), 1);
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
AMOrFailedContainerLogAggregationPolicy.class.getName());
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.acls, contextWithAMAndFailed));
dispatcher.await();
String targetGroup =
UserGroupInformation.getLoginUser().getPrimaryGroupName();
@ -651,6 +682,7 @@ public void testRemoteRootLogDirIsCreatedWithCorrectGroupOwner()
fileStatus.getGroup(), targetGroup);
fs.delete(aNewFile, true);
logAggregationService.stop();
}
@Test
@ -669,14 +701,23 @@ public void testAppLogDirCreation() throws Exception {
FileSystem fs = FileSystem.get(this.conf);
final FileSystem spyFs = spy(FileSystem.get(this.conf));
final LogAggregationTFileController spyFileFormat
= new LogAggregationTFileController() {
@Override
public FileSystem getFileSystem(Configuration conf)
throws IOException {
return spyFs;
}
};
spyFileFormat.initialize(conf, "TFile");
LogAggregationService aggSvc = new LogAggregationService(dispatcher,
this.context, this.delSrvc, super.dirsHandler) {
@Override
protected FileSystem getFileSystem(Configuration conf) {
return spyFs;
public LogAggregationFileController getLogAggregationFileController(
Configuration conf) {
return spyFileFormat;
}
};
aggSvc.init(this.conf);
aggSvc.start();
@ -769,18 +810,36 @@ public void testLogAggregationInitAppFailsWithoutKillingNM() throws Exception {
@Test
public void testLogAggregationCreateDirsFailsWithoutKillingNM()
throws Exception {
this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_LOG_DIRS,
localLogDir.getAbsolutePath());
this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
this.remoteRootLogDir.getAbsolutePath());
DeletionService spyDelSrvc = spy(this.delSrvc);
LogAggregationFileControllerFactory factory
= new LogAggregationFileControllerFactory(conf);
LogAggregationFileController logAggregationFileFormat = factory
.getFileControllerForWrite();
LogAggregationFileController spyLogAggregationFileFormat =
spy(logAggregationFileFormat);
Exception e = new RuntimeException("KABOOM!");
doThrow(e).when(spyLogAggregationFileFormat)
.createAppDir(any(String.class), any(ApplicationId.class),
any(UserGroupInformation.class));
LogAggregationService logAggregationService = spy(
new LogAggregationService(dispatcher, this.context, spyDelSrvc,
super.dirsHandler));
super.dirsHandler){
@Override
public LogAggregationFileController getLogAggregationFileController(
Configuration conf) {
return spyLogAggregationFileFormat;
}
});
logAggregationService.init(this.conf);
logAggregationService.start();
ApplicationId appId =
BuilderUtils.newApplicationId(System.currentTimeMillis(),
(int) (Math.random() * 1000));
@ -789,10 +848,6 @@ public void testLogAggregationCreateDirsFailsWithoutKillingNM()
new File(localLogDir, appId.toString());
appLogDir.mkdir();
Exception e = new RuntimeException("KABOOM!");
doThrow(e)
.when(logAggregationService).createAppDir(any(String.class),
any(ApplicationId.class), any(UserGroupInformation.class));
LogAggregationContext contextWithAMAndFailed =
Records.newRecord(LogAggregationContext.class);
contextWithAMAndFailed.setLogAggregationPolicyClassName(
@ -867,7 +922,8 @@ private LogFileStatusInLastCycle verifyContainerLogs(
int minNumOfContainers, int maxNumOfContainers,
String[] logFiles, int numOfLogsPerContainer, boolean multiLogs)
throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
Path appLogDir = logAggregationService.getLogAggregationFileController(
conf).getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =
@ -2108,7 +2164,7 @@ private LogAggregationService createLogAggregationService(
}
logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
this.user, null, this.acls, logAggContext));
dispatcher.await();
return logAggregationService;
}
@ -2462,17 +2518,20 @@ private void verifySkipUnnecessaryNNOperations(
logAggregationService.stop();
assertEquals(expectedLogAggregationTimes,
aggregator.getLogAggregationTimes());
aggregator.getLogAggregationFileControllerContext()
.getLogAggregationTimes());
assertEquals(expectedAggregationReportNum,
this.context.getLogAggregationStatusForApps().size());
assertEquals(expectedCleanupOldLogsTimes,
aggregator.getCleanupOldLogTimes());
aggregator.getLogAggregationFileControllerContext()
.getCleanOldLogsTimes());
}
private int numOfLogsAvailable(LogAggregationService logAggregationService,
ApplicationId appId, boolean sizeLimited, String lastLogFile)
throws IOException {
Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, this.user);
Path appLogDir = logAggregationService.getLogAggregationFileController(
conf).getRemoteAppLogDir(appId, this.user);
RemoteIterator<FileStatus> nodeFiles = null;
try {
Path qualifiedLogDir =