YARN-6288. Exceptions during aggregated log writes are mishandled. Contributed by Akira Ajisaka

This commit is contained in:
Jason Lowe 2017-04-06 16:24:36 -05:00
parent 1a9439e299
commit 1b081ca27e
7 changed files with 161 additions and 152 deletions

View File

@ -1345,18 +1345,18 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
Map<ApplicationAccessType, String> appAcls =
new HashMap<ApplicationAccessType, String>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
writer.close();
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
}
}
private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ugi,
@ -1365,23 +1365,23 @@ private static void uploadEmptyContainerLogIntoRemoteDir(UserGroupInformation ug
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId)
+ System.currentTimeMillis());
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
Map<ApplicationAccessType, String> appAcls =
new HashMap<ApplicationAccessType, String>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
new AggregatedLogFormat.LogKey(containerId).write(out);
out.close();
out = writer.getWriter().prepareAppendValue(-1);
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
new HashSet<File>());
out.close();
writer.close();
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
DataOutputStream out = writer.getWriter().prepareAppendKey(-1);
new AggregatedLogFormat.LogKey(containerId).write(out);
out.close();
out = writer.getWriter().prepareAppendValue(-1);
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()).write(out,
new HashSet<>());
out.close();
}
}
private YarnClient createMockYarnClient(YarnApplicationState appState,

View File

@ -446,14 +446,23 @@ public boolean shouldRetainLog() {
* The writer that writes out the aggregated logs.
*/
@Private
public static class LogWriter {
public static class LogWriter implements AutoCloseable {
private final FSDataOutputStream fsDataOStream;
private final TFile.Writer writer;
private FSDataOutputStream fsDataOStream;
private TFile.Writer writer;
private FileContext fc;
public LogWriter(final Configuration conf, final Path remoteAppLogFile,
UserGroupInformation userUgi) throws IOException {
/**
* Initialize the LogWriter.
* Must be called just after the instance is created.
* @param conf Configuration
* @param remoteAppLogFile remote log file path
* @param userUgi Ugi of the user
* @throws IOException Failed to initialize
*/
public void initialize(final Configuration conf,
final Path remoteAppLogFile,
UserGroupInformation userUgi) throws IOException {
try {
this.fsDataOStream =
userUgi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() {
@ -530,11 +539,14 @@ public void append(LogKey logKey, LogValue logValue) throws IOException {
}
}
@Override
public void close() {
try {
this.writer.close();
} catch (IOException e) {
LOG.warn("Exception closing writer", e);
if (writer != null) {
try {
writer.close();
} catch (IOException e) {
LOG.warn("Exception closing writer", e);
}
}
IOUtils.closeStream(fsDataOStream);
}

View File

@ -140,44 +140,44 @@ private void writeSrcFileAndALog(Path srcFilePath, String fileName, final long l
final int ch = filler;
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(new Configuration(), remoteAppLogFile,
ugi);
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(new Configuration(), remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName()));
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName()));
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch latch = new CountDownLatch(1);
Thread t = new Thread() {
public void run() {
try {
for(int i=0; i < length/3; i++) {
Thread t = new Thread() {
public void run() {
try {
for (int i = 0; i < length / 3; i++) {
osw.write(ch);
}
}
latch.countDown();
latch.countDown();
for(int i=0; i < (2*length)/3; i++) {
osw.write(ch);
for (int i = 0; i < (2 * length) / 3; i++) {
osw.write(ch);
}
osw.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
osw.close();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
};
t.start();
};
t.start();
//Wait till the osw is partially written
//aggregation starts once the ows has completed 1/3rd of its work
latch.await();
//Wait till the osw is partially written
//aggregation starts once the ows has completed 1/3rd of its work
latch.await();
//Aggregate The Logs
logWriter.append(logKey, logValue);
logWriter.close();
//Aggregate The Logs
logWriter.append(logKey, logValue);
}
}
@Test
@ -216,22 +216,23 @@ private void testReadAcontainerLog(boolean logUploadedTime) throws Exception {
writeSrcFile(srcFilePath, "stdout", numChars);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName());
LogKey logKey = new LogKey(testContainerId);
LogValue logValue =
new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId, ugi.getShortUserName());
// When we try to open FileInputStream for stderr, it will throw out an IOException.
// Skip the log aggregation for stderr.
LogValue spyLogValue = spy(logValue);
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
doThrow(new IOException("Mock can not open FileInputStream")).when(
spyLogValue).secureOpenFile(errorFile);
// When we try to open FileInputStream for stderr, it will throw out an
// IOException. Skip the log aggregation for stderr.
LogValue spyLogValue = spy(logValue);
File errorFile = new File((new Path(srcFilePath, "stderr")).toString());
doThrow(new IOException("Mock can not open FileInputStream")).when(
spyLogValue).secureOpenFile(errorFile);
logWriter.append(logKey, spyLogValue);
logWriter.close();
logWriter.append(logKey, spyLogValue);
}
// make sure permission are correct on the file
FileStatus fsStatus = fs.getFileStatus(remoteAppLogFile);
@ -311,24 +312,24 @@ public void testContainerLogsFileAccess() throws IOException {
UserGroupInformation ugi =
UserGroupInformation.getCurrentUser();
LogWriter logWriter = new LogWriter(conf, remoteAppLogFile, ugi);
try (LogWriter logWriter = new LogWriter()) {
logWriter.initialize(conf, remoteAppLogFile, ugi);
LogKey logKey = new LogKey(testContainerId1);
String randomUser = "randomUser";
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId1, randomUser));
// It is trying simulate a situation where first log file is owned by
// different user (probably symlink) and second one by the user itself.
// The first file should not be aggregated. Because this log file has the invalid
// user name.
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
ugi.getShortUserName());
logWriter.append(logKey, logValue);
LogKey logKey = new LogKey(testContainerId1);
String randomUser = "randomUser";
LogValue logValue =
spy(new LogValue(Collections.singletonList(srcFileRoot.toString()),
testContainerId1, randomUser));
// It is trying simulate a situation where first log file is owned by
// different user (probably symlink) and second one by the user itself.
// The first file should not be aggregated. Because this log file has
// the invalid user name.
when(logValue.getUser()).thenReturn(randomUser).thenReturn(
ugi.getShortUserName());
logWriter.append(logKey, logValue);
}
logWriter.close();
BufferedReader in =
new BufferedReader(new FileReader(new File(remoteAppLogFile
.toUri().getRawPath())));

View File

@ -295,17 +295,20 @@ private void writeLog(Configuration configuration, String user)
List<String> rootLogDirs = Arrays.asList("target/logs/logs");
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
AggregatedLogFormat.LogWriter writer = new AggregatedLogFormat.LogWriter(
configuration, new Path(path), ugi);
writer.writeApplicationOwner(ugi.getUserName());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, new Path(path), ugi);
writer.writeApplicationOwner(ugi.getUserName());
Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
Map<ApplicationAccessType, String> appAcls = new HashMap<>();
appAcls.put(ApplicationAccessType.VIEW_APP, ugi.getUserName());
writer.writeApplicationACLs(appAcls);
writer.append(new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,UserGroupInformation.getCurrentUser().getShortUserName()));
writer.close();
writer.append(
new AggregatedLogFormat.LogKey("container_0_0001_01_000001"),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
UserGroupInformation.getCurrentUser().getShortUserName()));
}
}
private void writeLogs(String dirName) throws Exception {

View File

@ -110,13 +110,14 @@ private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
ContainerId containerId, Path appDir, FileSystem fs) throws IOException {
Path path =
new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
try (AggregatedLogFormat.LogWriter writer =
new AggregatedLogFormat.LogWriter()) {
writer.initialize(configuration, path, ugi);
writer.writeApplicationOwner(ugi.getUserName());
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
writer.close();
writer.append(new AggregatedLogFormat.LogKey(containerId),
new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
ugi.getShortUserName()));
}
}
}

View File

@ -295,18 +295,18 @@ private void uploadLogsForContainers(boolean appFinished) {
}
}
LogWriter writer = null;
if (pendingContainerInThisCycle.isEmpty()) {
sendLogAggregationReport(true, "", appFinished);
return;
}
logAggregationTimes++;
String diagnosticMessage = "";
boolean logAggregationSucceedInThisCycle = true;
try {
if (pendingContainerInThisCycle.isEmpty()) {
return;
}
logAggregationTimes++;
try (LogWriter writer = createLogWriter()) {
try {
writer = createLogWriter();
writer.initialize(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
// Write ACLs once when the writer is created.
writer.writeApplicationACLs(appAcls);
writer.writeApplicationOwner(this.userUgi.getShortUserName());
@ -351,11 +351,6 @@ private void uploadLogsForContainers(boolean appFinished) {
cleanupOldLogTimes++;
}
if (writer != null) {
writer.close();
writer = null;
}
long currentTime = System.currentTimeMillis();
final Path renamedPath = this.rollingMonitorInterval <= 0
? remoteNodeLogFileForApp : new Path(
@ -396,34 +391,37 @@ public Object run() throws Exception {
logAggregationSucceedInThisCycle = false;
}
} finally {
LogAggregationStatus logAggregationStatus =
logAggregationSucceedInThisCycle
? LogAggregationStatus.RUNNING
: LogAggregationStatus.RUNNING_WITH_FAILURE;
sendLogAggregationReport(logAggregationStatus, diagnosticMessage);
if (appFinished) {
// If the app is finished, one extra final report with log aggregation
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
// that the log aggregation in this NM is completed.
LogAggregationStatus finalLogAggregationStatus =
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
sendLogAggregationReport(finalLogAggregationStatus, "");
}
if (writer != null) {
writer.close();
}
sendLogAggregationReport(logAggregationSucceedInThisCycle,
diagnosticMessage, appFinished);
}
}
protected LogWriter createLogWriter() throws IOException {
return new LogWriter(this.conf, this.remoteNodeTmpLogFileForApp,
this.userUgi);
@VisibleForTesting
protected LogWriter createLogWriter() {
return new LogWriter();
}
private void sendLogAggregationReport(
boolean logAggregationSucceedInThisCycle, String diagnosticMessage,
boolean appFinished) {
LogAggregationStatus logAggregationStatus =
logAggregationSucceedInThisCycle
? LogAggregationStatus.RUNNING
: LogAggregationStatus.RUNNING_WITH_FAILURE;
sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
if (appFinished) {
// If the app is finished, one extra final report with log aggregation
// status SUCCEEDED/FAILED will be sent to RM to inform the RM
// that the log aggregation in this NM is completed.
LogAggregationStatus finalLogAggregationStatus =
renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
? LogAggregationStatus.FAILED
: LogAggregationStatus.SUCCEEDED;
sendLogAggregationReportInternal(finalLogAggregationStatus, "");
}
}
private void sendLogAggregationReportInternal(
LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
LogAggregationReport report =
Records.newRecord(LogAggregationReport.class);

View File

@ -416,8 +416,7 @@ public AppLogAggregatorInTest(Dispatcher dispatcher,
logAggregationContext, context, lfs, -1, recoveredLogInitedTime);
this.applicationId = appId;
this.deletionService = deletionService;
this.logWriter = getSpiedLogWriter(conf, ugi, remoteNodeLogFileForApp);
this.logWriter = spy(new LogWriter());
this.logValue = ArgumentCaptor.forClass(LogValue.class);
}
@ -425,10 +424,5 @@ public AppLogAggregatorInTest(Dispatcher dispatcher,
protected LogWriter createLogWriter() {
return this.logWriter;
}
private LogWriter getSpiedLogWriter(Configuration conf,
UserGroupInformation ugi, Path remoteAppLogFile) throws IOException {
return spy(new LogWriter(conf, remoteAppLogFile, ugi));
}
}
}