YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. Contributed Xuan Gong.
This commit is contained in:
parent
89427419a3
commit
e90718fa5a
@ -376,6 +376,9 @@ Release 2.6.0 - UNRELEASED
|
||||
YARN-2673. Made timeline client put APIs retry if ConnectException happens.
|
||||
(Li Lu via zjshen)
|
||||
|
||||
YARN-2582. Fixed Log CLI and Web UI for showing aggregated logs of LRS. (Xuan
|
||||
Gong via zjshen)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -31,7 +31,6 @@
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.conf.Configured;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -39,8 +38,6 @@
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
@ -113,17 +110,16 @@ public int run(String[] args) throws Exception {
|
||||
System.err.println("Invalid ApplicationId specified");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
try {
|
||||
int resultCode = verifyApplicationState(appId);
|
||||
if (resultCode != 0) {
|
||||
System.out.println("Application has not completed." +
|
||||
" Logs are only available after an application completes");
|
||||
System.out.println("Logs are not avaiable right now.");
|
||||
return resultCode;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
System.err.println("Unable to get ApplicationState." +
|
||||
" Attempting to fetch logs directly from the filesystem.");
|
||||
System.err.println("Unable to get ApplicationState."
|
||||
+ " Attempting to fetch logs directly from the filesystem.");
|
||||
}
|
||||
|
||||
LogCLIHelpers logCliHelper = new LogCLIHelpers();
|
||||
@ -141,18 +137,9 @@ public int run(String[] args) throws Exception {
|
||||
printHelpMessage(printOpts);
|
||||
resultCode = -1;
|
||||
} else {
|
||||
Path remoteRootLogDir =
|
||||
new Path(getConf().get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
LogAggregationUtils.getRemoteNodeLogFileForApp(
|
||||
remoteRootLogDir,
|
||||
appId,
|
||||
appOwner,
|
||||
ConverterUtils.toNodeId(nodeAddress),
|
||||
LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf())));
|
||||
resultCode = logCliHelper.dumpAContainerLogs(containerIdStr, reader, System.out);
|
||||
resultCode =
|
||||
logCliHelper.dumpAContainersLogs(appIdStr, containerIdStr,
|
||||
nodeAddress, appOwner);
|
||||
}
|
||||
|
||||
return resultCode;
|
||||
@ -167,10 +154,10 @@ private int verifyApplicationState(ApplicationId appId) throws IOException,
|
||||
switch (appReport.getYarnApplicationState()) {
|
||||
case NEW:
|
||||
case NEW_SAVING:
|
||||
case ACCEPTED:
|
||||
case SUBMITTED:
|
||||
case RUNNING:
|
||||
return -1;
|
||||
case ACCEPTED:
|
||||
case RUNNING:
|
||||
case FAILED:
|
||||
case FINISHED:
|
||||
case KILLED:
|
||||
|
@ -25,21 +25,38 @@
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileWriter;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.io.Writer;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||
import org.apache.hadoop.yarn.client.api.YarnClient;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
|
||||
import org.apache.hadoop.yarn.logaggregation.LogCLIHelpers;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -138,6 +155,116 @@ public void testHelpMessage() throws Exception {
|
||||
Assert.assertEquals(appReportStr, sysOutStream.toString());
|
||||
}
|
||||
|
||||
@Test (timeout = 15000)
|
||||
public void testFetchApplictionLogs() throws Exception {
|
||||
String remoteLogRootDir = "target/logs/";
|
||||
Configuration configuration = new Configuration();
|
||||
configuration.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
|
||||
configuration
|
||||
.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
|
||||
configuration.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||
configuration.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||
FileSystem fs = FileSystem.get(configuration);
|
||||
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
ApplicationId appId = ApplicationIdPBImpl.newInstance(0, 1);
|
||||
ApplicationAttemptId appAttemptId =
|
||||
ApplicationAttemptIdPBImpl.newInstance(appId, 1);
|
||||
ContainerId containerId1 = ContainerIdPBImpl.newInstance(appAttemptId, 1);
|
||||
ContainerId containerId2 = ContainerIdPBImpl.newInstance(appAttemptId, 2);
|
||||
|
||||
NodeId nodeId = NodeId.newInstance("localhost", 1234);
|
||||
|
||||
// create local logs
|
||||
String rootLogDir = "target/LocalLogs";
|
||||
Path rootLogDirPath = new Path(rootLogDir);
|
||||
if (fs.exists(rootLogDirPath)) {
|
||||
fs.delete(rootLogDirPath, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(rootLogDirPath));
|
||||
|
||||
Path appLogsDir = new Path(rootLogDirPath, appId.toString());
|
||||
if (fs.exists(appLogsDir)) {
|
||||
fs.delete(appLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(appLogsDir));
|
||||
List<String> rootLogDirs = Arrays.asList(rootLogDir);
|
||||
|
||||
// create container logs in localLogDir
|
||||
createContainerLogInLocalDir(appLogsDir, containerId1, fs);
|
||||
createContainerLogInLocalDir(appLogsDir, containerId2, fs);
|
||||
|
||||
Path path =
|
||||
new Path(remoteLogRootDir + ugi.getShortUserName()
|
||||
+ "/logs/application_0_0001");
|
||||
if (fs.exists(path)) {
|
||||
fs.delete(path, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(path));
|
||||
// upload container logs into remote directory
|
||||
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||
containerId1, path, fs);
|
||||
uploadContainerLogIntoRemoteDir(ugi, configuration, rootLogDirs, nodeId,
|
||||
containerId2, path, fs);
|
||||
|
||||
YarnClient mockYarnClient =
|
||||
createMockYarnClient(YarnApplicationState.FINISHED);
|
||||
LogsCLI cli = new LogsCLIForTest(mockYarnClient);
|
||||
cli.setConf(configuration);
|
||||
|
||||
int exitCode = cli.run(new String[] { "-applicationId", appId.toString() });
|
||||
assertTrue(exitCode == 0);
|
||||
assertTrue(sysOutStream.toString().contains(
|
||||
"Hello container_0_0001_01_000001!"));
|
||||
assertTrue(sysOutStream.toString().contains(
|
||||
"Hello container_0_0001_01_000002!"));
|
||||
sysOutStream.reset();
|
||||
|
||||
exitCode =
|
||||
cli.run(new String[] { "-applicationId", appId.toString(),
|
||||
"-nodeAddress", nodeId.toString(), "-containerId",
|
||||
containerId1.toString() });
|
||||
assertTrue(exitCode == 0);
|
||||
assertTrue(sysOutStream.toString().contains(
|
||||
"Hello container_0_0001_01_000001!"));
|
||||
|
||||
fs.delete(new Path(remoteLogRootDir), true);
|
||||
fs.delete(new Path(rootLogDir), true);
|
||||
}
|
||||
|
||||
private static void createContainerLogInLocalDir(Path appLogsDir,
|
||||
ContainerId containerId, FileSystem fs) throws Exception {
|
||||
Path containerLogsDir = new Path(appLogsDir, containerId.toString());
|
||||
if (fs.exists(containerLogsDir)) {
|
||||
fs.delete(containerLogsDir, true);
|
||||
}
|
||||
assertTrue(fs.mkdirs(containerLogsDir));
|
||||
Writer writer =
|
||||
new FileWriter(new File(containerLogsDir.toString(), "sysout"));
|
||||
writer.write("Hello " + containerId + "!");
|
||||
writer.close();
|
||||
}
|
||||
|
||||
private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
|
||||
Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
|
||||
ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
|
||||
Path path =
|
||||
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
|
||||
+ System.currentTimeMillis());
|
||||
AggregatedLogFormat.LogWriter writer =
|
||||
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
|
||||
writer.writeApplicationOwner(ugi.getUserName());
|
||||
|
||||
Map<ApplicationAccessType, String> appAcls =
|
||||
new HashMap<ApplicationAccessType, String>();
|
||||
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
|
||||
writer.writeApplicationACLs(appAcls);
|
||||
writer.append(new AggregatedLogFormat.LogKey(containerId),
|
||||
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
|
||||
UserGroupInformation.getCurrentUser().getShortUserName()));
|
||||
writer.close();
|
||||
}
|
||||
|
||||
private YarnClient createMockYarnClient(YarnApplicationState appState)
|
||||
throws YarnException, IOException {
|
||||
YarnClient mockClient = mock(YarnClient.class);
|
||||
|
@ -110,4 +110,9 @@ public static String getRemoteNodeLogDirSuffix(Configuration conf) {
|
||||
public static String getNodeString(NodeId nodeId) {
|
||||
return nodeId.toString().replace(":", "_");
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static String getNodeString(String nodeId) {
|
||||
return nodeId.toString().replace(":", "_");
|
||||
}
|
||||
}
|
||||
|
@ -52,19 +52,47 @@ public int dumpAContainersLogs(String appId, String containerId,
|
||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(getConf());
|
||||
Path logPath = LogAggregationUtils.getRemoteNodeLogFileForApp(
|
||||
Path remoteAppLogDir = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, ConverterUtils.toApplicationId(appId), jobOwner,
|
||||
ConverterUtils.toNodeId(nodeId), suffix);
|
||||
AggregatedLogFormat.LogReader reader;
|
||||
suffix);
|
||||
RemoteIterator<FileStatus> nodeFiles;
|
||||
try {
|
||||
reader = new AggregatedLogFormat.LogReader(getConf(), logPath);
|
||||
} catch (FileNotFoundException fnfe) {
|
||||
System.out.println("Logs not available at " + logPath.toString());
|
||||
System.out
|
||||
.println("Log aggregation has not completed or is not enabled.");
|
||||
Path qualifiedLogDir =
|
||||
FileContext.getFileContext(getConf()).makeQualified(
|
||||
remoteAppLogDir);
|
||||
nodeFiles =
|
||||
FileContext.getFileContext(qualifiedLogDir.toUri(), getConf())
|
||||
.listStatus(remoteAppLogDir);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
logDirNotExist(remoteAppLogDir.toString());
|
||||
return -1;
|
||||
}
|
||||
return dumpAContainerLogs(containerId, reader, System.out);
|
||||
boolean foundContainerLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
String fileName = thisNodeFile.getPath().getName();
|
||||
if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
&& !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(),
|
||||
thisNodeFile.getPath());
|
||||
if (dumpAContainerLogs(containerId, reader, System.out) > -1) {
|
||||
foundContainerLogs = true;
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!foundContainerLogs) {
|
||||
containerLogNotFound(containerId);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Private
|
||||
@ -81,8 +109,7 @@ public int dumpAContainerLogs(String containerIdStr,
|
||||
}
|
||||
|
||||
if (valueStream == null) {
|
||||
System.out.println("Logs for container " + containerIdStr
|
||||
+ " are not present in this log-file.");
|
||||
containerLogNotFound(containerIdStr);
|
||||
return -1;
|
||||
}
|
||||
|
||||
@ -114,42 +141,49 @@ public int dumpAllContainersLogs(ApplicationId appId, String appOwner,
|
||||
nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
|
||||
getConf()).listStatus(remoteAppLogDir);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
System.out.println("Logs not available at " + remoteAppLogDir.toString());
|
||||
System.out
|
||||
.println("Log aggregation has not completed or is not enabled.");
|
||||
logDirNotExist(remoteAppLogDir.toString());
|
||||
return -1;
|
||||
}
|
||||
boolean foundAnyLogs = false;
|
||||
while (nodeFiles.hasNext()) {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
AggregatedLogFormat.LogReader reader = new AggregatedLogFormat.LogReader(
|
||||
getConf(), new Path(remoteAppLogDir, thisNodeFile.getPath().getName()));
|
||||
try {
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
AggregatedLogFormat.LogReader reader =
|
||||
new AggregatedLogFormat.LogReader(getConf(), thisNodeFile.getPath());
|
||||
try {
|
||||
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
|
||||
while (valueStream != null) {
|
||||
String containerString = "\n\nContainer: " + key + " on "
|
||||
+ thisNodeFile.getPath().getName();
|
||||
out.println(containerString);
|
||||
out.println(StringUtils.repeat("=", containerString.length()));
|
||||
while (true) {
|
||||
try {
|
||||
LogReader.readAContainerLogsForALogType(valueStream, out);
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
DataInputStream valueStream;
|
||||
LogKey key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
|
||||
while (valueStream != null) {
|
||||
String containerString =
|
||||
"\n\nContainer: " + key + " on " + thisNodeFile.getPath().getName();
|
||||
out.println(containerString);
|
||||
out.println(StringUtils.repeat("=", containerString.length()));
|
||||
while (true) {
|
||||
try {
|
||||
LogReader.readAContainerLogsForALogType(valueStream, out);
|
||||
foundAnyLogs = true;
|
||||
} catch (EOFException eof) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Next container
|
||||
key = new LogKey();
|
||||
valueStream = reader.next(key);
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
} finally {
|
||||
reader.close();
|
||||
}
|
||||
}
|
||||
if (! foundAnyLogs) {
|
||||
emptyLogDir(remoteAppLogDir.toString());
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -162,4 +196,18 @@ public void setConf(Configuration conf) {
|
||||
public Configuration getConf() {
|
||||
return this.conf;
|
||||
}
|
||||
|
||||
private static void containerLogNotFound(String containerId) {
|
||||
System.out.println("Logs for container " + containerId
|
||||
+ " are not present in this log-file.");
|
||||
}
|
||||
|
||||
private static void logDirNotExist(String remoteAppLogDir) {
|
||||
System.out.println(remoteAppLogDir + "does not exist.");
|
||||
System.out.println("Log aggregation has not completed or is not enabled.");
|
||||
}
|
||||
|
||||
private static void emptyLogDir(String remoteAppLogDir) {
|
||||
System.out.println(remoteAppLogDir + "does not have any log files.");
|
||||
}
|
||||
}
|
||||
|
@ -30,7 +30,10 @@
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileContext;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -59,113 +62,127 @@ public class AggregatedLogsBlock extends HtmlBlock {
|
||||
|
||||
@Override
|
||||
protected void render(Block html) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
ContainerId containerId = verifyAndGetContainerId(html);
|
||||
NodeId nodeId = verifyAndGetNodeId(html);
|
||||
String appOwner = verifyAndGetAppOwner(html);
|
||||
LogLimits logLimits = verifyAndGetLogLimits(html);
|
||||
if (containerId == null || nodeId == null || appOwner == null
|
||||
|| appOwner.isEmpty() || logLimits == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
ApplicationId applicationId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
String logEntity = $(ENTITY_STRING);
|
||||
if (logEntity == null || logEntity.isEmpty()) {
|
||||
logEntity = containerId.toString();
|
||||
}
|
||||
|
||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||
html.h1()
|
||||
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
|
||||
._();
|
||||
return;
|
||||
}
|
||||
|
||||
Path remoteRootLogDir = new Path(conf.get(
|
||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
Path remoteAppDir = LogAggregationUtils.getRemoteAppLogDir(
|
||||
remoteRootLogDir, applicationId, appOwner,
|
||||
LogAggregationUtils.getRemoteNodeLogDirSuffix(conf));
|
||||
RemoteIterator<FileStatus> nodeFiles;
|
||||
try {
|
||||
ContainerId containerId = verifyAndGetContainerId(html);
|
||||
NodeId nodeId = verifyAndGetNodeId(html);
|
||||
String appOwner = verifyAndGetAppOwner(html);
|
||||
LogLimits logLimits = verifyAndGetLogLimits(html);
|
||||
if (containerId == null || nodeId == null || appOwner == null
|
||||
|| appOwner.isEmpty() || logLimits == null) {
|
||||
return;
|
||||
}
|
||||
Path qualifiedLogDir =
|
||||
FileContext.getFileContext(conf).makeQualified(
|
||||
remoteAppDir);
|
||||
nodeFiles =
|
||||
FileContext.getFileContext(qualifiedLogDir.toUri(), conf)
|
||||
.listStatus(remoteAppDir);
|
||||
} catch (FileNotFoundException fnf) {
|
||||
html.h1()
|
||||
._("Logs not available for " + logEntity
|
||||
+ ". Aggregation may not be complete, "
|
||||
+ "Check back later or try the nodemanager at " + nodeId)._();
|
||||
return;
|
||||
} catch (Exception ex) {
|
||||
html.h1()
|
||||
._("Error getting logs at " + nodeId)._();
|
||||
return;
|
||||
}
|
||||
|
||||
ApplicationId applicationId = containerId.getApplicationAttemptId()
|
||||
.getApplicationId();
|
||||
String logEntity = $(ENTITY_STRING);
|
||||
if (logEntity == null || logEntity.isEmpty()) {
|
||||
logEntity = containerId.toString();
|
||||
}
|
||||
|
||||
if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
|
||||
YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
|
||||
html.h1()
|
||||
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
|
||||
._();
|
||||
return;
|
||||
}
|
||||
|
||||
Path remoteRootLogDir = new Path(conf.get(
|
||||
YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
|
||||
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
|
||||
|
||||
try {
|
||||
reader = new AggregatedLogFormat.LogReader(conf,
|
||||
LogAggregationUtils.getRemoteNodeLogFileForApp(remoteRootLogDir,
|
||||
applicationId, appOwner, nodeId,
|
||||
LogAggregationUtils.getRemoteNodeLogDirSuffix(conf)));
|
||||
} catch (FileNotFoundException e) {
|
||||
// ACLs not available till the log file is opened.
|
||||
html.h1()
|
||||
._("Logs not available for " + logEntity
|
||||
+ ". Aggregation may not be complete, "
|
||||
+ "Check back later or try the nodemanager at " + nodeId)._();
|
||||
return;
|
||||
} catch (IOException e) {
|
||||
html.h1()._("Error getting logs for " + logEntity)._();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
return;
|
||||
}
|
||||
|
||||
String owner = null;
|
||||
Map<ApplicationAccessType, String> appAcls = null;
|
||||
try {
|
||||
owner = reader.getApplicationOwner();
|
||||
appAcls = reader.getApplicationAcls();
|
||||
} catch (IOException e) {
|
||||
html.h1()._("Error getting logs for " + logEntity)._();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
return;
|
||||
}
|
||||
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
|
||||
aclsManager.addApplication(applicationId, appAcls);
|
||||
|
||||
String remoteUser = request().getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null
|
||||
&& !aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, owner, applicationId)) {
|
||||
html.h1()
|
||||
._("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity)._();
|
||||
return;
|
||||
}
|
||||
|
||||
String desiredLogType = $(CONTAINER_LOG_TYPE);
|
||||
try {
|
||||
AggregatedLogFormat.ContainerLogsReader logReader = reader
|
||||
.getContainerLogsReader(containerId);
|
||||
if (logReader == null) {
|
||||
html.h1()
|
||||
._("Logs not available for " + logEntity
|
||||
+ ". Could be caused by the rentention policy")._();
|
||||
return;
|
||||
}
|
||||
|
||||
boolean foundLog = readContainerLogs(html, logReader, logLimits,
|
||||
desiredLogType);
|
||||
|
||||
if (!foundLog) {
|
||||
if (desiredLogType.isEmpty()) {
|
||||
html.h1("No logs available for container " + containerId.toString());
|
||||
} else {
|
||||
html.h1("Unable to locate '" + desiredLogType
|
||||
+ "' log for container " + containerId.toString());
|
||||
boolean foundLog = false;
|
||||
String desiredLogType = $(CONTAINER_LOG_TYPE);
|
||||
try {
|
||||
while (nodeFiles.hasNext()) {
|
||||
AggregatedLogFormat.LogReader reader = null;
|
||||
try {
|
||||
FileStatus thisNodeFile = nodeFiles.next();
|
||||
if (!thisNodeFile.getPath().getName()
|
||||
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||
|| thisNodeFile.getPath().getName()
|
||||
.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
|
||||
continue;
|
||||
}
|
||||
return;
|
||||
reader =
|
||||
new AggregatedLogFormat.LogReader(conf, thisNodeFile.getPath());
|
||||
|
||||
String owner = null;
|
||||
Map<ApplicationAccessType, String> appAcls = null;
|
||||
try {
|
||||
owner = reader.getApplicationOwner();
|
||||
appAcls = reader.getApplicationAcls();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
continue;
|
||||
}
|
||||
ApplicationACLsManager aclsManager = new ApplicationACLsManager(conf);
|
||||
aclsManager.addApplication(applicationId, appAcls);
|
||||
|
||||
String remoteUser = request().getRemoteUser();
|
||||
UserGroupInformation callerUGI = null;
|
||||
if (remoteUser != null) {
|
||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||
}
|
||||
if (callerUGI != null && !aclsManager.checkAccess(callerUGI,
|
||||
ApplicationAccessType.VIEW_APP, owner, applicationId)) {
|
||||
html.h1()
|
||||
._("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity
|
||||
+ " in log file [" + thisNodeFile.getPath().getName() + "]")._();
|
||||
LOG.error("User [" + remoteUser
|
||||
+ "] is not authorized to view the logs for " + logEntity);
|
||||
continue;
|
||||
}
|
||||
|
||||
AggregatedLogFormat.ContainerLogsReader logReader = reader
|
||||
.getContainerLogsReader(containerId);
|
||||
if (logReader == null) {
|
||||
continue;
|
||||
}
|
||||
|
||||
foundLog = readContainerLogs(html, logReader, logLimits,
|
||||
desiredLogType);
|
||||
} catch (IOException ex) {
|
||||
LOG.error("Error getting logs for " + logEntity, ex);
|
||||
continue;
|
||||
} finally {
|
||||
if (reader != null)
|
||||
reader.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
html.h1()._("Error getting logs for " + logEntity)._();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
return;
|
||||
}
|
||||
} finally {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
if (!foundLog) {
|
||||
if (desiredLogType.isEmpty()) {
|
||||
html.h1("No logs available for container " + containerId.toString());
|
||||
} else {
|
||||
html.h1("Unable to locate '" + desiredLogType
|
||||
+ "' log for container " + containerId.toString());
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
html.h1()._("Error getting logs for " + logEntity)._();
|
||||
LOG.error("Error getting logs for " + logEntity, e);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -47,7 +47,6 @@
|
||||
import org.apache.hadoop.yarn.webapp.view.BlockForTest;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
|
||||
import org.apache.hadoop.yarn.webapp.view.HtmlBlockForTest;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.mockito.Mockito.*;
|
||||
@ -149,10 +148,8 @@ public void testAggregatedLogsBlock() throws Exception {
|
||||
}
|
||||
/**
|
||||
* Log files was deleted.
|
||||
* TODO: YARN-2582: fix log web ui for Long Running application
|
||||
* @throws Exception
|
||||
*/
|
||||
@Ignore
|
||||
@Test
|
||||
public void testNoLogs() throws Exception {
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user